I found a solution, inspired by this method: https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L1340
In my case, I have to test a TumblingProcessingTimeWindows window where the process() operator uses a ProcessWindowFunction to count words while keeping the count from previous windows (i.e. not resetting the count each time the window is triggered).
WordCountPojo is a simple POJO with two fields: word and count (you can use a Tuple2 if you prefer).
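The POJO itself is not shown here, so the following is only a minimal sketch of what it might look like. The getters, setters and the equals/hashCode implementation are assumptions, not the original class. As far as I can tell, value-based equals() matters for the test because the expected and actual StreamRecords are compared for equality, and Flink treats a class as a POJO only if it is public and has a public no-argument constructor.

```java
import java.util.Objects;

// Hypothetical sketch of WordCountPojo; the original class is not shown in this answer.
public class WordCountPojo {
    private String word;
    private int count;

    // Flink's POJO handling requires a public no-argument constructor.
    public WordCountPojo() {}

    public WordCountPojo(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    // Value-based equality so that an expected record equals an emitted one.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof WordCountPojo)) return false;
        WordCountPojo other = (WordCountPojo) o;
        return count == other.count && Objects.equals(word, other.word);
    }

    @Override
    public int hashCode() { return Objects.hash(word, count); }

    @Override
    public String toString() {
        return "WordCountPojo{word='" + word + "', count=" + count + "}";
    }
}
```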
This is the test I wrote:
@Test
void testCounter() throws Exception {
    //create a WindowOperator<Key, Input, Accumulator, Output, Window>
    WindowOperator<String, WordCountPojo, Iterable<WordCountPojo>, WordCountPojo, TimeWindow> operator =
            new WindowOperator<>(
                    TumblingProcessingTimeWindows.of(Time.seconds(3)), //window assigner
                    new TimeWindow.Serializer(), //window serializer
                    WordCountPojo::getWord, //key selector
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), //key serializer
                    new ListStateDescriptor<>( //window state descriptor (in order to accumulate events inside the window)
                            "window-content",
                            TypeInformation.of(WordCountPojo.class).createSerializer(new ExecutionConfig())), //input serializer
                    new InternalIterableProcessWindowFunction<>(new Counter()), //my custom ProcessWindowFunction to invoke
                    ProcessingTimeTrigger.create(), //window trigger
                    0, //allowed lateness
                    null); //output tag for late data (not used)

    //Flink Test Harness
    OneInputStreamOperatorTestHarness<WordCountPojo, WordCountPojo> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(operator, WordCountPojo::getWord, BasicTypeInfo.STRING_TYPE_INFO);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    harness.open();
    harness.setProcessingTime(10);

    //Push data into window
    harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("or", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("not", 1)));
    harness.setProcessingTime(3500); //Set processing time in order to trigger the window

    //Expected result
    expected.add(new StreamRecord<>(new WordCountPojo("to", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("be", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("or", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("not", 1), 2999));
    TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(),
            Comparator.comparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getValue().getWord())
                    .thenComparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getTimestamp()));

    //push other WordCountPojos to test global counting
    harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));
    harness.setProcessingTime(7000); //trigger the window again

    //Expected result (added to the records of the first window, since harness.getOutput() keeps them too)
    expected.add(new StreamRecord<>(new WordCountPojo("to", 2), 5999));
    expected.add(new StreamRecord<>(new WordCountPojo("be", 2), 5999));
    TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(),
            Comparator.comparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getValue().getWord())
                    .thenComparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getTimestamp()));

    harness.close();
}
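The expected timestamps 2999 and 5999 in the test are the last millisecond of each 3-second window. The plain-Java sketch below reproduces that arithmetic for a tumbling window with zero offset and non-negative timestamps. WindowBounds and its method names are made up for illustration and are not Flink classes.

```java
// Hypothetical helper, not part of Flink: tumbling-window arithmetic
// for zero offset and non-negative timestamps (all values in milliseconds).
final class WindowBounds {

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Last millisecond that still belongs to that window (end is exclusive).
    static long maxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    public static void main(String[] args) {
        long size = 3000; // Time.seconds(3)
        // Elements pushed at processing time 10 land in [0, 3000).
        System.out.println(maxTimestamp(10, size));   // prints 2999
        // Elements pushed at processing time 3500 land in [3000, 6000).
        System.out.println(maxTimestamp(3500, size)); // prints 5999
    }
}
```

This is why setProcessingTime(3500) is enough to fire the first window, and setProcessingTime(7000) fires the second one.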
Attention points:
- The accumulator type of the WindowOperator is Iterable<WordCountPojo> and NOT simply WordCountPojo. This is because my Counter's process() method receives an Iterable and not a single WordCountPojo (remember that Counter extends ProcessWindowFunction).
- The WindowOperator's state descriptor parameter is a ListStateDescriptor, which means that the window collects the incoming WordCountPojos in a list. (The WindowOperatorTest example uses a ReducingStateDescriptor that reduces by sum, but I don't need that because the Counter function is exactly the function I want to test.)
- The WindowOperator's internal window function parameter is of type InternalIterableProcessWindowFunction. This function wraps my Counter function and is invoked when the window is triggered. Because the window accumulates an Iterable<WordCountPojo> collected through the ListStateDescriptor, that Iterable is passed as the input parameter of process() when the Counter function is invoked.
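Since the Counter function is not shown, here is a plain-Java stand-in for the counting behaviour the test expects: one call per key per fired window, with a total that survives across windows. RunningCounterSketch and the HashMap are assumptions for illustration only. In a real ProcessWindowFunction the map's role would be played by Flink keyed state (for example, state obtained from context.globalState()), and the return value would be emitted through the Collector.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the counting logic; it does not use Flink at all.
final class RunningCounterSketch {

    // Stand-in for per-key state that is NOT cleared when a window fires.
    private final Map<String, Integer> totals = new HashMap<>();

    // Mirrors the shape of process(key, context, elements, out):
    // receives every count buffered in the fired window for one key
    // and returns the running total for that key.
    int process(String key, Iterable<Integer> countsInWindow) {
        int total = totals.getOrDefault(key, 0);
        for (int count : countsInWindow) {
            total += count;
        }
        totals.put(key, total);
        return total;
    }

    public static void main(String[] args) {
        RunningCounterSketch counter = new RunningCounterSketch();
        // First window: "to" seen once.
        System.out.println(counter.process("to", Arrays.asList(1))); // prints 1
        // Second window: "to" seen once more, total keeps growing.
        System.out.println(counter.process("to", Arrays.asList(1))); // prints 2
    }
}
```

This matches the expectations in the test: ("to", 1) after the first window and ("to", 2) after the second.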