Re: Watermark not firing to push data


Hi,
After looking at the code in EventTimeTrigger, I changed the watermark to System.currentTimeMillis() + bound (5 secs) so that the window's maxTimestamp was <= the watermark. With that change I was able to consume from Kinesis when I had only 50 records.

For a TumblingWindow of 5 secs, the window's maxTimestamp was usually around currentTime + 5 secs.
So, I set the watermark to System.currentTimeMillis() + 5 secs.
This way, the trigger fired and execution reached AggregateFunction.getResult().

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // <== This check had to be met
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}
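
To make the check above concrete, here is a minimal, Flink-free sketch of how a tumbling window's maxTimestamp compares with a watermark. The class and method names are hypothetical and not from the thread. The window-start arithmetic assumes non-negative timestamps and a zero window offset, which is how tumbling windows are aligned by default as far as I understand it.

```java
// Hypothetical illustration only: plain arithmetic, no Flink classes involved.
final class WindowFiringSketch {

    /** Start of the tumbling window containing the timestamp (non-negative timestamps, zero offset assumed). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Largest timestamp that still belongs to the window, i.e. window end minus one millisecond. */
    static long maxTimestamp(long timestampMillis, long windowSizeMillis) {
        return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis - 1;
    }

    /** Same comparison as the check in onElement: fire once the watermark has reached maxTimestamp. */
    static boolean wouldFire(long windowMaxTimestamp, long watermark) {
        return windowMaxTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long size = 5_000L;                      // 5-second tumbling window
        long eventTs = 12_000L;                  // event falls into [10_000, 15_000)
        long maxTs = maxTimestamp(eventTs, size);

        System.out.println(maxTs);                                // 14999
        System.out.println(wouldFire(maxTs, eventTs - 3_000L));   // false: watermark trails by the 3 s bound
        System.out.println(wouldFire(maxTs, eventTs + 5_000L));   // true: watermark pushed ahead of the event
    }
}
```

With the watermark set ahead of the event timestamp by the window size, the comparison holds for every element, which would explain why the trigger fired after the change described above.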

On Mon, Dec 17, 2018 at 10:00 AM Vijay Balakrishnan <bvijaykr@xxxxxxxxx> wrote:
Hi,
Thanks for your reply and the pointers on currentLowWatermark. It looks like the Flink UI has a Watermarks tab for each operator.

I dump 5 records into the Kinesis Data Stream and try to read the same records back with the FlinkKinesisConsumer, but I am not able to.
I use the same monitoring.getIntervalStart() for watermark generation (intervalStart - bound) in the MonitoringAssigner class that I used to generate the data on the producer side. On the producer side, intervalStart increments by 3-10 millisecs with each record. The watermark is generated as intervalStart - bound (3 secs), so every watermark is greater than the previous one. So why does it not push data out? Execution gets into the MGroupingWindowAggregate.add(..) method but never into the MGroupingWindowAggregate.getResult(..) method. It works when I produce 1000 records or so into the Kinesis data stream.

Here is a gist of my code- 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//FlinkConsumer
Properties kinesisConsumerConfig = new Properties();
        ......
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");//2000
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
                kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig);
final DataStreamSource<Monitoring> monitoringDataStreamSource = env.addSource(kinesisConsumer);
DataStream<Monitoring> kinesisStream = monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringAssigner(3000));//code at bottom

org.apache.flink.streaming.api.windowing.time.Time timeWindow = Time.seconds(5);
final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
                kinesisStream.timeWindow(timeWindow);
DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = windowStream.aggregate(
                new MGroupingWindowAggregate(....),//AggregateFunction impl
                new MGroupingAggregateWindowProcessing(...));

public class MonitoringAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
    private long bound = 3 * 1000;//3 secs out of order bound in millisecs
    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }
    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        long nextWatermark = extractedTimestamp - bound;
        return new Watermark(nextWatermark);
    }
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        LocalDateTime intervalStart = Utils.getLocalDateTime(monitoring.getIntervalStart());//2012-07-12 02:21:06.057
        long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
        return extractedTS;
        //return System.currentTimeMillis(); //this works fine.
    }
}
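
A rough way to see why 5 records might not be enough while 1000 are: the sketch below computes the highest watermark an assigner of the form timestamp - bound would reach after a burst of evenly spaced records, and compares it with the end of the first 5-second window. Everything here is a hypothetical illustration, not code from the job. It assumes a fixed 10 ms spacing (the upper end of the 3-10 ms range mentioned above), a first timestamp aligned to a window start, and that no further records arrive afterwards.

```java
// Hypothetical illustration only: plain arithmetic, no Flink classes involved.
final class SparseInputSketch {

    /** Watermark after the last of `count` records spaced `stepMillis` apart, using timestamp - bound. */
    static long finalWatermark(long firstTs, int count, long stepMillis, long boundMillis) {
        long lastTs = firstTs + (count - 1) * stepMillis;
        return lastTs - boundMillis;
    }

    /** maxTimestamp of the tumbling window that contains the first record (zero offset assumed). */
    static long firstWindowMaxTimestamp(long firstTs, long windowSizeMillis) {
        long start = firstTs - (firstTs % windowSizeMillis);
        return start + windowSizeMillis - 1;
    }

    /** True if the final watermark reaches the end of the first window, so that window could fire. */
    static boolean firstWindowFires(long firstTs, int count, long stepMillis,
                                    long boundMillis, long windowSizeMillis) {
        return finalWatermark(firstTs, count, stepMillis, boundMillis)
                >= firstWindowMaxTimestamp(firstTs, windowSizeMillis);
    }

    public static void main(String[] args) {
        // 5 records span only 40 ms of event time: the watermark ends at 7_040, far short of 14_999.
        System.out.println(firstWindowFires(10_000L, 5, 10L, 3_000L, 5_000L));     // false
        // 1000 records span 9_990 ms: the watermark ends at 16_990, past 14_999.
        System.out.println(firstWindowFires(10_000L, 1000, 10L, 3_000L, 5_000L));  // true
    }
}
```

Under these assumptions the event-time span of the input has to exceed the remaining window length plus the bound before any result is emitted, and the last open window still never closes unless later records advance the watermark.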

TIA,
Vijay

On Sat, Dec 15, 2018 at 5:42 AM Hequn Cheng <chenghequn@xxxxxxxxx> wrote:
Hi Vijay,

Could you provide more information about your problem? For example
- Which kind of window do you use?
- What's the window size?
- A relatively complete code is better :-)

As for the problem, the event time has probably not reached the end of the window. You can monitor the watermark in the web dashboard[1].
Also, switching from event time to processing time is another way to verify whether it is a watermark problem.

Best, Hequn



On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan <bvijaykr@xxxxxxxxx> wrote:
Hi,
Observations on Watermarks:
Read this great article:

* A watermark tells the operator, for a given event timestamp, when to stop waiting for earlier events to arrive.
* Watermark t means all events with timestamp < t have already arrived.
* When to push data out: when a watermark with TS >= t arrives
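
The three points above can be sketched as a small tracker that derives a watermark from event timestamps with a fixed out-of-order bound and never lets it move backwards. This is a hypothetical, Flink-free illustration; the class and method names are made up for this example and are not Flink APIs.

```java
// Hypothetical illustration only: a watermark that trails the highest event timestamp by a fixed bound.
final class WatermarkTrackerSketch {
    private final long boundMillis;
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkTrackerSketch(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    /** Feeds one event timestamp and returns the watermark afterwards; the watermark never decreases. */
    long onEvent(long eventTimestampMillis) {
        long candidate = eventTimestampMillis - boundMillis;
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
        }
        return currentWatermark;
    }

    /** True once the watermark has reached t, i.e. results up to t may be pushed out. */
    boolean hasReached(long t) {
        return currentWatermark >= t;
    }

    public static void main(String[] args) {
        WatermarkTrackerSketch tracker = new WatermarkTrackerSketch(3_000L);
        System.out.println(tracker.onEvent(10_000L));    // 7000
        System.out.println(tracker.onEvent(9_000L));     // 7000: an out-of-order event does not move it back
        System.out.println(tracker.hasReached(14_999L)); // false: no event has advanced time far enough
        System.out.println(tracker.onEvent(18_000L));    // 15000
        System.out.println(tracker.hasReached(14_999L)); // true
    }
}
```

Note that the watermark only advances when a new event arrives, so it stalls as soon as the input stops.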

Using only an incrementing current time for the watermark seems to work correctly, but I am not sure it lines up correctly with EventTime processing.
Using the incoming record's intervalStart as the watermark source for EventTime causes data not to be pushed at all when I have just 5 records in the source.

My source generates intervalStart so that it increments at a regular interval.
I tried using intervalStart for my watermark with an out-of-order bound of 3 secs.
The AggregateFunction I am using calls add() fine but never calls getResult().
My assumption was that the AggregateFunction would push the data to getResult
once the watermark based on intervalStart moved beyond the previous watermark t.
But it doesn't. Is it because I have a limited number of input records, and once intervalStart reaches the end
of the input records too quickly, the watermark stops advancing and hence no data is pushed?

With System.currentTimeMillis, it happily keeps increasing and hence pushes the data.

Created this class:
public class MonitoringAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
    private long bound = 3 * 1000;//3 secs out of order bound in millisecs

    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }
    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        long nextWatermark = extractedTimestamp - bound;
        //simply emit a Watermark with every event
        return new Watermark(nextWatermark);
    }

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        /*LocalDateTime intervalStart = Utils.getLocalDateTime(monitoring.getIntervalStart());//2012-07-12 02:21:06.057
        long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);//using this stopped pushing recs after a certain time
        return extractedTS;*/
        return System.currentTimeMillis();//incrementing current time

    }