I was going through Community#DOC-1006738 from Oracle, which covers the concurrency concepts behind Flow.Publisher and Flow.Subscriber. It includes sample code that transforms a data stream using a processor, and the following lines from it have left me a little puzzled.
//Create Processor and Subscriber
MyFilterProcessor<String, String> filterProcessor =
    new MyFilterProcessor<>(s -> s.equals("x"));
Question 1. How can MyFilterProcessor be of type <String, String> here?
At first I thought the type arguments should have been <String, Boolean>, but that would contradict the definition of the subscriber on the next line:
MyTransformProcessor<String, Integer> transformProcessor =
    new MyTransformProcessor<>(s -> Integer.parseInt(s));
An additional note: unless I explicitly state the type arguments on the right-hand side (I called this a cast, which is probably not the correct term), as in
new MyTransformProcessor<String, Integer>(s -> Integer.parseInt(s))
I get an error on parseInt saying that it cannot be applied to Object.
Why do I need to state the type arguments explicitly on the RHS here?
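To make the comparison concrete, here is a stripped-down sketch of the two forms side by side. `Mapper` and `DiamondSketch` are hypothetical names of mine, not part of the Oracle sample; `Mapper` only copies the constructor signature and drops all of the Flow machinery. My understanding is that, with a fully parameterized left-hand side, the diamond should infer <String, Integer> from the target type, so I would expect both forms to be accepted, which is why the error surprises me.

```java
import java.util.function.Function;

// Hypothetical stand-in for the processor: same constructor signature,
// but the field keeps its type parameters instead of being a raw Function.
class Mapper<T, R> {
    private final Function<? super T, ? extends R> function;

    Mapper(Function<? super T, ? extends R> function) {
        this.function = function;
    }

    R apply(T input) {
        return function.apply(input);
    }
}

class DiamondSketch {
    // Diamond form: type arguments are left to inference from the declared type.
    static int viaDiamond(String s) {
        Mapper<String, Integer> m = new Mapper<>(x -> Integer.parseInt(x));
        return m.apply(s);
    }

    // Explicit form: type arguments are spelled out on the right-hand side.
    static int viaExplicit(String s) {
        Mapper<String, Integer> m = new Mapper<String, Integer>(x -> Integer.parseInt(x));
        return m.apply(s);
    }

    public static void main(String[] args) {
        System.out.println(viaDiamond("42"));
        System.out.println(viaExplicit("42"));
    }
}
```

If the diamond form in this sketch compiles but the one in my processor does not, the difference presumably lies somewhere else in my class or in my tooling rather than in the constructor signature itself.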
Although most of the code is available at the link above, the relevant constructor definitions I am using are:
public class MyTransformProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private Function function;

    MyTransformProcessor(Function<? super T, ? extends R> function) {
        super();
        this.function = function;
    }
    ...
}
and an identical one for filterProcessor:
public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private Function function;

    MyFilterProcessor(Function<? super T, ? extends R> function) {
        super();
        this.function = function;
    }
    ...
}
Question 2. With those changes (one from resolving question 1 and another from the additional note), how can one implement the sample correctly? Or am I simply missing something very basic?
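For reference, this is a minimal sketch of the direction I am considering, not the Oracle sample itself. It rests on assumptions of mine: the filter takes a `Predicate` and has a single type parameter (items pass through unchanged, which would explain <String, String>), the function field keeps its type parameters, and the predicate drops "x" instead of keeping it so that `parseInt` only ever sees numeric strings. The class names `FilterProcessor`, `TransformProcessor` and `FlowSketch` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;

// Assumption: a filter republishes the items it accepts, so input and output share type T.
class FilterProcessor<T> extends SubmissionPublisher<T> implements Flow.Processor<T, T> {
    private final Predicate<? super T> predicate;
    private Flow.Subscription subscription;

    FilterProcessor(Predicate<? super T> predicate) {
        this.predicate = predicate;
    }

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }

    @Override public void onNext(T item) {
        if (predicate.test(item)) {
            submit(item);            // forward only the items that pass the test
        }
        subscription.request(1);     // ask upstream for the next item
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
}

// Same shape as the transform processor in the question, with a typed (non-raw) field.
class TransformProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> function;
    private Flow.Subscription subscription;

    TransformProcessor(Function<? super T, ? extends R> function) {
        this.function = function;
    }

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }

    @Override public void onNext(T item) {
        submit(function.apply(item)); // publish the converted value downstream
        subscription.request(1);
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
}

class FlowSketch {
    // Pushes the input through source -> filter -> transform and collects the results.
    static List<Integer> run(List<String> input) {
        List<Integer> out = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        FilterProcessor<String> filter = new FilterProcessor<>(s -> !s.equals("x"));
        TransformProcessor<String, Integer> transform =
            new TransformProcessor<>(s -> Integer.parseInt(s));

        // Wire the chain from the end backwards so nothing is published before it is connected.
        CompletableFuture<Void> done = transform.consume(out::add);
        filter.subscribe(transform);
        source.subscribe(filter);

        input.forEach(source::submit);
        source.close();                              // completion propagates down the chain
        done.orTimeout(5, TimeUnit.SECONDS).join();  // wait for the last stage to finish
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("1", "x", "2"))); // prints [1, 2]
    }
}
```

The chain is connected before anything is submitted because, as I understand it, a SubmissionPublisher only delivers an item to the subscribers that were registered when it was submitted.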