The enviroCar Project provides a public data set (Source: 52°North enviroCar Server) of anonymized connected car data. We will be working with a subset of that data.

1. Schema of Connected Car Events

Each event in our connected car dataset contains 23 fields, but we will only concern ourselves with these:

id             : String  // a unique id for each event
car_id         : String  // a unique id for the car
timestamp      : long    // timestamp (milliseconds since the epoch)
longitude      : float   // GPS longitude
latitude       : float   // GPS latitude
consumption    : float   // fuel consumption (liters per hour)
speed          : float   // speed (kilometers per hour)
throttle       : float   // throttle position (%)
engineload     : float   // engine load (%)

2. Download the connected car data files

Download the data by running the following commands:


These two files contain the same event records, but in one file the data is sorted by timestamp, while in the other the data is out of order (by at most 30 seconds). Except when data is missing (i.e. when the car was turned off), there should be an event every 5 seconds.

All exercises should be implemented using event-time characteristics. Event-time decouples the program semantics from serving speed and guarantees consistent results even in case of historic data or data which is delivered out-of-order.

Note: You have to add the flink-training-exercises dependency to your Maven pom.xml file as described in the setup instructions because the ConnectedCarEvent class and other helpful utility classes are contained in the flink-training-exercises dependency.

Here’s an example of how you can create a DataStream<ConnectedCarEvent> stream:


// read parameters
ParameterTool params = ParameterTool.fromArgs(args);
String input = params.getRequired("input");

// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// connect to the data file
DataStream<String> carData = env.readTextFile(input);

// create event stream
DataStream<ConnectedCarEvent> events = carData
.map(new MapFunction<String, ConnectedCarEvent>() {
public ConnectedCarEvent map(String line) throws Exception {
return ConnectedCarEvent.fromString(line);

The ConnectedCarEvent records will be served as fast as possible (unlike the TaxiRideSource, which uses sleep() to simulate a more realistic data source).

You will need to figure out how to generate appropriate watermarks. You should test each exercise with both the in-order and out-of-order data files, and make sure they produce consistent results.