环球时讯:聊聊Flink必知必会(四)
概述
Flink Streaming API借鉴了谷歌数据流模型(Google Data Flow Model),它的流API支持不同的时间概念。Flink明确支持以下3个不同的时间概念。
(资料图)
Flink明确支持以下3个不同的时间概念。(1)事件时间:事件发生的时间,由产生(或存储)事件的设备记录。
(2)接入时间:Flink在接入事件时记录的时间戳。
(3)处理时间:管道中特定操作符处理事件的时间。
支持事件时间的流处理器需要一种方法来度量事件时间的进度。在Flink中测量事件时间进展的机制是水印(watermark)。水印是一种特殊类型的事件,是告诉系统事件时间进度的一种方式。水印流是数据流的一部分,并带有时间戳t。水印(t)声明事件时间已经到达该流中的时间t,这意味着时间戳t′≤t(时间戳更早或等于水印的事件)的流中不应该有更多的元素。
Flink中水印的处理
水印的时间戳
时间t的水印标记了数据流中的一个位置,并断言此时的流在时间t之前已经完成。为了执行基于事件时间的事件处理,Flink需要知道与每个事件相关联的时间,它还需要包含水印的流。水印就是系统事件时间的时钟。水印触发是基于事件时间的计时器的触发。
事件流的类型有两种,一个是顺序的,一个是无序的。先看顺序场景下,水印的排列。
对于无序流,水印是至关重要的,其中事件不是按照它们的时间戳排序的。
例如,当操作符接收到w(11)这条水印时,可以认为时间戳小于或等于11的数据已经到达,此时可以触发计算。同样,当接收到w(17)这条水印时,可以认为时间戳小于或等于17的数据已经到达,此时可以触发计算。
可以看出,水印的时间戳是单调递增的,时间戳为t的水印意味着所有后续记录的时间戳将大于t。一般来讲,水印是一种声明,在流中的那个点之前,即在某个时间戳之前的所有事件都应该已经到达。
水印是在源函数处或直接在源函数之后生成的。源函数的每个并行子任务通常可以独立地生成水印。这些水印定义了特定并行源处的事件时间。
水印的生成
Flink提供了用于处理事件时间、时间戳和水印的API。
为了处理事件时间,Flink流程序需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过TimestampAssigner从元素中的某个字段访问/提取时间戳实现的。
Flink提供了两种方式创建水印。
1.使用WatermarkStrategy上的静态辅助方法实现公共水印策略:
2.实现WatermarkStrategy接口,自定义TimestampAssigner与WatermarkGenerator捆绑在一起:
@Publicpublic interface WatermarkStrategy extends TimestampAssignerSupplier, WatermarkGeneratorSupplier { @Override WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); @Override default TimestampAssigner createTimestampAssigner( TimestampAssignerSupplier.Context context) { return new RecordTimestampAssigner<>(); } @Experimental default WatermarkAlignmentParams getAlignmentParameters() { return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED; } default WatermarkStrategy withTimestampAssigner( TimestampAssignerSupplier timestampAssigner) { checkNotNull(timestampAssigner, "timestampAssigner"); return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner); } default WatermarkStrategy withTimestampAssigner( SerializableTimestampAssigner timestampAssigner) { checkNotNull(timestampAssigner, "timestampAssigner"); return new WatermarkStrategyWithTimestampAssigner<>( this, TimestampAssignerSupplier.of(timestampAssigner)); } default WatermarkStrategy withIdleness(Duration idleTimeout) { checkNotNull(idleTimeout, "idleTimeout"); checkArgument( !(idleTimeout.isZero() || idleTimeout.isNegative()), "idleTimeout must be greater than zero"); return new WatermarkStrategyWithIdleness<>(this, idleTimeout); } @Experimental default WatermarkStrategy withWatermarkAlignment( String watermarkGroup, Duration maxAllowedWatermarkDrift) { return withWatermarkAlignment( watermarkGroup, maxAllowedWatermarkDrift, WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL); } @Experimental default WatermarkStrategy withWatermarkAlignment( String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) { return new WatermarksWithWatermarkAlignment( this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval); } static WatermarkStrategy forMonotonousTimestamps() { return (ctx) -> new AscendingTimestampsWatermarks<>(); } static WatermarkStrategy forBoundedOutOfOrderness(Duration maxOutOfOrderness) { return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness); } static WatermarkStrategy forGenerator(WatermarkGeneratorSupplier generatorSupplier) { return generatorSupplier::createWatermarkGenerator; } static WatermarkStrategy noWatermarks() { return (ctx) -> new NoWatermarksGenerator<>(); }}
这里面提供了很多静态的方法和带有缺省实现的方法,只有一个方法是非default和没有缺省实现的,就是createWatermarkGenerator
方法。
所以默认情况下,我们只需要实现这个方法就行了,这个方法主要是返回一个 WatermarkGenerator。
@Publicpublic interface WatermarkGenerator {/** * Called for every event, allows the watermark generator to examine and remember the * event timestamps, or to emit a watermark based on the event itself. */void onEvent(T event, long eventTimestamp, WatermarkOutput output);/** * Called periodically, and might emit a new watermark, or not. * * The interval in which this method is called and Watermarks are generated * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. */void onPeriodicEmit(WatermarkOutput output);}
这个方法简单明了,主要是有两个方法:
onEvent :每个元素都会调用这个方法,如果我们想依赖每个元素生成一个水印,然后发射到下游(可选,就是看是否用output来收集水印),我们可以实现这个方法.
onPeriodicEmit : 如果数据量比较大的时候,我们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的方法。这个水印的生成周期可以这样设置:
env.getConfig().setAutoWatermarkInterval(5000L)
标签:
推荐文章
- 饭店内2万余元黄金首饰被盗,3小时后警察找到了“最不可能的她”
- 为了防止造反,明朝不准王爷进京,清朝不准王爷出京,谁更高明
- 慈善夜市什么样?来半淞浦西世博园区看看吧
- 兰州市出台《关于进一步促进房地产市场平稳健康发展的若干措施》
- ⚽荷乙战报:赫尔肯斯头槌制胜 威廉二世2-1逆转马斯特里赫特
- 四合院开局一只旅行青蛙(旅行青蛙 明信片)
- 詹宁斯谈明年奥运首发:库里、布克、詹姆斯、杜兰特、浓眉
- 东西问·汉学家丨美国汉学家邰谧侠:《道德经》缘何成为外译最多的中国典籍?
- 顺发恒业(000631.SZ):选举许小建为董事长
- 非遗文化+群众体育!普宁连续24年举办7届运动会
- 13岁女孩被老师扇致耳膜穿孔,事发数月老师仍在上课,学校回应
- 杨幂男友曝光?周冬雨跑路?杜华背刺孟美岐?檀健次被代言退货?
- 2023内蒙古鄂尔多斯市党群部门所属事业单位招聘工作人员考试成绩汇总的公告
- 祥源新材(300980.SZ):研发生产的聚烯烃发泡材料已进入华为的供应链体系中,通过下游模切厂供给至华为
- 免票、半价!徐州最新发布!
- 华为Mate60和Mate50配置对比!5499起“加量不加价”
- 轩尼诗Hennessy亚洲首店将于上海前滩太古里开业
- 定西宽粉“圈粉”无数
- 河南各级工会三年消费援疆超4.5亿元
- 谨防文具盲盒“刺伤”孩子
- 一天三场重磅活动 温州新能源产业风鼓满帆
- 爱柯迪(600933)8月31日主力资金净卖出1283.43万元
- 机械革命鲨疯了 R7-7840H迷你主机2999元!
- 中贝通信:算力租赁业务尚未产生收入
- 金辉控股上半年净利润约7.37亿元,二线及核心三线土储占比为97%
- 街边烟火气聚拢起人气和财气
- 崩坏星穹铁道往复不止怎么达成
- 苏州五批次共计成交金额42.9亿元,2宗地块封顶摇号
- 华泰证券(06886):“21华泰11”将于9月7日付息
- 宣战书(关于宣战书介绍)
- 离婚后对方不给抚养费可以申请法院强制执行吗?
- 中国智能手表市场:华为居首 苹果、小米居二
- 中央气象台升级发布台风红色预警
- 吴子嘉曝郭民调再往下掉 没能力就不要玩了
- 阿特斯:8月30日融资买入2477万元,融资融券余额2.86亿元
- 180家上市公司中报分红,减持新规下3家“铁公鸡”终分红
- 今日中国组合timez组合解散了吗(能说实话吗 TimeZ和EXO那个人气更高呢为什么我觉得TimeZ有些弱呢)
- 治安管理处罚法修订草案首次提请审议,这些看点值得关注
- 强度堪比“杜苏芮”!新台风会影响江西!接下来天气……
- 龙湖集团2023年第一期中期票据成功发行,规模为11亿元
- 四箭齐发,大牛市来了?
- |不一样的新村民,舞出岱岳新风情
- 莱西格(关于莱西格的简介)
- 良品铺子行走的CD夏日歌会落地武汉美术馆,本周五将迎来收官之战
- 中集集团上半年营收606亿元 储能业务在手订单逾10亿元
- 天风证券:给予科前生物买入评级
- 国产芯片制造的突破手,青岛芯恩-澳柯玛
- 农业农村部:科学规范开展增殖放流 推进长江大保护工作
- 福田雷萨北方战区营销业务交流会圆满召开
- 奋力夺取秋粮好收成
- 【动脉严选新品鉴第27期】安速康医疗:首款国产分体式设计无主机超声刀
- AH300ETF:融资净偿还1.15万元,融资余额97.29万元(08-29)
- 食品安全板块8月29日涨1.78%,新 大 陆领涨,主力资金净流出1.11亿元
- 创建文明城市从我做起征文800字开头 创建文明城市从我做起征文
- [meteortale]被收养的姐弟 Mokiet&Undyne
- 海绵是什么材料做的(海绵是什么材料)
- 皮蛋“拌”香蕉 沉浸式艺术展登陆上海
- 曝北京首钢男篮下赛季季后赛主场定在首都体育馆
- 收评:两市收高创业板指涨近3% 科创50指数大涨超4% 资金爆买相关ETF
- 莫里斯·斯泰因(关于莫里斯·斯泰因简述)
- 生物医药产业高质量发展战略咨询会召开
- 北京来论:两岸经贸关系必须回归正常化轨道
- 中国电研:8月28日融资买入445.01万元,融资融券余额1.06亿元
- 襄阳市樊城区明晶巷社区:打造睦邻调解室,巧化百姓烦心事
- 浙能电力:8月28日融资买入2202.59万元,融资融券余额2.4亿元
- 国海证券给予马应龙买入评级,2023 年中报点评:基数影响上半年销售费用高增长,短期拖累利润端
- 残保金2019年2015号优惠政策(残保金2019)
- 财政部、国家税务总局公布延续一批个人所得税优惠政策
- 自行车和棒球来了!明天10点,杭州亚运会又有8个项目开票
- 国家二级保护动物雨中“落难”,警民联手救助
- 【互动掘金】泰胜风能:目前在手订单充裕 各生产基地产能利用较为饱满
- 二维码 推动数字时代发展新变革
- 公安河东分局开展平安建设主题宣传活动
- 券商观点|食品饮料行业研究周报:中国啤酒的高端化,道阻且长,溯游从之
- 暴雨将至!台风“苏拉”逐渐靠近,海南未来天气
- 瑞尔特8月28日快速反弹
- 悄无声息的意思(别具匠心的意思)
- “不只选好作品,更要选好苗子”——2023青创赛侧记
- 【碧蓝档案×明日方舟】当罗德岛误入基沃托斯27:初入灰鸡窝的日富美
- 发泄小游戏下载(可以发泄的小游戏)
- 凯隐技能机制(lol凯隐技能)
- 中国石化:上半年净利同比降20.1% 拟8亿至15亿元回购股份
- 摩托罗拉耳机(关于摩托罗拉耳机的基本详情介绍)
- “苏拉”已升至超强台风,预报路径直指福建?未来泉州天气……
- 睡前八个动作瘦小腿
- 红楼梦:蘅芜苑是大荒山?薛宝钗是山鬼转世?
- qq涂鸦简单画法撩人(qq涂鸦怎么画好看)
- 奥迪时间怎么设置(奥迪时间怎么调整?)
- 郭艾伦:开局心态调整的不错&尤其张镇麟 替补上来节奏有点停滞
- 盘古越狱工具怎么用(盘古越狱工具)
- Target推进布局区域物流网络 提供就近配送服务
- 科威尔(688551.SH):拟使用不超4.5亿元暂时闲置募集资金进行现金管理
- 老师补课学生提高60分,家长明知违规却引而不发,原来是时机未到
- 多措并举促消费丨税惠扮靓夜经济
X 关闭
资讯
X 关闭