Watermark not firing to push data


Hi,
Observations on Watermarks:
Read this great article:
https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy

* A watermark tells the operator, for a given event timestamp, when to stop waiting for earlier events to arrive.
* Watermark t means all events with timestamp < t have already arrived.
* Data for time t is pushed out when a watermark with timestamp >= t arrives.
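
To pin down my reading of these three points, here is a minimal plain-Java sketch (not Flink code). The class and method names are made up for illustration, and it encodes only the rule as I stated it above:

```java
// Hypothetical helper, not part of Flink: encodes the watermark rule as stated above.
final class WatermarkRule {

    // Watermark t asserts that all events with timestamp < t have already arrived,
    // so an event with a smaller timestamp that shows up afterwards is late.
    static boolean isLate(long eventTimestamp, long watermark) {
        return eventTimestamp < watermark;
    }

    // Data for time t can be pushed out once a watermark with timestamp >= t arrives.
    static boolean canEmit(long t, long watermark) {
        return watermark >= t;
    }

    public static void main(String[] args) {
        System.out.println(canEmit(5000L, 4999L)); // watermark still behind t
        System.out.println(canEmit(5000L, 5000L)); // watermark has reached t
    }
}
```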

Using only an incrementing current time for the watermark seems to work correctly, but I am not sure whether it lines up correctly with EventTime processing.
Using the incoming record's intervalStart as the watermark source for EventTime causes data not to be pushed at all when I have just 5 records in the source.

My source generates intervalStart values that increment at a regular interval.
I tried using intervalStart for my watermark with a bounded out-of-orderness of 3 secs.
The AggregateFunction I am using calls add() fine but never calls getResult().
My assumption was that the AggregateFunction would push the data to getResult()
once the watermark derived from intervalStart moved beyond the previous watermark t.
But it doesn't. Is it because I have a limited number of input records, and once intervalStart reaches the end
of the input records too quickly, the watermark stops advancing and hence the data is never pushed?

With System.currentTimeMillis(), the timestamp happily keeps increasing and hence the data gets pushed.
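
To make my suspicion concrete, here is a small plain-Java simulation (not Flink code, and not necessarily how Flink implements it). It assumes tumbling windows that fire once the watermark reaches the last timestamp in the window, and a watermark of "largest timestamp seen minus bound". The 2-second window size, the 1-second spacing of the records, and all names are assumptions for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: which tumbling windows fire when the watermark
// only advances with incoming event timestamps.
final class WatermarkStallDemo {

    // Returns the start times (ms) of the windows that fire while the events are read.
    static List<Long> firedWindowStarts(long[] eventTimestamps, long windowSizeMs, long boundMs) {
        List<Long> fired = new ArrayList<>();
        List<Long> open = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (long ts : eventTimestamps) {
            // Assign the event to its tumbling window.
            long start = ts - Math.floorMod(ts, windowSizeMs);
            if (!open.contains(start) && !fired.contains(start)) {
                open.add(start);
            }
            // The watermark trails the largest timestamp seen so far by the bound.
            watermark = Math.max(watermark, ts - boundMs);

            // Fire every open window whose last timestamp is covered by the watermark.
            List<Long> stillOpen = new ArrayList<>();
            for (long s : open) {
                if (s + windowSizeMs - 1 <= watermark) {
                    fired.add(s);
                } else {
                    stillOpen.add(s);
                }
            }
            open = stillOpen;
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] fiveRecords = {0, 1000, 2000, 3000, 4000};
        long[] eightRecords = {0, 1000, 2000, 3000, 4000, 5000, 6000, 7000};
        System.out.println(firedWindowStarts(fiveRecords, 2000, 3000));  // prints []
        System.out.println(firedWindowStarts(eightRecords, 2000, 3000)); // prints [0, 2000]
    }
}
```

With only five records the watermark ends at 1000, which is below the end of even the first window, so nothing fires in this model. With eight records the first two windows fire, but the last ones still stay open because no later event arrives to move the watermark forward.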

Created this class:
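import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MonitoringAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
    private final long bound;//out of order bound in millisecs, e.g. 3 * 1000 for 3 secs

    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        //the watermark trails the extracted timestamp by the out of order bound
        long nextWatermark = extractedTimestamp - bound;
        //simply emit a Watermark with every event
        return new Watermark(nextWatermark);
    }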

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        /*LocalDateTime intervalStart = Utils.getLocalDateTime(monitoring.getIntervalStart());//2012-07-12 02:21:06.057
        long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);//using this stopped pushing recs after a certain time
        return extractedTS;*/
        return System.currentTimeMillis();//incrementing current time

    }