Flink window computation: why does the last window never close and fire?

KerryLi, posted on 06/30 16:25

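When running real-time computation with Flink time windows, the last window never closes and fires if no further data arrives.

The windows use event time (the create_time field of the source data). For easier reading, create_time1 is the same timestamp in formatted form.

Job setup: a sliding time window with a window size of 10 s and a slide of 5 s, keyed by app_id; a window emits output when its count > 5.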
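To make the window layout concrete, here is a minimal plain-Java sketch (no Flink dependency) of how one timestamp maps to sliding windows of size 10 s and slide 5 s. It assumes windows are aligned to the epoch with offset 0 and that timestamps are non-negative. The class and method names are made up for illustration; the timestamp is the create_time of the first test record.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    /** Start times (ms) of all windows [start, start + size) that contain the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide (offset 0 assumed).
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // create_time of the first test record (formatted as 2022-06-30 16:10:48 in create_time1)
        System.out.println(windowStarts(1656576648431L, 10_000L, 5_000L));
        // prints [1656576645000, 1656576640000], i.e. the windows starting at 16:10:45 and 16:10:40
    }
}
```

With these settings every record belongs to two windows, which is consistent with the job log: the first record is counted in both the 16:10:40 window and the 16:10:45 window.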

Test data sent to Kafka:

Record 1: {"create_time1":"2022-06-30 16:10:48","create_time":1656576648431,"id":"46fb2b26-7fff-47b9-80de-d0cd7b713867_0","app_id":"000"}
Record 2: {"create_time1":"2022-06-30 16:10:50","create_time":1656576650328,"id":"04c813d8-d940-4235-a568-574246f5ca37_1","app_id":"000"}
Record 3: {"create_time1":"2022-06-30 16:10:51","create_time":1656576651369,"id":"31119c94-b707-4cf3-ba13-520f44c23968_2","app_id":"000"}
Record 4: {"create_time1":"2022-06-30 16:10:52","create_time":1656576652381,"id":"727a8172-579c-49b0-819e-dfbfc6fad988_3","app_id":"000"}
Record 5: {"create_time1":"2022-06-30 16:10:53","create_time":1656576653415,"id":"52bf00eb-501d-4a04-a55d-4e9ed3c96738_4","app_id":"000"}
Record 6: {"create_time1":"2022-06-30 16:10:54","create_time":1656576654422,"id":"4cfacdd7-7ff4-4a4b-b0d9-2bd1b5541162_5","app_id":"000"}
Record 7: {"create_time1":"2022-06-30 16:10:55","create_time":1656576655441,"id":"f0937607-9c59-4ca4-9f43-b624043ba39c_6","app_id":"000"}
Record 8: {"create_time1":"2022-06-30 16:10:56","create_time":1656576656464,"id":"32e106e1-71e0-4c3d-b433-eccd8ed2dce7_7","app_id":"000"}
Record 9: {"create_time1":"2022-06-30 16:10:57","create_time":1656576657488,"id":"1ea5c390-f4f5-4568-af3a-fe9dcb2342cb_8","app_id":"000"}
Record 10: {"create_time1":"2022-06-30 16:10:58","create_time":1656576658507,"id":"24d1db16-25a7-4ffe-9d20-2ff88d0ec462_9","app_id":"000"}
Record 11: {"create_time1":"2022-06-30 16:10:59","create_time":1656576659516,"id":"22e5a704-2d25-4246-8b26-d789f353445b_10","app_id":"000"}
Record 12: {"create_time1":"2022-06-30 16:11:00","create_time":1656576660537,"id":"76b69295-69be-453e-bfed-80638b61cf9a_11","app_id":"000"}
Record 13: {"create_time1":"2022-06-30 16:11:01","create_time":1656576661557,"id":"28947a33-7adc-416d-9ca8-5a1bb2d06531_12","app_id":"000"}
Record 14: {"create_time1":"2022-06-30 16:11:02","create_time":1656576662574,"id":"91663273-4611-46a0-a7ad-96d14294d58d_13","app_id":"000"}
Record 15: {"create_time1":"2022-06-30 16:11:03","create_time":1656576663594,"id":"d8e1ec82-9d82-4e20-9e22-d247d7537e2f_14","app_id":"000"}
Record 16: {"create_time1":"2022-06-30 16:11:04","create_time":1656576664614,"id":"e6abce11-4464-44d5-b8ef-844fe7079543_15","app_id":"000"}
Record 17: {"create_time1":"2022-06-30 16:11:05","create_time":1656576665622,"id":"0da49e6c-b3cc-414b-8d87-6abb5559885b_16","app_id":"000"}
Record 18: {"create_time1":"2022-06-30 16:11:06","create_time":1656576666643,"id":"491b9e57-1b86-4138-b969-dab753162cb8_17","app_id":"000"}
Record 19: {"create_time1":"2022-06-30 16:11:07","create_time":1656576667663,"id":"f69752bc-b37d-4f75-be1c-1c30fae18a2f_18","app_id":"000"}
Record 20: {"create_time1":"2022-06-30 16:11:08","create_time":1656576668675,"id":"80a0a203-86d8-4dc3-bb5a-acd112eb0534_19","app_id":"000"}

 

Flink job log:

Window computed at: 2022-06-30 16:11:00, timeWindow:, window start: 2022-06-30 16:10:40, window end: 2022-06-30 16:10:50, window data size: 1
Window computed at: 2022-06-30 16:11:05, timeWindow:, window start: 2022-06-30 16:10:45, window end: 2022-06-30 16:10:55, window data size: 6
Window alert raised at: 2022-06-30 16:11:05, window data, size: 6, list: [{"create_time1":"2022-06-30 16:10:48","as":"A","create_time":1656576648431,"event_type_value":"single","id":"46fb2b26-7fff-47b9-80de-d0cd7b713867_0","app_id":"000"}, {"create_time1":"2022-06-30 16:10:50","as":"A","create_time":1656576650328,"event_type_value":"single","id":"04c813d8-d940-4235-a568-574246f5ca37_1","app_id":"000"}, {"create_time1":"2022-06-30 16:10:51","as":"A","create_time":1656576651369,"event_type_value":"single","id":"31119c94-b707-4cf3-ba13-520f44c23968_2","app_id":"000"}, {"create_time1":"2022-06-30 16:10:52","as":"A","create_time":1656576652381,"event_type_value":"single","id":"727a8172-579c-49b0-819e-dfbfc6fad988_3","app_id":"000"}, {"create_time1":"2022-06-30 16:10:53","as":"A","create_time":1656576653415,"event_type_value":"single","id":"52bf00eb-501d-4a04-a55d-4e9ed3c96738_4","app_id":"000"}, {"create_time1":"2022-06-30 16:10:54","as":"A","create_time":1656576654422,"event_type_value":"single","id":"4cfacdd7-7ff4-4a4b-b0d9-2bd1b5541162_5","app_id":"000"}]
Window computed at: 2022-06-30 16:11:10, timeWindow:, window start: 2022-06-30 16:10:50, window end: 2022-06-30 16:11:00, window data size: 10
Window alert raised at: 2022-06-30 16:11:10, window data, size: 10, list: [{"create_time1":"2022-06-30 16:10:50","as":"A","create_time":1656576650328,"event_type_value":"single","id":"04c813d8-d940-4235-a568-574246f5ca37_1","app_id":"000"}, {"create_time1":"2022-06-30 16:10:51","as":"A","create_time":1656576651369,"event_type_value":"single","id":"31119c94-b707-4cf3-ba13-520f44c23968_2","app_id":"000"}, {"create_time1":"2022-06-30 16:10:52","as":"A","create_time":1656576652381,"event_type_value":"single","id":"727a8172-579c-49b0-819e-dfbfc6fad988_3","app_id":"000"}, {"create_time1":"2022-06-30 16:10:53","as":"A","create_time":1656576653415,"event_type_value":"single","id":"52bf00eb-501d-4a04-a55d-4e9ed3c96738_4","app_id":"000"}, {"create_time1":"2022-06-30 16:10:54","as":"A","create_time":1656576654422,"event_type_value":"single","id":"4cfacdd7-7ff4-4a4b-b0d9-2bd1b5541162_5","app_id":"000"}, {"create_time1":"2022-06-30 16:10:55","as":"A","create_time":1656576655441,"event_type_value":"single","id":"f0937607-9c59-4ca4-9f43-b624043ba39c_6","app_id":"000"}, {"create_time1":"2022-06-30 16:10:56","as":"A","create_time":1656576656464,"event_type_value":"single","id":"32e106e1-71e0-4c3d-b433-eccd8ed2dce7_7","app_id":"000"}, {"create_time1":"2022-06-30 16:10:57","as":"A","create_time":1656576657488,"event_type_value":"single","id":"1ea5c390-f4f5-4568-af3a-fe9dcb2342cb_8","app_id":"000"}, {"create_time1":"2022-06-30 16:10:58","as":"A","create_time":1656576658507,"event_type_value":"single","id":"24d1db16-25a7-4ffe-9d20-2ff88d0ec462_9","app_id":"000"}, {"create_time1":"2022-06-30 16:10:59","as":"A","create_time":1656576659516,"event_type_value":"single","id":"22e5a704-2d25-4246-8b26-d789f353445b_10","app_id":"000"}]
Window computed at: 2022-06-30 16:11:15, timeWindow:, window start: 2022-06-30 16:10:55, window end: 2022-06-30 16:11:05, window data size: 10
Window alert raised at: 2022-06-30 16:11:15, window data, size: 10, list: [{"create_time1":"2022-06-30 16:10:55","as":"A","create_time":1656576655441,"event_type_value":"single","id":"f0937607-9c59-4ca4-9f43-b624043ba39c_6","app_id":"000"}, {"create_time1":"2022-06-30 16:10:56","as":"A","create_time":1656576656464,"event_type_value":"single","id":"32e106e1-71e0-4c3d-b433-eccd8ed2dce7_7","app_id":"000"}, {"create_time1":"2022-06-30 16:10:57","as":"A","create_time":1656576657488,"event_type_value":"single","id":"1ea5c390-f4f5-4568-af3a-fe9dcb2342cb_8","app_id":"000"}, {"create_time1":"2022-06-30 16:10:58","as":"A","create_time":1656576658507,"event_type_value":"single","id":"24d1db16-25a7-4ffe-9d20-2ff88d0ec462_9","app_id":"000"}, {"create_time1":"2022-06-30 16:10:59","as":"A","create_time":1656576659516,"event_type_value":"single","id":"22e5a704-2d25-4246-8b26-d789f353445b_10","app_id":"000"}, {"create_time1":"2022-06-30 16:11:00","as":"A","create_time":1656576660537,"event_type_value":"single","id":"76b69295-69be-453e-bfed-80638b61cf9a_11","app_id":"000"}, {"create_time1":"2022-06-30 16:11:01","as":"A","create_time":1656576661557,"event_type_value":"single","id":"28947a33-7adc-416d-9ca8-5a1bb2d06531_12","app_id":"000"}, {"create_time1":"2022-06-30 16:11:02","as":"A","create_time":1656576662574,"event_type_value":"single","id":"91663273-4611-46a0-a7ad-96d14294d58d_13","app_id":"000"}, {"create_time1":"2022-06-30 16:11:03","as":"A","create_time":1656576663594,"event_type_value":"single","id":"d8e1ec82-9d82-4e20-9e22-d247d7537e2f_14","app_id":"000"}, {"create_time1":"2022-06-30 16:11:04","as":"A","create_time":1656576664614,"event_type_value":"single","id":"e6abce11-4464-44d5-b8ef-844fe7079543_15","app_id":"000"}]

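Looking at the window computations, nothing fires after the window with start 2022-06-30 16:10:55 and end 2022-06-30 16:11:05.

According to the source data that was sent, however, there should be two more windows:

window start: 2022-06-30 16:11:00, window end: 2022-06-30 16:11:10
window start: 2022-06-30 16:11:05, window end: 2022-06-30 16:11:15

No further data arrives, so why do these two windows not close and fire?

Below is the code that generates the watermark and extracts the event time. If the watermark is computed as System.currentTimeMillis() - maxOutOfOrderness instead, the windows do close and fire, but if the source data is fairly old, will the windows then never close and fire?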
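The stall can be checked with a little arithmetic. Below is a minimal plain-Java sketch (no Flink dependency) of the firing condition, assuming Flink's default event-time trigger, which fires a window [start, end) once the watermark reaches end - 1. The watermark formula is the one used in getCurrentWatermark(); the class name, method names, and the maxOutOfOrderness value of 0 are assumptions chosen for illustration (0 is the most favorable case).

```java
class WindowFiringSketch {
    /** Watermark as derived from the largest event timestamp seen so far. */
    static long watermark(long currentMaxTimestamp, long maxOutOfOrderness) {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long lastEvent = 1656576668675L;  // create_time of record 20 (16:11:08)
        long maxOutOfOrderness = 0L;      // assumed value; larger values only delay firing further
        long wm = watermark(lastEvent, maxOutOfOrderness);

        System.out.println(fires(1656576665000L, wm)); // window end 16:11:05 -> true
        System.out.println(fires(1656576670000L, wm)); // window end 16:11:10 -> false
        System.out.println(fires(1656576675000L, wm)); // window end 16:11:15 -> false
    }
}
```

Because the watermark is derived only from the largest timestamp seen so far, it stops at the last record. A window whose end lies beyond that point can fire only after a later event arrives or the watermark is advanced in some other way.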

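import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.idss.domain.window.Window;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

// Note: `log` is assumed to be a logger field declared elsewhere; it is not shown in the post.
public class CustomTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {
    private static final long serialVersionUID = -22117297510152169L;
    private long maxOutOfOrderness;
    // Largest event timestamp seen so far
    private long currentMaxTimestamp = 0L;
    private String eventTimeFieldName;

    CustomTimestampsAndWatermarks(Window window) {
        this.maxOutOfOrderness = window.getMaxOutOfOrderness();
        this.eventTimeFieldName = window.getAttrField();
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // The watermark only moves when a new, larger event timestamp arrives.
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
        try {
            long timestamp = element.getLong(this.eventTimeFieldName);
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
            return timestamp;
        } catch (JSONException e) {
            log.error("", e);
        }
        // Fall back to the previously assigned timestamp if the field cannot be read.
        return previousElementTimestamp;
    }
}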

 

The question is: while still using event time, how can watermarks be emitted so that the last windows close and fire?

 

KerryLi

Nobody answered, but I am recording the solution here.

Approach: keep advancing the watermark even when no data arrives.

The code is as follows:

inputStream.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new EventWaterMarkInterval(window))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject jsonObject, long l) {
                        return jsonObject.getLong(window.getAttrField());
                    }
                }));

 

 

Custom watermark generator:

package com.idss.utils;

import com.alibaba.fastjson.JSONObject;
import com.idss.domain.window.Window;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/**
 * Periodic watermark generator. When no data arrives, the watermark still advances,
 * so that the last window is not left open without firing.
 *
 * @author lirui
 * @date 2022/07/01
 */
public class EventWaterMarkInterval implements WatermarkGeneratorSupplier<JSONObject> {
    private static final long serialVersionUID = -2338922000184097299L;
    // If no data arrives for this long, the watermark is advanced by the same amount.
    private static final long IDLE_TIMEOUT_MS = 3000L;

    private final String eventTimeFieldName;
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    // Wall-clock time at which the most recent record arrived
    private long currentDateTimeMillis;

    public EventWaterMarkInterval(Window window) {
        this.maxOutOfOrderness = window.getMaxOutOfOrderness();
        this.eventTimeFieldName = window.getAttrField();
    }

    @Override
    public WatermarkGenerator<JSONObject> createWatermarkGenerator(Context context) {
        return new WatermarkGenerator<JSONObject>() {

            @Override
            public void onEvent(JSONObject jsonObject, long l, WatermarkOutput watermarkOutput) {
                currentMaxTimestamp = Math.max(currentMaxTimestamp, jsonObject.getLong(eventTimeFieldName));

                // Record the wall-clock arrival time of this record
                currentDateTimeMillis = System.currentTimeMillis();
            }

            /**
             * Called periodically, at the interval configured with setAutoWatermarkInterval.
             */
            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                // Alternatives that were tried and rejected:
                // 1. new Watermark(currentMaxTimestamp - maxOutOfOrderness) alone:
                //    with no further data, the last window never closes and fires.
                // 2. new Watermark(System.currentTimeMillis() - maxOutOfOrderness):
                //    if the first record's event time is very old, the windows do not close and fire.

                // Before any data has arrived, the watermark is not advanced. With parallelism > 1 the
                // minimum watermark across subtasks is used, so the window still would not fire;
                // setting the source parallelism to 1 works around this.
                if (currentMaxTimestamp - maxOutOfOrderness <= 0) {
                    return;
                }

                // No data for 3 seconds: keep the watermark moving so the window can close and fire.
                if (System.currentTimeMillis() - currentDateTimeMillis >= IDLE_TIMEOUT_MS) {
                    // Advance the max timestamp itself, so the emitted watermark grows by
                    // IDLE_TIMEOUT_MS per idle period regardless of maxOutOfOrderness.
                    currentMaxTimestamp += IDLE_TIMEOUT_MS;

                    // Treat this as a simulated arrival and restart the idle timer
                    currentDateTimeMillis = System.currentTimeMillis();
                }
                watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
            }
        };
    }
}
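The idea of advancing the watermark while the stream is idle can be tried outside Flink. The following is a minimal plain-Java model, not the Flink class above: it is a simplified variant that adds the idle timeout to the largest timestamp on every idle period, and it takes the wall-clock time as a parameter so that the behavior is reproducible. The class name, method signatures, and the values in main (maxOutOfOrderness of 0, a 1 s emit interval) are assumptions for illustration.

```java
class IdleAdvancingWatermarkSketch {
    private final long maxOutOfOrderness;
    private final long idleTimeoutMillis;
    private long currentMaxTimestamp = Long.MIN_VALUE; // Long.MIN_VALUE means "no event seen yet"
    private long lastActivityMillis;

    IdleAdvancingWatermarkSketch(long maxOutOfOrderness, long idleTimeoutMillis) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Called for every event; nowMillis is the wall-clock time, injected for testability. */
    void onEvent(long eventTimestamp, long nowMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        lastActivityMillis = nowMillis;
    }

    /** Called periodically; returns the watermark to emit, or Long.MIN_VALUE before the first event. */
    long onPeriodicEmit(long nowMillis) {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        if (nowMillis - lastActivityMillis >= idleTimeoutMillis) {
            // Idle: pretend event time moved forward by the idle timeout and restart the timer.
            currentMaxTimestamp += idleTimeoutMillis;
            lastActivityMillis = nowMillis;
        }
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        IdleAdvancingWatermarkSketch gen = new IdleAdvancingWatermarkSketch(0L, 3000L);
        long now = 0L;
        gen.onEvent(1656576668675L, now);      // record 20, the last record sent
        long lastWindowEnd = 1656576675000L;   // window end 16:11:15
        long wm = gen.onPeriodicEmit(now);
        while (wm < lastWindowEnd - 1) {       // simulate a 1 s periodic emit interval
            now += 1000L;
            wm = gen.onPeriodicEmit(now);
        }
        System.out.println("last window can fire after " + now + " ms of idleness");
    }
}
```

In this model the watermark passes the end of the last window after three idle periods. The trade-off is that records arriving later with older timestamps are treated as late, so the idle timeout should be chosen with the expected gaps in the data in mind.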

 

不会飞的小龙人
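Looks fine, I handle it the same way: compare the event time of the data with the system time, and use setAutoWatermarkInterval(1000L) to refresh the watermark every second so that the latest watermark time is generated. Increase the watermark delay a little as appropriate; once a window's time is up it fires, without waiting to see whether more data arrives.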