The goal of the "Long Ride Alerts" exercise is to provide a warning whenever a taxi ride lasts for more than two hours.
This should be done using the event time timestamps and watermarks that are provided in the data stream.
The stream is out-of-order, and it is possible that the END event for a ride will be processed before its START event.
An END event may be missing, but you may assume there are no duplicated events, and no missing START events.
It is not enough to simply wait for the END event and calculate the duration, as we want to be alerted about the long ride as soon as possible.
You should eventually clear any state you create.
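The two-hour rule itself can be stated in a few lines of plain Java. This is only a sketch: `LongRideRule`, `isLongRide`, and `alertTime` are hypothetical names chosen for illustration, not part of the exercise code, and the sketch assumes that "more than two hours" means strictly greater than two hours of event time.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper that captures the two-hour rule in event time. */
final class LongRideRule {
    static final Duration MAX_RIDE = Duration.ofHours(2);

    /** True when the ride's event-time duration strictly exceeds two hours. */
    static boolean isLongRide(Instant start, Instant end) {
        return Duration.between(start, end).compareTo(MAX_RIDE) > 0;
    }

    /** Earliest event time at which a ride with no END seen so far can be reported. */
    static Instant alertTime(Instant start) {
        return start.plus(MAX_RIDE);
    }
}
```

`isLongRide` covers the case where both events are available, in whichever order they arrived. `alertTime` covers the case where only the START has been seen, which is what makes an alert possible without waiting for the END.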
The input data of this exercise is a DataStream of taxi ride events.
The result of the exercise should be a DataStream<Long> that contains the rideId for rides with a duration that exceeds two hours.
The resulting stream should be printed to standard out.
ℹ️ Rather than following these links to the sources, you might prefer to open these classes in your IDE.
Exercise classes:
- Java: org.apache.flink.training.exercises.longrides.LongRidesExercise
- Scala: org.apache.flink.training.exercises.longrides.scala.LongRidesExercise
Unit tests:
- Java: org.apache.flink.training.exercises.longrides.LongRidesUnitTest
- Scala: org.apache.flink.training.exercises.longrides.scala.LongRidesUnitTest
Integration tests:
- Java: org.apache.flink.training.exercises.longrides.LongRidesIntegrationTest
- Scala: org.apache.flink.training.exercises.longrides.scala.LongRidesIntegrationTest
Overall approach
This exercise revolves around using a KeyedProcessFunction to manage some state and event time timers, and doing so in a way that works even when the END event for a given rideId arrives before the START.
The challenge is figuring out what state and timers to use, and when to set and clear the state (and timers).
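One way to reason about the state and timers is to model them in plain Java before writing any Flink code. The sketch below is not the reference solution and does not use the Flink API. `LongRideSketch`, `RideEvent`, `onEvent`, and `advanceWatermark` are hypothetical names, the two maps stand in for keyed state and event-time timers, and `advanceWatermark` stands in for the timer callback that a KeyedProcessFunction would receive.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of per-ride state and event-time timers. Not Flink code. */
final class LongRideSketch {
    static final long TWO_HOURS_MS = 2L * 60 * 60 * 1000;

    /** Hypothetical stand-in for the exercise's ride event type. */
    static final class RideEvent {
        final long rideId;
        final boolean isStart;
        final long timestampMs;

        RideEvent(long rideId, boolean isStart, long timestampMs) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.timestampMs = timestampMs;
        }
    }

    // Models keyed state: the first event seen for each rideId.
    private final Map<Long, RideEvent> firstEvent = new HashMap<>();
    // Models event-time timers: rideId -> event time at which the timer fires.
    private final Map<Long, Long> timers = new HashMap<>();
    // Models the output stream of rideIds.
    private final List<Long> alerts = new ArrayList<>();

    /** Handles one event; START and END may arrive in either order. */
    void onEvent(RideEvent event) {
        RideEvent stored = firstEvent.remove(event.rideId);
        if (stored == null) {
            // First event for this ride: remember it.
            firstEvent.put(event.rideId, event);
            if (event.isStart) {
                // Only a START fixes the deadline after which the ride is long.
                timers.put(event.rideId, event.timestampMs + TWO_HOURS_MS);
            }
            return;
        }
        // Second event: the state was removed above; drop any pending timer too.
        timers.remove(event.rideId);
        long start = event.isStart ? event.timestampMs : stored.timestampMs;
        long end = event.isStart ? stored.timestampMs : event.timestampMs;
        if (end - start > TWO_HOURS_MS) {
            alerts.add(event.rideId);
        }
    }

    /** Fires every timer whose time is at or before the new watermark. */
    void advanceWatermark(long watermarkMs) {
        Iterator<Map.Entry<Long, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> timer = it.next();
            if (timer.getValue() <= watermarkMs) {
                alerts.add(timer.getKey());
                // Clearing here prevents a duplicate alert, but an END that
                // arrives later is stored again and never removed.
                firstEvent.remove(timer.getKey());
                it.remove();
            }
        }
    }

    List<Long> alerts() {
        return alerts;
    }

    public static void main(String[] args) {
        LongRideSketch sketch = new LongRideSketch();
        sketch.onEvent(new RideEvent(1, true, 0));                 // START, END not seen
        sketch.onEvent(new RideEvent(2, false, TWO_HOURS_MS + 1)); // END before START
        sketch.onEvent(new RideEvent(2, true, 0));
        sketch.advanceWatermark(TWO_HOURS_MS);
        System.out.println(sketch.alerts()); // prints [2, 1]
    }
}
```

This model deliberately leaves one gap open: after a timer fires, a late END event is stored and never cleared, so it does not fully meet the requirement to eventually clear all state. Closing that gap, for example with an additional cleanup timer, is part of the design work the exercise asks for.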
Read the discussion of the reference solutions.
Reference solutions:
- Java API: org.apache.flink.training.solutions.longrides.LongRidesSolution
- Scala API: org.apache.flink.training.solutions.longrides.scala.LongRidesSolution