The goal of this exercise is to join together the TaxiRide and TaxiFare records for each ride.

Input Data

For this exercise you will work with two data streams, one with TaxiRide events generated by a TaxiRideSource and the other with TaxiFare events generated by a TaxiFareSource. See Taxi Data Streams for information on how to download the data and how to work with these stream generators.

(Note that if you want to make your solution truly fault tolerant, you can use the CheckpointedTaxiRideSource and CheckpointedTaxiFareSource.)

Expected Output

The result of this exercise is a data stream of Tuple2<TaxiRide, TaxiFare> records, one for each distinct rideId. Ignore the END events, and join only the START event of each ride with its corresponding fare data.

The resulting stream should be printed to standard output.

Getting Started

Rather than following these links to GitHub, you might prefer to open these classes in your IDE:

Tests

com.dataartisans.flinktraining.exercises.datastream_java.state.RidesAndFaresTest

Exercise Classes

Implementation Hints

You can use a RichCoFlatMapFunction to implement this join operation. Note that you have no control over the order in which the ride and fare records for a given rideId arrive, so you’ll need to be prepared to store whichever record arrives first until its match arrives, at which point you can emit a Tuple2<TaxiRide, TaxiFare> joining the two records together.
Use Flink’s managed, keyed state to buffer the record that is waiting for its matching event, and be sure to clear that state once it is no longer needed.
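
The buffering logic described in these hints can be sketched in plain Java, without any Flink dependency. This is only an illustration, not the reference solution: the two HashMaps stand in for the per-key state that Flink would manage for you, the methods onRide and onFare play the roles that flatMap1 and flatMap2 would play in a RichCoFlatMapFunction, and the Ride, Fare, and Joined classes are hypothetical, stripped-down stand-ins for TaxiRide, TaxiFare, and Tuple2<TaxiRide, TaxiFare>.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the join. The maps below are an assumption made for
// illustration; in a Flink job they would be replaced by keyed state.
class RideFareJoinSketch {

    // Hypothetical stand-in for TaxiRide (only the fields the join needs).
    static final class Ride {
        final long rideId;
        final boolean isStart;
        Ride(long rideId, boolean isStart) {
            this.rideId = rideId;
            this.isStart = isStart;
        }
    }

    // Hypothetical stand-in for TaxiFare.
    static final class Fare {
        final long rideId;
        final double totalFare;
        Fare(long rideId, double totalFare) {
            this.rideId = rideId;
            this.totalFare = totalFare;
        }
    }

    // Hypothetical stand-in for Tuple2<TaxiRide, TaxiFare>.
    static final class Joined {
        final Ride ride;
        final Fare fare;
        Joined(Ride ride, Fare fare) {
            this.ride = ride;
            this.fare = fare;
        }
    }

    // One buffered record per rideId, mimicking per-key state.
    private final Map<Long, Ride> rideState = new HashMap<>();
    private final Map<Long, Fare> fareState = new HashMap<>();

    // Called for every ride event (the role of flatMap1).
    void onRide(Ride ride, List<Joined> out) {
        if (!ride.isStart) {
            return; // END events are ignored
        }
        // remove() both reads and clears the buffered fare, if there is one.
        Fare fare = fareState.remove(ride.rideId);
        if (fare != null) {
            out.add(new Joined(ride, fare));
        } else {
            rideState.put(ride.rideId, ride); // wait for the fare
        }
    }

    // Called for every fare event (the role of flatMap2).
    void onFare(Fare fare, List<Joined> out) {
        Ride ride = rideState.remove(fare.rideId);
        if (ride != null) {
            out.add(new Joined(ride, fare));
        } else {
            fareState.put(fare.rideId, fare); // wait for the ride
        }
    }

    // Number of records still waiting for a match.
    int bufferedCount() {
        return rideState.size() + fareState.size();
    }

    public static void main(String[] args) {
        RideFareJoinSketch join = new RideFareJoinSketch();
        List<Joined> out = new ArrayList<>();
        join.onRide(new Ride(1L, true), out);   // buffered: no fare yet
        join.onFare(new Fare(2L, 12.5), out);   // buffered: no ride yet
        join.onFare(new Fare(1L, 7.0), out);    // matches ride 1
        join.onRide(new Ride(2L, false), out);  // END event, ignored
        join.onRide(new Ride(2L, true), out);   // matches fare 2
        for (Joined j : out) {
            System.out.println(j.ride.rideId + " -> " + j.fare.totalFare);
        }
        System.out.println("still buffered: " + join.bufferedCount());
    }
}
```

In an actual Flink job, both streams would be keyed by rideId and the maps would become two keyed ValueState fields, one per record type, each cleared as soon as its match has been emitted.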

Documentation

Reference Solutions

Reference solutions are available at GitHub: