The goal of the “Long Ride Alerts” exercise is to emit an alert whenever a taxi ride that started two hours ago is still ongoing.

This can be implemented fairly directly with a ProcessFunction (see the Long Ride Alerts exercise), but that approach requires you to manage state and timers explicitly. In this exercise we’ll solve the same problem more simply with Flink’s CEP (Complex Event Processing) library.

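Using the CEP library requires adding the following dependencies to the pom.xml of your Maven project. The flink-cep artifact provides the Java API; flink-cep-scala is only needed if you write your solution in Scala: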

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.11</artifactId>
  <version>1.6.0</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.11</artifactId>
  <version>1.6.0</version>
</dependency>

Input Data

The input data of this exercise is a DataStream of taxi ride events. You will want to use a CheckpointedTaxiRideSource:

DataStream<TaxiRide> rides = env.addSource(
  new CheckpointedTaxiRideSource(input, servingSpeedFactor));

Even with a delay factor of zero, the TaxiRideSource you may have used before reorders events that share the same timestamp, which adds complexity we’d rather avoid for now.
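To see why reordering ties matters, consider a hypothetical ride whose START and END carry the same timestamp. A pattern that begins with START and then looks for END consumes events in order, so if the source delivers the END first, the match never completes and the START later looks like an unfinished ride. The sketch below is plain Java, not Flink; the string event encoding and the `startThenEnd` method are illustrative assumptions.

```java
import java.util.List;

public class TieOrderSketch {
    /**
     * Scans events strictly in arrival order, roughly the way a
     * "START followed by END" pattern consumes them, and reports whether
     * the ride's START is seen before its END.
     * Events are hypothetical strings of the form "START:<rideId>" or "END:<rideId>".
     */
    static boolean startThenEnd(List<String> arrivalOrder, long rideId) {
        boolean startSeen = false;
        for (String event : arrivalOrder) {
            if (event.equals("START:" + rideId)) {
                startSeen = true;
            } else if (event.equals("END:" + rideId) && startSeen) {
                return true; // END arrived after its START: the pattern completes
            }
        }
        return false; // END missing, or it arrived before the START and was skipped
    }
}
```

With the order `START:7, END:7` the method returns true; with `END:7, START:7` it returns false, even though both events have the same timestamp.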

Don’t bother trying to filter the events (as is done in the Taxi Ride Cleansing exercise).

Expected Output

The result of the exercise should be a DataStream<TaxiRide> that only contains START events of taxi rides which have no matching END event within the first two hours of the ride.

The resulting stream should be printed to standard out.
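To make the expected semantics concrete, here is a plain-Java sketch (no Flink) of what the alerting logic computes: it keeps each START whose END has not yet arrived and emits that START once two hours of event time have passed. The `Ride` class and `longRides` method are hypothetical stand-ins, not the exercise’s `TaxiRide` API. The sketch assumes events arrive in timestamp order, treats an END arriving exactly two hours after its START as too late (a boundary choice made here, not taken from the exercise), and flushes all still-open rides at the end of the input, similar to the final watermark of a bounded stream. In the exercise itself, the CEP library’s pattern timeout is meant to do this bookkeeping for you.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;

public class LongRideSketch {
    // Hypothetical minimal stand-in for the exercise's TaxiRide class.
    static final class Ride {
        final long rideId;
        final boolean isStart;
        final long eventTimeMillis;

        Ride(long rideId, boolean isStart, long eventTimeMillis) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    static final long TWO_HOURS_MILLIS = 2L * 60 * 60 * 1000;

    /** Returns the START events with no END for the same rideId within two hours. */
    static List<Ride> longRides(List<Ride> eventsInTimestampOrder) {
        // rideId -> START still waiting for its END; insertion order == start-time order
        LinkedHashMap<Long, Ride> open = new LinkedHashMap<>();
        List<Ride> alerts = new ArrayList<>();
        for (Ride e : eventsInTimestampOrder) {
            expire(open, e.eventTimeMillis, alerts); // event time has advanced to e
            if (e.isStart) {
                open.put(e.rideId, e);
            } else {
                open.remove(e.rideId); // END in time (no-op if already alerted)
            }
        }
        expire(open, Long.MAX_VALUE, alerts); // end of input acts like a final watermark
        return alerts;
    }

    // Emits every open START whose two-hour window has closed by time `now`.
    private static void expire(LinkedHashMap<Long, Ride> open, long now, List<Ride> alerts) {
        Iterator<Ride> it = open.values().iterator();
        while (it.hasNext()) {
            Ride start = it.next();
            if (now - start.eventTimeMillis < TWO_HOURS_MILLIS) {
                break; // all later entries started even more recently
            }
            alerts.add(start);
            it.remove();
        }
    }
}
```

For example, if ride 1 starts at 0 and ends after 30 minutes, ride 2 starts at one minute and never ends, and ride 3 starts at three hours, the sketch alerts on ride 2 (when ride 3’s event moves time past its deadline) and then on ride 3 (at the end of the input).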

The expected output looks something like the following. These are the rideIds and start times of the first few rides that last more than two hours; you may want to print other information as well:

> 2758,2013-01-01 00:10:13
> 7575,2013-01-01 00:20:23
> 22131,2013-01-01 00:47:03
> 25473,2013-01-01 00:53:10
> 29907,2013-01-01 01:01:15
> 30796,2013-01-01 01:03:00
...
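The sample lines above pair a rideId with the ride’s start time. If you want your own output in the same shape, a small formatting helper could look like this. The method name `formatAlert`, the epoch-millisecond input, and the use of UTC are assumptions of this sketch rather than part of the exercise’s `TaxiRide` API.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class AlertFormatSketch {
    // Matches the "yyyy-MM-dd HH:mm:ss" layout of the sample output; UTC is assumed.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    /** Formats an alert as "rideId,startTime". */
    static String formatAlert(long rideId, long startTimeMillis) {
        return rideId + "," + FORMAT.format(Instant.ofEpochMilli(startTimeMillis));
    }
}
```

For instance, `formatAlert(2758, Instant.parse("2013-01-01T00:10:13Z").toEpochMilli())` returns `2758,2013-01-01 00:10:13`.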

Documentation

Reference Solution

Reference solutions are available at GitHub: