I know there is a similar topic here: Flink Unit Test over ProcessWindowFunction<IN, OUT, KEY, W>, but it does not solve my problem.
Here's my problem:
I'm trying to write a unit test for an Apache Flink pipeline whose data stream uses a window operator.
I have the following code:
inputStream
    .keyBy(Event::getSessionId)
    .window(SlidingProcessingTimeWindows.of(
        Time.milliseconds(2000),
        Time.milliseconds(200)
    ))
    .process(new ProcessWindowFunction<Event, Feature, Long, TimeWindow>() {
        @Override
        public void process(Long aLong, Context context, Iterable<Event> elements, Collector<Feature> out) {
            // process window of events here
            out.collect(Feature.newBuilder().build());
        }
    });
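To make the window configuration concrete, here is a minimal plain-Java sketch (no Flink dependency) of how I understand a sliding window with a size of 2000 ms and a slide of 200 ms to assign a single element to windows. The names SlidingWindowSketch and assignWindows are made up for illustration and are not Flink API; the sketch assumes a zero window offset and non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: illustrates sliding-window assignment only.
final class SlidingWindowSketch {

    /**
     * Returns every half-open window [start, end) that contains the timestamp,
     * newest window first. Each entry is a two-element array {start, end}.
     * Assumes timestamp >= 0, size > 0, slide > 0 and a window offset of zero.
     */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Same size and slide as in the pipeline above; the timestamp is arbitrary.
        for (long[] w : assignWindows(1050L, 2000L, 200L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With these settings each element falls into 2000 / 200 = 10 overlapping windows, and the earliest of those windows ends at most 200 ms after the element's timestamp.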
I'm testing it like this:
@Test
public void shouldRunPipeline() throws Exception {
    long sessionId = 1;
    long userId = 1;

    List<Event> events = new ArrayList<>();
    events.add(createAccEvent(sessionId, userId, createTsNow(), 1f, 2f, 3f));
    events.add(createAccEvent(sessionId, userId, createTsNow(), 4f, 5f, 6f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 7f, 8f, 9f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 10f, 11f, 12f));
    events.add(createTouchDownEvent(sessionId, userId, createTsNow()));
    events.add(createAccEvent(sessionId, userId, createTsNow(), 1f, 2f, 3f));
    events.add(createAccEvent(sessionId, userId, createTsNow(), 4f, 5f, 6f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 7f, 8f, 9f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 10f, 11f, 12f));
    events.add(createTouchUpEvent(sessionId, userId, createTsNow()));
    events.add(createAccEvent(sessionId, userId, createTsNow(), 1f, 2f, 3f));
    events.add(createAccEvent(sessionId, userId, createTsNow(), 4f, 5f, 6f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 7f, 8f, 9f));
    events.add(createGyroEvent(sessionId, userId, createTsNow(), 10f, 11f, 12f));

    CollectSink.values.clear();

    DataStream<Event> source = getTestEnv()
        .fromCollection(events)
        .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

    getPipeline()
        .process(source)
        .addSink(new CollectSink());

    getTestEnv().execute();

    List<Feature> features = CollectSink.values;
    assertThat(features).isNotEmpty();
}

private long createTsNow() {
    return Instant.now().toEpochMilli();
}

private static class CollectSink implements SinkFunction<Feature> {
    public static final List<Feature> values = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(Feature value, Context context) {
        values.add(value);
    }
}
The getPipeline() method creates my pipeline, getTestEnv() returns the test environment from the base test class, and the createXEvent() methods are my helpers for creating events for the test input stream.
I used a sink in the test because I read somewhere that DataStreamUtils.collect() may not work with a window operator, since data is accumulated differently in that case.
When I run this test in the debugger, the ProcessWindowFunction is never reached!
Once I remove the window operator, the process function (this time a KeyedProcessFunction) is reached, as shown in the snippet below.
inputStream
    .keyBy(Event::getSessionId)
    .process(new KeyedProcessFunction<Long, Event, Feature>() {
        @Override
        public void processElement(Event value, Context ctx, Collector<Feature> out) {
            // process events here
            out.collect(Feature.newBuilder().build());
        }
    });
My question is: how do I write a correct unit test for a stream that uses a window operator, so that the ProcessWindowFunction instance passed to process() is actually reached?
I could not find a working solution to this problem in the Apache Flink documentation or elsewhere on the web.
I'm able to unit test the logic of the process function separately, but I'd like to unit test the whole pipeline too.
I would appreciate any help.
Regards,
Piotr