The goal of this exercise is to join together the TaxiRide and TaxiFare records for each ride in a more robust way than we did in an earlier exercise.

The problem with using a RichCoFlatMapFunction for this application is that in a real-world system we have to expect that some records will be lost or corrupted. This means that over time we will accumulate an ever-growing collection of unmatched TaxiRide and TaxiFare records waiting to be matched with event data that will never arrive. Eventually our enrichment job will run out of memory.

You can solve this by using the timers available in a CoProcessFunction to eventually clear any unmatched state that is being kept.
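The mechanism can be modeled without Flink at all. The sketch below is a plain-Java simulation, not the Flink API: `Ride`, `Fare`, `processRide`, `processFare`, `advanceWatermark` and the `expiryDelay` parameter are hypothetical stand-ins, and the two maps and the `TreeMap` of timers play the roles of keyed state and the timer service. When a record arrives without its partner, it is stored and a timer is registered; once the watermark passes that timer, whatever is still waiting for that rideId is removed, so state stays bounded even when partners never arrive. It uses records, so it assumes Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (NOT the Flink API) of a keyed join whose unmatched state
// is cleared by event-time timers. All names here are hypothetical stand-ins.
class ExpiringJoinModel {
    record Ride(long rideId, long eventTime) {}
    record Fare(long rideId, long eventTime) {}

    // How long (in event time) to wait for a partner; hypothetical parameter.
    final long expiryDelay;

    // "Keyed state": at most one waiting ride and one waiting fare per rideId.
    final Map<Long, Ride> rideState = new HashMap<>();
    final Map<Long, Fare> fareState = new HashMap<>();
    // "Timer service": timestamp -> rideIds whose timer fires at that time.
    final TreeMap<Long, List<Long>> timers = new TreeMap<>();

    final List<String> joined = new ArrayList<>();
    final List<Object> discarded = new ArrayList<>();

    ExpiringJoinModel(long expiryDelay) {
        this.expiryDelay = expiryDelay;
    }

    // Like processElement1: emit a match, or store the ride and set a timer.
    void processRide(Ride ride) {
        Fare fare = fareState.remove(ride.rideId());
        if (fare != null) {
            joined.add(ride + " + " + fare);
        } else {
            rideState.put(ride.rideId(), ride);
            registerTimer(ride.eventTime() + expiryDelay, ride.rideId());
        }
    }

    // Like processElement2, with the roles swapped.
    void processFare(Fare fare) {
        Ride ride = rideState.remove(fare.rideId());
        if (ride != null) {
            joined.add(ride + " + " + fare);
        } else {
            fareState.put(fare.rideId(), fare);
            registerTimer(fare.eventTime() + expiryDelay, fare.rideId());
        }
    }

    void registerTimer(long timestamp, long rideId) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(rideId);
    }

    // Event-time timers fire once the watermark reaches their timestamp.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (long rideId : timers.pollFirstEntry().getValue()) {
                onTimer(rideId);
            }
        }
    }

    // Like onTimer: anything still waiting will never be matched, so remove it
    // from state (kept in `discarded` here only so it can be inspected).
    void onTimer(long rideId) {
        Ride ride = rideState.remove(rideId);
        Fare fare = fareState.remove(rideId);
        if (ride != null) discarded.add(ride);
        if (fare != null) discarded.add(fare);
    }

    public static void main(String[] args) {
        ExpiringJoinModel m = new ExpiringJoinModel(50);
        m.processRide(new Ride(1, 100));     // stored, timer at 150
        m.processFare(new Fare(1, 105));     // matches ride 1, its state is removed
        m.processFare(new Fare(1000, 110));  // stored, timer at 160; ride never arrives
        m.advanceWatermark(200);             // fires the timers at 150 and 160
        System.out.println(m.joined.size());                         // 1
        System.out.println(m.discarded);                             // [Fare[rideId=1000, eventTime=110]]
        System.out.println(m.rideState.size() + m.fareState.size()); // 0
    }
}
```

Choosing the delay is a trade-off: a longer `expiryDelay` tolerates partners that arrive later in event time, at the cost of holding more state for longer. In Flink itself, the same steps would go through keyed `ValueState` and the context's `timerService()` rather than hand-rolled maps.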

Input Data

For this exercise you will work with two data streams, one with TaxiRide events generated by a CheckpointedTaxiRideSource and the other with TaxiFare events generated by a CheckpointedTaxiFareSource:

DataStream<TaxiRide> rides = env.addSource(
  new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor));
DataStream<TaxiFare> fares = env.addSource(
  new CheckpointedTaxiFareSource(faresFile, servingSpeedFactor));

We recommend these checkpointed sources in case you want to run your solution on a cluster and experiment with making it truly fault tolerant.

Simulating Missing Data

You should arrange for some predictable fraction of the input records to be missing, so that you can verify that the corresponding state is being cleared correctly.

The reference solution applies this FilterFunction to the TaxiRide stream. It drops all START events, as well as the END event of every ride whose rideId is a multiple of 1000.

Java

DataStream<TaxiRide> rides = env
    .addSource(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor))
    .filter(new FilterFunction<TaxiRide>() {
      @Override
      public boolean filter(TaxiRide ride) throws Exception {
        // keep only END events, and drop those whose rideId is a multiple of 1000
        return !ride.isStart && (ride.rideId % 1000 != 0);
      }
    });

Scala

val rides = env
  .addSource(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor))
  .filter { ride => !ride.isStart && (ride.rideId % 1000 != 0) }
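To check exactly which END events the filter above removes, the predicate can be exercised on its own. This is a plain-Java sketch: the `Ride` record is a hypothetical stand-in for `TaxiRide` carrying only the two fields the filter reads, and `droppedEndEvents` is an illustrative helper, not part of the exercise code.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Plain-Java check of the filter predicate above. Ride is a hypothetical
// stand-in for TaxiRide carrying only the two fields the filter reads.
class RideFilterCheck {
    record Ride(long rideId, boolean isStart) {}

    // Same condition as the FilterFunction: keep END events whose rideId
    // is not a multiple of 1000.
    static boolean keep(Ride ride) {
        return !ride.isStart() && (ride.rideId() % 1000 != 0);
    }

    // rideIds in [1, maxRideId] whose END event the filter removes.
    static List<Long> droppedEndEvents(long maxRideId) {
        return LongStream.rangeClosed(1, maxRideId)
                .filter(id -> !keep(new Ride(id, false)))
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(keep(new Ride(42, true)));  // false: START events are always dropped
        System.out.println(keep(new Ride(42, false))); // true
        System.out.println(droppedEndEvents(5000));    // [1000, 2000, 3000, 4000, 5000]
    }
}
```

For rideIds 1 through 5000, exactly five END events are removed, namely those for rides 1000, 2000, 3000, 4000 and 5000; their fares can then never be matched.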

Expected Output

The result of this exercise is a data stream of Tuple2<TaxiRide, TaxiFare> records, one for each distinct rideId whose ride and fare both arrive. You should ignore the START events and join only the END event of each ride with its corresponding fare data.

To see clearly what is happening, create side outputs that collect each unmatched TaxiRide and TaxiFare discarded in the onTimer method of the CoProcessFunction.
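In Flink, a side output is identified by an `OutputTag`, usually declared as an anonymous subclass such as `new OutputTag<TaxiFare>("unmatchedFares") {}`; records are emitted with `ctx.output(tag, value)` inside `onTimer` and read back with `getSideOutput(tag)` on the stream returned by `process(...)`. The plain-Java sketch below models only the routing idea: `OutputTagModel`, `SideOutputs` and the static `onTimer` helper are hypothetical names, not Flink classes. Each leftover record goes to the channel for its own type instead of being dropped silently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of side outputs; OutputTagModel and SideOutputs are
// hypothetical stand-ins for Flink's OutputTag and Context.output(...).
class SideOutputDemo {
    // A named channel carrying values of one type.
    record OutputTagModel<T>(String id, Class<T> type) {}

    record Ride(long rideId) {}
    record Fare(long rideId) {}

    static final OutputTagModel<Ride> UNMATCHED_RIDES = new OutputTagModel<>("unmatchedRides", Ride.class);
    static final OutputTagModel<Fare> UNMATCHED_FARES = new OutputTagModel<>("unmatchedFares", Fare.class);

    static class SideOutputs {
        private final Map<String, List<Object>> channels = new HashMap<>();

        <T> void output(OutputTagModel<T> tag, T value) {
            channels.computeIfAbsent(tag.id(), k -> new ArrayList<>()).add(value);
        }

        // Reads one channel back with its element type restored.
        <T> List<T> get(OutputTagModel<T> tag) {
            List<T> out = new ArrayList<>();
            for (Object o : channels.getOrDefault(tag.id(), List.of())) {
                out.add(tag.type().cast(o));
            }
            return out;
        }
    }

    // What a timer callback might do with whatever is still waiting in state
    // for one key: route each leftover record to its side output.
    static void onTimer(Ride pendingRide, Fare pendingFare, SideOutputs ctx) {
        if (pendingRide != null) ctx.output(UNMATCHED_RIDES, pendingRide);
        if (pendingFare != null) ctx.output(UNMATCHED_FARES, pendingFare);
    }

    public static void main(String[] args) {
        SideOutputs ctx = new SideOutputs();
        onTimer(null, new Fare(1000), ctx);
        onTimer(null, new Fare(2000), ctx);
        System.out.println(ctx.get(UNMATCHED_FARES)); // [Fare[rideId=1000], Fare[rideId=2000]]
        System.out.println(ctx.get(UNMATCHED_RIDES)); // []
    }
}
```

Keeping one channel per record type means the discarded rides and the discarded fares can be printed and inspected separately.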

Once the join is basically working, don’t bother printing the joined records. Instead, print everything that goes to the side outputs to standard out, and verify that the results make sense. If you use the filter proposed above, you should see something like the following: TaxiFare records that were held in state for a while, but were eventually discarded because the matching TaxiRide events never arrived.

1> 1000,2013000992,2013000989,2013-01-01 00:05:38,CSH,0.0,4.8,18.3
3> 2000,2013001967,2013001964,2013-01-01 00:08:25,CSH,0.0,0.0,17.5
3> 4000,2013003768,2013003765,2013-01-01 00:13:00,CSH,0.0,0.0,26.5
4> 3000,2013002904,2013002901,2013-01-01 00:11:00,CRD,4.38,0.0,22.38
4> 5000,2013004578,2013004575,2013-01-01 00:15:03,CSH,0.0,0.0,11.0
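The check suggested above can also be automated. This sketch assumes, without confirmation from the exercise, that each printed line has the form `<subtask>> <rideId>,<other fields>`, where the `N> ` prefix is the index of the parallel subtask that printed it; `rideIdOf` and `allExpectedDrops` are hypothetical helpers. With the filter above, every discarded fare should belong to a ride whose rideId is a multiple of 1000.

```java
import java.util.List;

// Sanity check for printed side-output lines. Assumes (hypothetically) that
// each line is "<subtask>> <rideId>,<other fields...>".
class SideOutputCheck {
    // Strips the optional "N> " prefix and parses the first field as a rideId.
    static long rideIdOf(String line) {
        int prefixEnd = line.indexOf("> ");
        String record = prefixEnd >= 0 ? line.substring(prefixEnd + 2) : line;
        return Long.parseLong(record.substring(0, record.indexOf(',')));
    }

    // True if every discarded fare belongs to a ride the filter removed.
    static boolean allExpectedDrops(List<String> lines) {
        return lines.stream().allMatch(l -> rideIdOf(l) % 1000 == 0);
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
            "1> 1000,2013000992,2013000989,2013-01-01 00:05:38,CSH,0.0,4.8,18.3",
            "3> 2000,2013001967,2013001964,2013-01-01 00:08:25,CSH,0.0,0.0,17.5",
            "4> 3000,2013002904,2013002901,2013-01-01 00:11:00,CRD,4.38,0.0,22.38");
        System.out.println(allExpectedDrops(sample)); // true
    }
}
```

If any printed fare has a rideId that is not a multiple of 1000, either the timers fire too early or a matched record is being discarded.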

Documentation

Reference Solution

Reference solutions are available at GitHub: