The goal of this Ride Enrichment exercise is to join together the
TaxiFare records for each ride in a more robust way than we did in an earlier exercise.
The problem with using a
RichCoFlatMap for this application is that in a real-world system we have to expect that some records will be lost or corrupted. This means that over time we will accumulate an ever-growing collection of unmatched
TaxiFare records waiting to be matched with event data that will never arrive. Eventually our enrichment job will run out of memory.
You can solve this by using the timers available in a
CoProcessFunction to eventually clear any unmatched state that is being kept.
For this exercise you will work with two data streams, one with
TaxiRide events generated by a
CheckpointedTaxiRideSource and the other with
TaxiFare events generated by a
DataStream<TaxiRide> rides = env.addSource( new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor)); DataStream<TaxiFare> fares = env.addSource( new CheckpointedTaxiFareSource(faresFile, servingSpeedFactor));
We are recommending you use these checkpointed sources in case you want to run your solution on a cluster and experiment with making it truly fault tolerant.
Simulating Missing Data
You should arrange for some predictable fraction of the input records to be missing, so you can verify that you are correctly handling clearing the corresponding state.
The reference solution uses this
FilterFunction on the TaxiRides. It drops all START events, and every 1000th END event.
The result of this exercise is a data stream of
Tuple2<TaxiRide, TaxiFare> records, one for each distinct
rideId. You should ignore the START events, and only join the event for the END of each ride with its corresponding fare data.
In order to clearly see what is happening, create side outputs where you collect each unmatched
TaxiFare that is discarded in the
OnTimer method of the
Once the join is basically working, don’t bother printing the joined records. Instead, print to standard out everything going to the side outputs, and verify that the results make sense. If you use the filter proposed above, then you should see something like this. These are
TaxiFare records that were stored in state for a time, but eventually discarded because the matching
TaxiRide events had never arrived.
1> 1000,2013000992,2013000989,2013-01-01 00:05:38,CSH,0.0,4.8,18.3 3> 2000,2013001967,2013001964,2013-01-01 00:08:25,CSH,0.0,0.0,17.5 3> 4000,2013003768,2013003765,2013-01-01 00:13:00,CSH,0.0,0.0,26.5 4> 3000,2013002904,2013002901,2013-01-01 00:11:00,CRD,4.38,0.0,22.38 4> 5000,2013004578,2013004575,2013-01-01 00:15:03,CSH,0.0,0.0,11.0
Reference solutions are available at GitHub: