The task of the “Popular Places” exercise is to identify popular places from the taxi ride data stream. This is done by counting, every five minutes, the number of taxi rides that started or ended in each area within the last 15 minutes. Arrivals and departures should be counted separately. Only locations whose arrival or departure count exceeds a provided popularity threshold should be forwarded to the result stream.

The GeoUtils class provides a static method GeoUtils.mapToGridCell(float lon, float lat) which maps a location (longitude, latitude) to a cell id that refers to an area of approximately 100x100 meters in size. The GeoUtils class also provides inverse methods to compute the longitude and latitude of the center of a grid cell.
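To make the idea of a grid cell id concrete, here is a minimal, hypothetical sketch of such a mapping and its inverse. It is not the actual GeoUtils implementation: the class name GridCellSketch, the bounding box, the cell sizes in degrees, and the number of cells per row are all assumptions chosen for illustration. In the exercise itself, use the methods that GeoUtils provides.

```java
// Hypothetical illustration only; NOT the real GeoUtils class.
final class GridCellSketch {
    // Assumed north-west corner of the grid and assumed cell sizes in degrees.
    static final double LON_WEST = -74.05;
    static final double LAT_NORTH = 41.0;
    static final double DELTA_LON = 0.0014;
    static final double DELTA_LAT = 0.00125;
    // Assumed number of cells in one west-to-east row of the grid.
    static final int CELLS_PER_ROW = 250;

    // Maps a location inside the assumed grid to a single integer cell id.
    static int mapToGridCell(float lon, float lat) {
        int x = (int) Math.floor((lon - LON_WEST) / DELTA_LON);   // column index
        int y = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT);  // row index
        return x + y * CELLS_PER_ROW;
    }

    // Longitude of the center of the given cell.
    static float cellCenterLon(int cellId) {
        int x = cellId % CELLS_PER_ROW;
        return (float) (LON_WEST + (x + 0.5) * DELTA_LON);
    }

    // Latitude of the center of the given cell.
    static float cellCenterLat(int cellId) {
        int y = cellId / CELLS_PER_ROW;
        return (float) (LAT_NORTH - (y + 0.5) * DELTA_LAT);
    }

    public static void main(String[] args) {
        int cell = mapToGridCell(-73.9163f, 40.9381f);
        System.out.println(cell + " -> " + cellCenterLon(cell) + ", " + cellCenterLat(cell));
    }
}
```

The sketch assumes that locations lie inside the grid; a location outside the assumed bounding box would produce a meaningless id, which is one reason the input is filtered to the New York City area first.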

Please note that the program should operate in event time.

Input Data

The input data of this exercise is a stream of TaxiRide events generated by the Taxi Stream Source and filtered by the New York City area filter of the Taxi Ride Cleansing exercise (reuse the FilterFunction from that exercise).

The TaxiRideSource annotates the generated DataStream<TaxiRide> with timestamps and watermarks. Hence, there is no need to provide a custom timestamp and watermark assigner in order to correctly use event time.

Expected Output

The result of this exercise is a data stream of Tuple5<Float, Float, Long, Boolean, Integer> records. Each record contains the longitude and latitude of the location cell (two Float values), the timestamp of the count (Long), a flag indicating arrival or departure counts (Boolean), and the actual count (Integer).

The resulting stream should be printed to standard out.

Getting Started

Tests

com.dataartisans.flinktraining.exercises.datastream_java.windows.PopularPlacesTest

Exercise Classes

Implementation Hints

This task requires counting taxi ride events by cell id and event type (start or end event). Hence, we need to obtain a cell id for each record and group the records by cell id and event type. Subsequently, we need to define sliding time windows with a length of 15 minutes and an evaluation interval of 5 minutes. In each window, we count the number of events. The counts need to be filtered by the popularity threshold. Finally, the cell id should be converted back into longitude and latitude before the result stream is emitted.
Each TaxiRide event must be mapped to a cell id. This can be done by a MapFunction which calls the GeoUtils.mapToGridCell() method. Start events are mapped to their departure location, end events are mapped to their destination location.
In order to compute separate area counts for arriving and departing taxi rides, the stream needs to be organized (keyed) by grid cell id and event type. You can key a data stream by passing a list of keys to DataStream.keyBy().
Use KeyedStream.timeWindow(Time.minutes(15), Time.minutes(5)) on the keyed stream to define a sliding time window of 15 minutes that triggers every five minutes. Use apply(WindowFunction) to count the number of elements in the window and attach the end time of the window, which can be queried with TimeWindow.getEnd().
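The windowing logic described in these hints can be sketched in plain Java, without any Flink dependency. The sketch below only illustrates the semantics (which sliding windows an event falls into, and how counts per cell id, event type, and window end are filtered); it is not the reference solution and does not use the Flink API. The names SlidingCountSketch, Event, windowStarts, and popularPlaces are hypothetical, window starts are assumed to be aligned to multiples of the slide, and "more than the threshold" is interpreted as strictly greater.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation of the sliding-window count; not Flink code.
final class SlidingCountSketch {
    static final long SIZE_MS = 15 * 60 * 1000L;  // window length: 15 minutes
    static final long SLIDE_MS = 5 * 60 * 1000L;  // evaluation interval: 5 minutes

    // Simplified stand-in for a TaxiRide that was already mapped to a cell id.
    static final class Event {
        final int cellId;
        final boolean isStart;
        final long timestamp;  // event time in milliseconds

        Event(int cellId, boolean isStart, long timestamp) {
            this.cellId = cellId;
            this.isStart = isStart;
            this.timestamp = timestamp;
        }
    }

    // Start times (newest first) of all windows [start, start + SIZE_MS) containing the timestamp.
    static List<Long> windowStarts(long timestamp) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, SLIDE_MS);
        for (long start = lastStart; start > timestamp - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    // Counts events per "cellId,isStart,windowEnd" key and keeps counts above the threshold.
    static Map<String, Integer> popularPlaces(List<Event> events, int threshold) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            for (long start : windowStarts(e.timestamp)) {
                long windowEnd = start + SIZE_MS;  // exclusive end of the window
                counts.merge(e.cellId + "," + e.isStart + "," + windowEnd, 1, Integer::sum);
            }
        }
        counts.values().removeIf(c -> c <= threshold);  // assumption: strictly greater than threshold
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(7, true, 60_000L),    // departure at minute 1
                new Event(7, true, 120_000L),   // departure at minute 2
                new Event(7, true, 360_000L),   // departure at minute 6
                new Event(7, false, 180_000L)); // arrival at minute 3
        System.out.println(popularPlaces(events, 2));
    }
}
```

Because every event belongs to three overlapping windows (15 minutes divided by the 5-minute slide), the same ride contributes to three consecutive results. In the Flink program this assignment is handled by the window operator, and only the counting and the window end time are left to the WindowFunction.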

Documentation

Reference Solutions

Reference solutions are available on GitHub: