The task of the “Popular Places” exercise is to identify popular places from a table of taxi ride records, just like the previous Popular Places exercise. This is done by counting, every five minutes, the number of taxi rides that started or ended in each area during the last 15 minutes. Departure and arrival locations should be counted separately. Only locations with more departures or arrivals than a given popularity threshold should be forwarded to the result.
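
To make the "every five minutes over the last 15 minutes" semantics concrete, the following plain-Java sketch (not Flink code; the class and method names are hypothetical) computes which sliding windows an event with a given event-time timestamp belongs to. It assumes windows aligned to epoch 0, which is how Flink aligns sliding windows by default, so every event falls into 15 / 5 = 3 overlapping windows.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch (not Flink code): which sliding windows contain an event?
 * Windows are 15 minutes long and a new one starts every 5 minutes, aligned to
 * epoch 0, so every event falls into 15 / 5 = 3 overlapping windows.
 */
public class SlidingWindows {
    static final long SIZE_MS = 15 * 60 * 1000L;  // window length: 15 minutes
    static final long SLIDE_MS = 5 * 60 * 1000L;  // evaluation interval: 5 minutes

    /** Start timestamps (ms) of all windows [start, start + SIZE_MS) containing eventTime (assumes eventTime >= 0). */
    static List<Long> windowStarts(long eventTime) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the event
        long lastStart = eventTime - (eventTime % SLIDE_MS);
        // Walk back one slide at a time while the window still covers the event
        for (long start = lastStart; start > eventTime - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long eventTime = 12 * 60 * 1000L; // an event at minute 12
        for (long start : windowStarts(eventTime)) {
            System.out.printf("[%d min, %d min)%n", start / 60_000, (start + SIZE_MS) / 60_000);
        }
    }
}
```

For an event at minute 12, the sketch reports the windows [10, 25), [5, 20), and [0, 15) minutes.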

You can implement a solution for the exercise with Flink’s Table API or SQL interface. To do so, add the following dependencies to the pom.xml file of your Maven project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.4.0</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.4.0</version>
</dependency>

Please note that the query should operate in event time.

Input Data

The input data of this exercise is a Table of taxi ride events. The table is provided by the Taxi Rides Table Source. The table rows should be filtered for valid departure and arrival coordinates.
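
A minimal plain-Java sketch of this filter, assuming a hypothetical TaxiRideRow type that models only a ride id and the coordinate fields: a row is kept only if both its departure and its arrival coordinates pass a validity check, since departure counts use the start coordinates and arrival counts use the end coordinates. The bounding box in main is illustrative only; in the exercise, the check would be done with a UDF such as GeoUtils.IsInNYC, e.g. in a SQL WHERE clause like `isInNyc(startLon, startLat) AND isInNyc(endLon, endLat)` (assuming the UDF is registered as isInNyc).

```java
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

/**
 * Plain-Java sketch of the input filter (not Flink code). TaxiRideRow is a hypothetical
 * stand-in for a row of the Taxi Rides table that models only the fields used here.
 */
public class ValidRideFilter {
    record TaxiRideRow(long rideId, float startLon, float startLat, float endLon, float endLat) {}

    /** Keeps a row only if BOTH its departure and its arrival coordinates are valid. */
    static List<TaxiRideRow> filterValid(List<TaxiRideRow> rows, BiPredicate<Float, Float> isValid) {
        return rows.stream()
                .filter(r -> isValid.test(r.startLon(), r.startLat())
                        && isValid.test(r.endLon(), r.endLat()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical validity check: a rough box around New York City (illustrative bounds only)
        BiPredicate<Float, Float> inBox =
                (lon, lat) -> lon >= -74.05f && lon <= -73.7f && lat >= 40.5f && lat <= 41.0f;
        List<TaxiRideRow> rows = List.of(
                new TaxiRideRow(1, -73.98f, 40.75f, -73.95f, 40.78f), // both ends inside
                new TaxiRideRow(2, -73.98f, 40.75f, 0f, 0f),          // invalid arrival
                new TaxiRideRow(3, 0f, 0f, -73.95f, 40.78f));         // invalid departure
        filterValid(rows, inBox).forEach(System.out::println);
    }
}
```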

Resources

The GeoUtils class provides a set of user-defined functions (UDFs) for the Table API and SQL:

  • GeoUtils.IsInNYC checks if a location (longitude, latitude) is in New York City.
  • GeoUtils.ToCellId maps a location (longitude, latitude) to a cell id that refers to an area of approximately 100 x 100 meters.
  • GeoUtils.ToCoords converts a grid cell id back into a longitude/latitude pair.
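
The grid used by GeoUtils is not described here, so the following is only a hypothetical plain-Java sketch of how a cell-id mapping and its inverse can work: the area is divided into a row-major grid of roughly 100 x 100 meter cells, a location maps to the index of the cell that contains it, and a cell id maps back to a representative point (here, the cell's center). The grid origin and cell sizes are assumptions, not the values of the actual GeoUtils class.

```java
/**
 * Hypothetical grid-cell mapping in the spirit of GeoUtils.ToCellId and GeoUtils.ToCoords.
 * The grid origin, extent, and cell sizes are illustrative assumptions, not the values
 * used by the actual GeoUtils class.
 */
public class GridCells {
    // Hypothetical grid extent: a rough box around New York City
    static final float LON_WEST = -74.05f, LON_EAST = -73.7f;
    static final float LAT_NORTH = 41.0f;
    // ~100 m per cell: 1 degree of latitude is ~111 km; 1 degree of longitude is ~84 km near 40.7° N
    static final float DELTA_LAT = 0.0009f, DELTA_LON = 0.0012f;
    static final int NUM_COLUMNS = (int) Math.ceil((LON_EAST - LON_WEST) / DELTA_LON);

    /** Row-major cell id (column west-to-east, row north-to-south); assumes the point lies inside the grid. */
    static int toCellId(float lon, float lat) {
        int col = (int) Math.floor((lon - LON_WEST) / DELTA_LON);
        int row = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT);
        return row * NUM_COLUMNS + col;
    }

    /** Inverse mapping: returns the center of the cell as {lon, lat}. */
    static float[] toCoords(int cellId) {
        int col = cellId % NUM_COLUMNS;
        int row = cellId / NUM_COLUMNS;
        return new float[] {
            LON_WEST + (col + 0.5f) * DELTA_LON,
            LAT_NORTH - (row + 0.5f) * DELTA_LAT
        };
    }

    public static void main(String[] args) {
        int cell = toCellId(-73.9855f, 40.758f);
        float[] center = toCoords(cell);
        System.out.printf("cell %d -> center (%.5f, %.5f)%n", cell, center[0], center[1]);
    }
}
```

Converting a cell id back yields one point per cell, not the original location, so all rides in the same cell are reported under the same coordinates.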

UDFs need to be registered with a TableEnvironment before they can be used.

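import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the query should operate in event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// register UDF (works identically in Scala)
tEnv.registerFunction("isInNyc", new GeoUtils.IsInNYC());

// use UDF in SQL
Table t = tEnv.sqlQuery("SELECT isInNyc(startLon, startLat) FROM TaxiRides");

// use UDF in Table API
Table t2 = tEnv.scan("TaxiRides")
    .select("isInNyc(startLon, startLat)");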

Expected Output

The result of this exercise is a Table with the following schema:

coords         : (Float, Float) // pair of longitude/latitude
isStart        : Boolean        // flag indicating departure or arrival count
wstart         : Timestamp      // the start time of the sliding window
wend           : Timestamp      // the end time of the sliding window
popCnt         : Long           // the number of rides departing or arriving

The resulting Table should be printed to standard out.

Implementation Hints

This task requires counting taxi ride events by location (cell id), event type (departure or arrival), and time (sliding window). First, we need to obtain the corresponding cell id for each row. Subsequently, we define a sliding time window with a length of 15 minutes and an evaluation interval (slide) of 5 minutes, and group by the window, cell, and event type. For each group, we count the number of events. The counts need to be filtered by the popularity threshold. Finally, the cell id should be converted back into longitude and latitude before the result stream is emitted.
Depending on the event type (departure or arrival), we need to compute the cell id either from the startLon and startLat values or from the endLon and endLat values. The Table API and SQL support conditional expressions in the select() method and the SELECT clause, respectively (in SQL, for example, CASE WHEN … THEN … ELSE … END).
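
The steps above can be simulated in plain Java to check expected counts before writing the Flink query. The sketch below (not Flink code; RideEvent, Key, CellCount, and popularCells are hypothetical names) selects the cell by event type, assigns each event to every 15-minute window sliding by 5 minutes, counts per (window, cell, isStart), and keeps counts strictly above the threshold. It assumes each row carries a single event-time timestamp in milliseconds and leaves out the final conversion from cell id to coordinates. In Flink SQL, the corresponding window would be expressed with HOP(eventTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) in the GROUP BY clause, where eventTime is an assumed name for the table's rowtime attribute.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntBiFunction;

/**
 * Plain-Java simulation of the query logic (not Flink code): pick the cell by event type,
 * assign each event to 15-minute windows sliding by 5 minutes, count per
 * (window, cell, isStart), and keep only counts above the threshold.
 */
public class PopularPlacesSketch {
    static final long SIZE_MS = 15 * 60 * 1000L;  // window length: 15 minutes
    static final long SLIDE_MS = 5 * 60 * 1000L;  // evaluation interval: 5 minutes

    /** Hypothetical stand-in for a Taxi Rides row; eventTime is the event-time timestamp in ms. */
    record RideEvent(boolean isStart, float startLon, float startLat,
                     float endLon, float endLat, long eventTime) {}

    /** Grouping key: window start, cell id, and event type. */
    record Key(long wstart, int cell, boolean isStart) {}

    /** Result row before the cell id is converted back into coordinates. */
    record CellCount(int cell, boolean isStart, long wstart, long wend, long popCnt) {}

    static List<CellCount> popularCells(List<RideEvent> events,
                                        ToIntBiFunction<Float, Float> toCellId,
                                        long threshold) {
        Map<Key, Long> counts = new HashMap<>();
        for (RideEvent e : events) {
            // Conditional cell selection: departures use start coordinates, arrivals use end coordinates
            int cell = e.isStart()
                    ? toCellId.applyAsInt(e.startLon(), e.startLat())
                    : toCellId.applyAsInt(e.endLon(), e.endLat());
            // Every sliding window [wstart, wstart + SIZE_MS) that contains the event (assumes eventTime >= 0)
            long lastStart = e.eventTime() - (e.eventTime() % SLIDE_MS);
            for (long wstart = lastStart; wstart > e.eventTime() - SIZE_MS; wstart -= SLIDE_MS) {
                counts.merge(new Key(wstart, cell, e.isStart()), 1L, Long::sum);
            }
        }
        List<CellCount> result = new ArrayList<>();
        for (Map.Entry<Key, Long> entry : counts.entrySet()) {
            Key k = entry.getKey();
            if (entry.getValue() > threshold) { // popularity threshold: strictly more events
                result.add(new CellCount(k.cell(), k.isStart(), k.wstart(),
                        k.wstart() + SIZE_MS, entry.getValue()));
            }
        }
        // Deterministic order for printing: by window, then cell, then event type
        result.sort(Comparator.comparingLong(CellCount::wstart)
                .thenComparingInt(CellCount::cell)
                .thenComparing(CellCount::isStart));
        return result;
    }

    public static void main(String[] args) {
        long min = 60 * 1000L;
        // Hypothetical cell function for the demo: the integer part of lon + lat
        ToIntBiFunction<Float, Float> toCell = (lon, lat) -> (int) (lon + lat);
        List<RideEvent> events = List.of(
                new RideEvent(true, 1f, 0f, 2f, 0f, 11 * min),
                new RideEvent(true, 1f, 0f, 2f, 0f, 12 * min),
                new RideEvent(true, 1f, 0f, 2f, 0f, 13 * min),
                new RideEvent(false, 1f, 0f, 2f, 0f, 14 * min),
                new RideEvent(true, 1f, 0f, 2f, 0f, 16 * min));
        for (CellCount c : popularCells(events, toCell, 2)) {
            System.out.println(c);
        }
    }
}
```

With the demo data in main, departures from cell 1 at minutes 11, 12, 13, and 16 produce counts of 3, 4, and 4 for the windows starting at minutes 0, 5, and 10; the window starting at minute 15 (count 1) and the single arrival in cell 2 are removed by the threshold of 2.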

Reference Solution

Reference solutions are available on GitHub: