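When using Flink time windows for real-time computation, if no further data arrives, the last window is never closed and computed.
Windows are computed on event time (create_time in the source data). For easier reading, create_time1 carries the same time formatted as a string.
Job setup: sliding time windows of size 10 s with a 5 s slide, keyed by app_id; a window's data is emitted when its count > 5.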
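With a 10 s size and a 5 s slide, every element falls into size/slide = 2 overlapping windows, and the count > 5 check runs on each of those windows separately. The sketch below reproduces the start-time arithmetic of Flink's sliding event-time assigner (assuming the default offset of 0) so the expected windows can be checked by hand. SlidingWindowAssignment is a hypothetical helper, not part of the job.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: which sliding windows [start, start + size) contain ts?
 * Mirrors the sliding event-time assigner's arithmetic with offset 0.
 */
class SlidingWindowAssignment {

    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before ts, aligned to a multiple of the slide.
        long lastStart = ts - Math.floorMod(ts, slide);
        // Walk back one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Message 1: create_time 1656576648431 (2022-06-30 16:10:48.431)
        System.out.println(windowStarts(1656576648431L, 10_000L, 5_000L));
        // [1656576645000, 1656576640000], i.e. [16:10:45, 16:10:55) and [16:10:40, 16:10:50)
    }
}
```

The two windows printed for message 1 match the first two windows in the job log further down.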
Test Kafka messages sent:
Message 1: {"create_time1":"2022-06-30 16:10:48","create_time":1656576648431,"id":"46fb2b26-7fff-47b9-80de-d0cd7b713867_0","app_id":"000"}
Message 2: {"create_time1":"2022-06-30 16:10:50","create_time":1656576650328,"id":"04c813d8-d940-4235-a568-574246f5ca37_1","app_id":"000"}
Message 3: {"create_time1":"2022-06-30 16:10:51","create_time":1656576651369,"id":"31119c94-b707-4cf3-ba13-520f44c23968_2","app_id":"000"}
Message 4: {"create_time1":"2022-06-30 16:10:52","create_time":1656576652381,"id":"727a8172-579c-49b0-819e-dfbfc6fad988_3","app_id":"000"}
Message 5: {"create_time1":"2022-06-30 16:10:53","create_time":1656576653415,"id":"52bf00eb-501d-4a04-a55d-4e9ed3c96738_4","app_id":"000"}
Message 6: {"create_time1":"2022-06-30 16:10:54","create_time":1656576654422,"id":"4cfacdd7-7ff4-4a4b-b0d9-2bd1b5541162_5","app_id":"000"}
Message 7: {"create_time1":"2022-06-30 16:10:55","create_time":1656576655441,"id":"f0937607-9c59-4ca4-9f43-b624043ba39c_6","app_id":"000"}
Message 8: {"create_time1":"2022-06-30 16:10:56","create_time":1656576656464,"id":"32e106e1-71e0-4c3d-b433-eccd8ed2dce7_7","app_id":"000"}
Message 9: {"create_time1":"2022-06-30 16:10:57","create_time":1656576657488,"id":"1ea5c390-f4f5-4568-af3a-fe9dcb2342cb_8","app_id":"000"}
Message 10: {"create_time1":"2022-06-30 16:10:58","create_time":1656576658507,"id":"24d1db16-25a7-4ffe-9d20-2ff88d0ec462_9","app_id":"000"}
Message 11: {"create_time1":"2022-06-30 16:10:59","create_time":1656576659516,"id":"22e5a704-2d25-4246-8b26-d789f353445b_10","app_id":"000"}
Message 12: {"create_time1":"2022-06-30 16:11:00","create_time":1656576660537,"id":"76b69295-69be-453e-bfed-80638b61cf9a_11","app_id":"000"}
Message 13: {"create_time1":"2022-06-30 16:11:01","create_time":1656576661557,"id":"28947a33-7adc-416d-9ca8-5a1bb2d06531_12","app_id":"000"}
Message 14: {"create_time1":"2022-06-30 16:11:02","create_time":1656576662574,"id":"91663273-4611-46a0-a7ad-96d14294d58d_13","app_id":"000"}
Message 15: {"create_time1":"2022-06-30 16:11:03","create_time":1656576663594,"id":"d8e1ec82-9d82-4e20-9e22-d247d7537e2f_14","app_id":"000"}
Message 16: {"create_time1":"2022-06-30 16:11:04","create_time":1656576664614,"id":"e6abce11-4464-44d5-b8ef-844fe7079543_15","app_id":"000"}
Message 17: {"create_time1":"2022-06-30 16:11:05","create_time":1656576665622,"id":"0da49e6c-b3cc-414b-8d87-6abb5559885b_16","app_id":"000"}
Message 18: {"create_time1":"2022-06-30 16:11:06","create_time":1656576666643,"id":"491b9e57-1b86-4138-b969-dab753162cb8_17","app_id":"000"}
Message 19: {"create_time1":"2022-06-30 16:11:07","create_time":1656576667663,"id":"f69752bc-b37d-4f75-be1c-1c30fae18a2f_18","app_id":"000"}
Message 20: {"create_time1":"2022-06-30 16:11:08","create_time":1656576668675,"id":"80a0a203-86d8-4dc3-bb5a-acd112eb0534_19","app_id":"000"}
Flink job log:
Window computed at:2022-06-30 16:11:00,timeWindow:,window start:2022-06-30 16:10:40,window end:2022-06-30 16:10:50,window size:1
Window computed at:2022-06-30 16:11:05,timeWindow:,window start:2022-06-30 16:10:45,window end:2022-06-30 16:10:55,window size:6
Window alert raised at:2022-06-30 16:11:05,window data,size:6,list:[{"create_time1":"2022-06-30 16:10:48","as":"A","create_time":1656576648431,"event_type_value":"single","id":"46fb2b26-7fff-47b9-80de-d0cd7b713867_0","app_id":"000"}, {"create_time1":"2022-06-30 16:10:50","as":"A","create_time":1656576650328,"event_type_value":"single","id":"04c813d8-d940-4235-a568-574246f5ca37_1","app_id":"000"}, {"create_time1":"2022-06-30 16:10:51","as":"A","create_time":1656576651369,"event_type_value":"single","id":"31119c94-b707-4cf3-ba13-520f44c23968_2","app_id":"000"}, {"create_time1":"2022-06-30 16:10:52","as":"A","create_time":1656576652381,"event_type_value":"single","id":"727a8172-579c-49b0-819e-dfbfc6fad988_3","app_id":"000"}, {"create_time1":"2022-06-30 16:10:53","as":"A","create_time":1656576653415,"event_type_value":"single","id":"52bf00eb-501d-4a04-a55d-4e9ed3c96738_4","app_id":"000"}, {"create_time1":"2022-06-30 16:10:54","as":"A","create_time":1656576654422,"event_type_value":"single","id":"4cfacdd7-7ff4-4a4b-b0d9-2bd1b5541162_5","app_id":"000"}]
Window computed at:2022-06-30 16:11:10,timeWindow:,window start:2022-06-30 16:10:50,window end:2022-06-30 16:11:00,window size:10
Window alert raised at:2022-06-30 16:11:10,window data,size:10,list:[{"create_time1":"2022-06-30 16:10:50","as":"A","create_time":1656576650328,"event_type_value":"single","id":"04c813d8-d940-4235-a568-574246f5ca37_1","app_id":"000"}, {"create_time1":"2022-06-30 16:10:51","as":"A","create_time":1656576651369,"event_type_value":"single","id":"31119c94-b707-4cf3-ba13-520f44c23968_2","app_id":"000"}, {"create_time1":"2022-06-30 16:10:52","as":"A","create_time":1656576652381,"event_type_value":"single","id":"727a8172-579c-49b0-819e-dfbfc6fad988_3","app_id":"000"}, {"create_time1":"2022-06-30 16:10:53","as":"A","create_time":1656576653415,"event_type_value":"single","id":"52bf00eb-501d-4a04-a55d-4e9ed3c96738_4","app_id":"000"}, {"create_time1":"2022-06-30 16:10:54","as":"A","create_time":1656576654422,"event_type_value":"single","id":"4cfacdd7-7ff4-4a4b-b0d9-2bd1b5541162_5","app_id":"000"}, {"create_time1":"2022-06-30 16:10:55","as":"A","create_time":1656576655441,"event_type_value":"single","id":"f0937607-9c59-4ca4-9f43-b624043ba39c_6","app_id":"000"}, {"create_time1":"2022-06-30 16:10:56","as":"A","create_time":1656576656464,"event_type_value":"single","id":"32e106e1-71e0-4c3d-b433-eccd8ed2dce7_7","app_id":"000"}, {"create_time1":"2022-06-30 16:10:57","as":"A","create_time":1656576657488,"event_type_value":"single","id":"1ea5c390-f4f5-4568-af3a-fe9dcb2342cb_8","app_id":"000"}, {"create_time1":"2022-06-30 16:10:58","as":"A","create_time":1656576658507,"event_type_value":"single","id":"24d1db16-25a7-4ffe-9d20-2ff88d0ec462_9","app_id":"000"}, {"create_time1":"2022-06-30 16:10:59","as":"A","create_time":1656576659516,"event_type_value":"single","id":"22e5a704-2d25-4246-8b26-d789f353445b_10","app_id":"000"}]
Window computed at:2022-06-30 16:11:15,timeWindow:,window start:2022-06-30 16:10:55,window end:2022-06-30 16:11:05,window size:10
Window alert raised at:2022-06-30 16:11:15,window data,size:10,list:[{"create_time1":"2022-06-30 16:10:55","as":"A","create_time":1656576655441,"event_type_value":"single","id":"f0937607-9c59-4ca4-9f43-b624043ba39c_6","app_id":"000"}, {"create_time1":"2022-06-30 16:10:56","as":"A","create_time":1656576656464,"event_type_value":"single","id":"32e106e1-71e0-4c3d-b433-eccd8ed2dce7_7","app_id":"000"}, {"create_time1":"2022-06-30 16:10:57","as":"A","create_time":1656576657488,"event_type_value":"single","id":"1ea5c390-f4f5-4568-af3a-fe9dcb2342cb_8","app_id":"000"}, {"create_time1":"2022-06-30 16:10:58","as":"A","create_time":1656576658507,"event_type_value":"single","id":"24d1db16-25a7-4ffe-9d20-2ff88d0ec462_9","app_id":"000"}, {"create_time1":"2022-06-30 16:10:59","as":"A","create_time":1656576659516,"event_type_value":"single","id":"22e5a704-2d25-4246-8b26-d789f353445b_10","app_id":"000"}, {"create_time1":"2022-06-30 16:11:00","as":"A","create_time":1656576660537,"event_type_value":"single","id":"76b69295-69be-453e-bfed-80638b61cf9a_11","app_id":"000"}, {"create_time1":"2022-06-30 16:11:01","as":"A","create_time":1656576661557,"event_type_value":"single","id":"28947a33-7adc-416d-9ca8-5a1bb2d06531_12","app_id":"000"}, {"create_time1":"2022-06-30 16:11:02","as":"A","create_time":1656576662574,"event_type_value":"single","id":"91663273-4611-46a0-a7ad-96d14294d58d_13","app_id":"000"}, {"create_time1":"2022-06-30 16:11:03","as":"A","create_time":1656576663594,"event_type_value":"single","id":"d8e1ec82-9d82-4e20-9e22-d247d7537e2f_14","app_id":"000"}, {"create_time1":"2022-06-30 16:11:04","as":"A","create_time":1656576664614,"event_type_value":"single","id":"e6abce11-4464-44d5-b8ef-844fe7079543_15","app_id":"000"}]
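Looking at the window output, nothing is computed after the window with window start:2022-06-30 16:10:55, window end:2022-06-30 16:11:05.
But given the data that was sent, there should also be a window with window start:2022-06-30 16:11:00, window end:2022-06-30 16:11:10,
and one with window start:2022-06-30 16:11:05, window end:2022-06-30 16:11:15.
Since no further data comes in, why are these two windows never closed and computed?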
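The log is consistent with how event-time windows fire: a window [start, end) fires only once the watermark reaches end - 1 (the window's maxTimestamp), and with a watermark of max(create_time) - maxOutOfOrderness, the watermark cannot move past the last message (16:11:08.675). The worked check below plugs that timestamp in with maxOutOfOrderness = 0, the most favourable case; LastWindowCheck is a hypothetical name used only for illustration.

```java
/**
 * Worked check: does a window fire when watermark = maxEventTs - maxOutOfOrderness?
 * An event-time window [start, end) fires once watermark >= end - 1.
 */
class LastWindowCheck {

    static boolean fires(long windowEnd, long maxEventTs, long maxOutOfOrderness) {
        long watermark = maxEventTs - maxOutOfOrderness;
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long lastEventTs = 1656576668675L;   // message 20, 16:11:08.675
        long end1105 = 1656576665000L;       // window [16:10:55, 16:11:05)
        long end1110 = 1656576670000L;       // window [16:11:00, 16:11:10)
        long end1115 = 1656576675000L;       // window [16:11:05, 16:11:15)
        // Even with zero allowed disorder, the watermark stops at 16:11:08.675.
        System.out.println(fires(end1105, lastEventTs, 0L)); // true
        System.out.println(fires(end1110, lastEventTs, 0L)); // false
        System.out.println(fires(end1115, lastEventTs, 0L)); // false
    }
}
```

Any positive maxOutOfOrderness only lowers the watermark further, so the two missing windows cannot fire until an event with a later create_time arrives.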
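Below is the watermark generation and event-time extraction from the job. If the watermark is instead computed as System.currentTimeMillis() - maxOutOfOrderness,
the windows do close and fire; but then, if the source data is old (event times well behind the wall clock), would those windows never be computed?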
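The concern about old data can be made concrete. With the default allowedLateness of 0, Flink's window operator skips an element when every window it belongs to already has maxTimestamp <= the current watermark (it is dropped, or sent to a late-data side output if one is configured). The sketch below applies that rule to a wall-clock watermark; the 3 s maxOutOfOrderness and the name WallClockWatermarkLateness are assumptions for illustration.

```java
/**
 * Hypothetical helper: would the window operator skip this element as late?
 * Assumes sliding event-time windows with offset 0 and allowedLateness = 0.
 */
class WallClockWatermarkLateness {

    static boolean isDropped(long eventTs, long watermark, long size, long slide) {
        // Same window-start arithmetic as the sliding assigner.
        long lastStart = eventTs - Math.floorMod(eventTs, slide);
        for (long start = lastStart; start > eventTs - size; start -= slide) {
            long maxTimestamp = start + size - 1;   // the window's maxTimestamp
            if (maxTimestamp > watermark) {
                return false;                       // this window can still take the element
            }
        }
        return true;                                // every window is already behind the watermark
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long watermark = now - 3_000L;              // assumed maxOutOfOrderness = 3 s
        // Message 1 from the test data, replayed today: too old for any open window.
        System.out.println(isDropped(1656576648431L, watermark, 10_000L, 5_000L)); // true
        // An event stamped "now" still lands in open windows.
        System.out.println(isDropped(now, watermark, 10_000L, 5_000L));            // false
    }
}
```

So a pure wall-clock watermark makes windows close on time, but replayed or delayed data is discarded instead of being counted.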
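// Assumed imports: JSONObject/JSONException are taken to be fastjson's (com.alibaba.fastjson);
// Window is the job's own config class (not a Flink window type).
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {
    private static final long serialVersionUID = -22117297510152169L;
    private static final Logger log = LoggerFactory.getLogger(CustomTimestampsAndWatermarks.class);

    private final long maxOutOfOrderness;
    private final String eventTimeFieldName;
    // Largest event timestamp seen so far; the watermark only moves when this grows.
    private long currentMaxTimestamp = 0L;

    CustomTimestampsAndWatermarks(Window window) {
        this.maxOutOfOrderness = window.getMaxOutOfOrderness();
        this.eventTimeFieldName = window.getAttrField();
    }

    // Called periodically (every autoWatermarkInterval): max event time minus allowed disorder.
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    // Called once per element: reads the event time and tracks the maximum seen.
    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
        try {
            Long timestamp = element.getLong(this.eventTimeFieldName);
            // A missing field yields null; guard it instead of failing on unboxing.
            if (timestamp != null) {
                currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                return timestamp;
            }
        } catch (JSONException e) {
            log.error("Failed to read event time field {}", eventTimeFieldName, e);
        }
        return previousElementTimestamp;
    }
}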
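So the question is: while still using event time, how can watermarks be emitted so that the last windows close and fire?
Nobody answered, but I'm recording the solution here anyway.
Approach: keep pushing the watermark forward even when no data is coming in. An event-time window fires only once the watermark reaches its end, and the assigner above computes the watermark solely from the largest create_time seen, so without new events the watermark stalls and the trailing windows never fire.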
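One way to express this rule, sketched in plain Java so it can be exercised without a cluster (a hypothetical illustration, not the author's implementation): if no element has arrived for longer than idleTimeout of wall-clock time, the watermark is pushed forward by the excess idle time, and it never moves backwards. In the AssignerWithPeriodicWatermarks above, onEvent would be called from extractTimestamp and currentWatermark from getCurrentWatermark, both passing System.currentTimeMillis(). The class name and idleTimeout are assumptions.

```java
/**
 * Hypothetical sketch: an event-time watermark that keeps advancing
 * with wall-clock time once the input has been idle for idleTimeout ms.
 */
class IdleAdvancingWatermark {
    private final long maxOutOfOrderness;
    private final long idleTimeout;
    private long maxEventTs = Long.MIN_VALUE;
    private long lastEventAt = Long.MIN_VALUE;   // wall-clock time of the last element
    private long lastEmitted = Long.MIN_VALUE;   // keeps the watermark monotonic

    IdleAdvancingWatermark(long maxOutOfOrderness, long idleTimeout) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.idleTimeout = idleTimeout;
    }

    /** Per element: remember the largest event time and when it arrived. */
    void onEvent(long eventTs, long nowMillis) {
        maxEventTs = Math.max(maxEventTs, eventTs);
        lastEventAt = nowMillis;
    }

    /** Periodically: event-time watermark, pushed forward while the input is idle. */
    long currentWatermark(long nowMillis) {
        if (maxEventTs == Long.MIN_VALUE) {
            return lastEmitted;                  // nothing seen yet
        }
        long watermark = maxEventTs - maxOutOfOrderness;
        long idleFor = nowMillis - lastEventAt;
        if (idleFor > idleTimeout) {
            watermark += idleFor - idleTimeout;  // advance by the idle time beyond the timeout
        }
        lastEmitted = Math.max(lastEmitted, watermark);
        return lastEmitted;
    }

    public static void main(String[] args) {
        IdleAdvancingWatermark wm = new IdleAdvancingWatermark(3_000L, 5_000L);
        wm.onEvent(1656576668675L, 1_000L);               // message 20 arrives at wall clock t = 1 s
        System.out.println(wm.currentWatermark(2_000L));  // 1656576665675
        System.out.println(wm.currentWatermark(10_400L)); // 1656576670075, so [16:11:00, 16:11:10) can fire
    }
}
```

The trade-off: once the watermark has been pushed forward this way, an event that arrives later with an older create_time may already be behind it and be treated as late, so idleTimeout should be longer than the gaps the source normally has.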
The code is as follows.
Custom watermark generation