The goal of this Ride Enrichment exercise is to join together the TaxiRide and TaxiFare records for each ride.

Input Data

For this exercise you will work with two data streams, one with TaxiRide events generated by a TaxiRideSource and the other with TaxiFare events generated by a TaxiFareSource. See Taxi Data Streams for information on how to download the data and how to work with these stream generators.

(Note that if you want to make your solution truly fault tolerant, you should use the CheckpointedTaxiRideSource and CheckpointedTaxiFareSource.)

Expected Output

The result of this exercise is a data stream of Tuple2<TaxiRide, TaxiFare> records, one for each distinct rideId. You should ignore the START events, and only join the event for the END of each ride with its corresponding fare data.

The resulting stream should be printed to standard out.

Implementation Hints

You can use a RichCoFlatMap to implement this join operation. Note that you have no control over the order of arrival of the ride END and fare records for each rideId, so you’ll need to be prepared to store either piece of information until the matching info arrives, at which point you can emit a Tuple2<TaxiRide, TaxiFare> joining the two records together.
You should be using Flink’s managed, keyed state to buffer the data that is being held until the matching event arrives. And be sure to clear the state once it is no longer needed.


Reference Solution

Reference solutions are available at GitHub: