buffer pool is destroyed

Hi Flink community,


I have a custom source that emits an user-defined data type, BaseEvent.  The following code works fine when BaseEvent is not POJO.

But, when I changed it to POJO by adding a default constructor, I’m getting “Buffer Pool is destroyed” runtime exception on the Collect method.


            DataStream<BaseEvent> eventStream = see.addSource(new AgoraSource(configFile, instance));


            DataStream<Tuple4<String, Long, Double, String>> result_order = eventStream

                    .filter(e -> e instanceof OrderEvent)

                    .map(e -> (OrderEvent)e)

                    .map(e -> new Tuple3<>(e.SecurityID, Long.valueOf(1), Double.valueOf(e.OriginalQuantity))).returns(info_tuple3)

                    .keyBy(e -> e.f0)


                    .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2))

                    .map(e -> new Tuple4<>(e.f0, e.f1, e.f2, "Order")).returns(info_tuple4);


Any idea?




