The objective of the Connected Car Sessions exercise is to divide the connected car data stream into session windows, where a gap of more than 15 seconds should start a new session.
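To make the 15-second rule concrete, here is a minimal plain-Java sketch of the gap semantics. It is not Flink code and not the exercise solution: the class `SessionGapSketch` and its `split` method are hypothetical names, and the input is assumed to be already sorted by timestamp. In the actual job you would most likely get this behavior from an event-time session window assigner such as `EventTimeSessionWindows.withGap(...)` rather than from hand-written logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of session-gap semantics; hypothetical, not Flink code.
class SessionGapSketch {
    static final long GAP_MS = 15_000L; // the 15-second session gap from the exercise

    // Splits timestamps (milliseconds, assumed sorted ascending) into sessions.
    // A new session starts when the gap to the previous event exceeds GAP_MS.
    static List<List<Long>> split(List<Long> sortedTimestamps) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > GAP_MS) {
                sessions.add(current);        // close the session that just ended
                current = new ArrayList<>();  // and start a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        List<Long> timestamps = Arrays.asList(0L, 5_000L, 20_000L, 36_000L, 40_000L);
        System.out.println(split(timestamps));
    }
}
```

With the sample input, the event at 20,000 stays in the first session because its gap is exactly 15 seconds, while the event at 36,000 opens a second session because its gap is 16 seconds.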

Your window function should look something like this, but we’ll leave it to you to fill in the details:

Java

public static class CreateGapSegment implements WindowFunction<...> {
    @Override
    public void apply(...) {
        out.collect(new GapSegment(events));
    }
}

We have supplied the GapSegment class. Its constructor takes an Iterable<ConnectedCarEvent>, and the resulting object provides summary statistics about the events that were collected in the window:

startTime      : long    // timestamp (milliseconds since the epoch)
length         : long    // number of events
maxSpeed       : int     // the highest speed attained
erraticness    : float   // the standard deviation of the throttle
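You do not need to compute these statistics yourself, since GapSegment is supplied. Purely as an illustration of what fields like maxSpeed and erraticness represent, the sketch below derives them from a list of events. Everything in it is an assumption: `SegmentStatsSketch` and its `Event` class are hypothetical stand-ins, and the choice of the population standard deviation (dividing by N) may differ from what the supplied class actually does.

```java
import java.util.List;

// Hypothetical illustration only; the supplied GapSegment may compute these differently.
class SegmentStatsSketch {
    // Stand-in for the ConnectedCarEvent fields used here (assumed names and types).
    static class Event {
        final int speed;
        final float throttle;

        Event(int speed, float throttle) {
            this.speed = speed;
            this.throttle = throttle;
        }
    }

    // Highest speed in the segment; speeds are assumed non-negative, so 0 for an empty list.
    static int maxSpeed(List<Event> events) {
        int max = 0;
        for (Event e : events) {
            max = Math.max(max, e.speed);
        }
        return max;
    }

    // Population standard deviation of the throttle (divides by N; an assumption).
    static float throttleStdDev(List<Event> events) {
        if (events.isEmpty()) {
            return 0f;
        }
        double sum = 0;
        for (Event e : events) {
            sum += e.throttle;
        }
        double mean = sum / events.size();
        double squaredDiffs = 0;
        for (Event e : events) {
            double d = e.throttle - mean;
            squaredDiffs += d * d;
        }
        return (float) Math.sqrt(squaredDiffs / events.size());
    }
}
```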

The resulting stream of GapSegments should be printed to stdout, and the results should look like this:

1> 1484892235000,333 events,141 kph,crazy
1> 1484893178000,152 events,130 kph,busy
1> 1487345789000,10 events,37 kph,busy

Implementation Hints

Until now you have been working with auto-watermarking, which is to say that Flink has automatically invoked your watermark generator at some regular interval (as measured by the CPU’s time-of-day clock). That won’t work for these exercises, because this small dataset will be completely emitted long before even 1 msec (the smallest configurable interval for auto-watermarking) has elapsed. The alternative is to implement an AssignerWithPunctuatedWatermarks<ConnectedCarEvent>, which can produce a watermark in response to each event rather than on a timer.
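The sketch below shows the idea behind punctuated watermarking without depending on Flink. It uses local stand-ins: `Event`, `PunctuatedAssigner`, `CarAssigner`, and `run` are hypothetical names, and the interface only mirrors the two methods of Flink's `AssignerWithPunctuatedWatermarks` (`extractTimestamp` and `checkAndGetNextWatermark`), returning a `Long` where Flink returns a `Watermark` object. The 30-second out-of-orderness allowance is an assumption, not a value taken from the exercise.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free illustration of punctuated watermarks; all names are hypothetical.
class PunctuatedWatermarkSketch {
    // Stand-in for ConnectedCarEvent; only the timestamp matters here.
    static class Event {
        final long timestamp;

        Event(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    // Local stand-in shaped like Flink's AssignerWithPunctuatedWatermarks.
    interface PunctuatedAssigner<T> {
        long extractTimestamp(T element, long previousElementTimestamp);

        // Returns the watermark timestamp to emit, or null for "no watermark".
        Long checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
    }

    static final long MAX_DELAY_MS = 30_000L; // assumed bound on out-of-orderness

    // Proposes a watermark with every event, lagging the event time by MAX_DELAY_MS.
    static class CarAssigner implements PunctuatedAssigner<Event> {
        @Override
        public long extractTimestamp(Event element, long previousElementTimestamp) {
            return element.timestamp;
        }

        @Override
        public Long checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
            return extractedTimestamp - MAX_DELAY_MS;
        }
    }

    // Simulates the runtime side: a proposed watermark is emitted only if it advances.
    static List<Long> run(List<Event> events, PunctuatedAssigner<Event> assigner) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (Event e : events) {
            long ts = assigner.extractTimestamp(e, Long.MIN_VALUE);
            Long wm = assigner.checkAndGetNextWatermark(e, ts);
            if (wm != null && wm > current) {
                current = wm;
                emitted.add(wm);
            }
        }
        return emitted;
    }
}
```

Because the watermark is driven by the events themselves rather than by the wall clock, it advances even when the whole dataset is read in well under a millisecond. An out-of-order event proposes a watermark that does not advance, so nothing is emitted for it.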

Reference Solution

A reference solution is available at GitHub: