“长车程警报”练习的目标是对于持续超过两个小时的出租车车程发出警报。
这应该使用数据流中提供的事件时间时间戳和水位线来完成。
流是无序的,并且可能会在其 START 事件之前处理车程的 END 事件。
END 事件可能会丢失,但你可以假设没有重复的事件,也没有丢失的 START 事件。
仅仅等待 END 事件并计算持续时间是不够的,因为我们希望尽快收到关于长车程的警报。
最终应该清除创建的任何状态。
输入数据是出租车乘车事件的 DataStream
。
所希望的结果应该是一个 DataStream<LONG>
,其中包含持续时间超过两小时的车程的 rideId
。
结果流应打印到标准输出。
ℹ️ 最好在 IDE 的 flink-training 项目中找到这些类,而不是使用本节中源文件的链接。
- Java:
org.apache.flink.training.exercises.longrides.LongRidesExercise
- Scala:
org.apache.flink.training.exercises.longrides.scala.LongRidesExercise
- Java:
org.apache.flink.training.exercises.longrides.LongRidesUnitTest
- Scala:
org.apache.flink.training.exercises.longrides.scala.LongRidesUnitTest
- Java:
org.apache.flink.training.exercises.longrides.LongRidesIntegrationTest
- Scala:
org.apache.flink.training.exercises.longrides.scala.LongRidesIntegrationTest
整体方案
这个练习围绕着使用 KeyedProcessFunction
来管理一些状态和事件时间计时器,
使用这种方法即使在给定 rideId
的 END 事件在 START 之前到达时也能正常工作。
挑战在于弄清楚要使用什么状态和计时器,以及何时设置和清除状态(和计时器)。
阅读参考解决方案的讨论.
项目中提供了参考解决方案:
- Java API:
org.apache.flink.training.solutions.longrides.LongRidesSolution
- Scala API:
org.apache.flink.training.solutions.longrides.scala.LongRidesSolution