I'm trying to use Apache Beam (via Scio) to run a continuous aggregation over the last 3 days of data (processing time) from a streaming source, and to output results from the earliest active window every 5 minutes. "Earliest" means the window with the earliest start time; "active" means that the end of the window hasn't yet passed. Essentially, I'm trying to get a 'rolling' aggregation by dropping the non-overlapping period between sliding windows.
A visualization of what I'm trying to accomplish with an example sliding window of size 3 days and period 1 day:
early firing - ^    no firing - x
                               |
                               ** stop firing from this window once time passes this point
          ^  ^  ^  ^  ^  ^  ^  ^
          |  |  |  |  |  |  |  |      ** stop firing from this window once time passes this point
w1:       +====================+^  ^  ^
                  x x x x x x x |  |  |
w2:              +====================+^  ^  ^
                         x x x x x x x |  |  |
w3:                     +====================+
time: ----d1-----d2-----d3-----d4-----d5-----d6-----d7---->
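To pin down what "earliest active window" means, here is a minimal sketch in plain Scala (no Beam involved). It assumes sliding windows are half-open intervals [start, end) whose starts are aligned to multiples of the period counted from the epoch, and that period <= size. The names `EarliestActiveWindow`, `Window` and `earliestActive` are made up for illustration and are not part of Beam or Scio.

```scala
object EarliestActiveWindow {
  // Half-open interval [startMillis, endMillis), in epoch milliseconds.
  final case class Window(startMillis: Long, endMillis: Long)

  // Assumption: window starts fall on multiples of periodMillis.
  def earliestActive(nowMillis: Long, sizeMillis: Long, periodMillis: Long): Window = {
    require(periodMillis > 0 && sizeMillis >= periodMillis, "need 0 < period <= size")
    // A window is still active when start + size > now, i.e. start > now - size.
    // The earliest such start is the smallest multiple of the period that is
    // strictly greater than (now - size).
    val start =
      java.lang.Math.floorDiv(nowMillis - sizeMillis, periodMillis) * periodMillis + periodMillis
    Window(start, start + sizeMillis)
  }
}
```

With size = 72 and period = 24 (hours instead of milliseconds, to keep the numbers small), `earliestActive(84, 72, 24)` gives the window [24, 96), and at time 96 the result moves on to [48, 120), which matches the hand-off from w1 to w2 in the diagram.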
I've tried using sliding windows (size = 3 days, period = 5 min), but they create a separate window for every 5-minute offset, including windows that end far in the future, and early results are emitted for every one of those windows. I also tried trigger = AfterWatermark.pastEndOfWindow(), but that gives no early results, and I need early results when the job first starts. I've also tried comparing the pane data (isLast, timestamp, etc.) between windows, but it looks identical for all of them.
My most recent attempt, which feels like somewhat of a hack, was to attach window information to each key in a DoFn, re-window into a fixed window, and then group and reduce to the oldest window using the attached data. However, the final reduceByKey doesn't seem to output anything.
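For a sense of scale, here is a rough sketch of how many sliding windows a single element lands in. It assumes each element is assigned to every sliding window that contains its timestamp, and that the size is an exact multiple of the period. `SlidingWindowFanout` and `windowsPerElement` are hypothetical names, not Beam or Scio API.

```scala
import java.time.Duration

object SlidingWindowFanout {
  // Number of sliding windows that contain any given timestamp,
  // assuming size is an exact multiple of period.
  def windowsPerElement(size: Duration, period: Duration): Long = {
    val sizeMillis = size.toMillis
    val periodMillis = period.toMillis
    require(periodMillis > 0 && sizeMillis % periodMillis == 0,
      "size must be a positive multiple of period")
    sizeMillis / periodMillis
  }
}
```

For size = 3 days and period = 5 minutes this comes to 864 windows per element, each of which can emit its own early panes, versus 3 windows for the 1-day period used in the diagram.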
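The intended effect of that last reduce step, shown on an in-memory collection rather than a pipeline, is roughly the following. This is only a sketch of the semantics I'm after: it uses java.time.Instant instead of Joda so that it runs without extra dependencies, and `OldestWindowReduce` and `oldestPerKey` are made-up names.

```scala
import java.time.Instant

object OldestWindowReduce {
  // Each row is (key, (aggregate, windowEnd)). For every key, keep the
  // aggregate that came from the window with the earliest end.
  def oldestPerKey[V](rows: Seq[(String, (V, Instant))]): Map[String, (V, Instant)] =
    rows.groupBy(_._1).map { case (key, group) =>
      key -> group.map(_._2).reduce((x, y) => if (x._2.isBefore(y._2)) x else y)
    }
}
```

Given two rows for the same key with window ends 00:02 and 00:07, only the row from the window ending at 00:02 survives.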
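DoFn to attach window information:
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.windowing.BoundedWindow
import org.apache.beam.sdk.values.KV
import org.joda.time.Instant

// ValueType is just a case class I'm using for objects
type DoFnT = DoFn[KV[String, ValueType], KV[String, (ValueType, Instant)]]

class Test extends DoFnT {
  // Window.toString looks like the following:
  // [2020-05-16T23:57:00.000Z..2020-05-17T00:02:00.000Z)
  // Returns the second timestamp, i.e. the end of the window.
  def parseWindow(window: String): Instant = {
    Instant.parse(
      window
        .stripPrefix("[")
        .stripSuffix(")")
        .split("\\.\\.")(1))
  }

  // Emits each element unchanged, paired with the end of its window.
  @ProcessElement
  def process(
      context: DoFnT#ProcessContext,
      window: BoundedWindow): Unit = {
    context.output(
      KV.of(
        context.element().getKey,
        (context.element().getValue, parseWindow(window.toString))
      )
    )
  }
}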
sc
  .pubsubSubscription(...)
  .keyBy(_.key)
  .withSlidingWindows(
    size = Duration.standardDays(3),
    period = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      allowedLateness = Duration.ZERO,
      trigger = Repeatedly.forever(
        AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(
            AfterProcessingTime
              .pastFirstElementInPane()
              .plusDelayOf(Duration.standardMinutes(1))))))
  .reduceByKey(ValueType.combineFunction())
  .applyPerKeyDoFn(new Test())
  .withFixedWindows(
    duration = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      trigger = AfterWatermark.pastEndOfWindow(),
      allowedLateness = Duration.ZERO))
  .reduceByKey((x, y) => if (x._2.isBefore(y._2)) x else y)
  .saveAsCustomOutput(
    TextIO.write()...
  )
Any suggestions?