The task of the “Popular Places” exercise is to identify popular places from a table of taxi ride records, just like the previous Popular Places exercise. This is done by counting, every five minutes, the number of taxi rides that started or ended in the same area within the last 15 minutes. Arrival and departure locations should be counted separately. Only locations with more arrivals or departures than a provided popularity threshold should be forwarded to the result.
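The windowing described above (15-minute windows evaluated every five minutes, with a "more than threshold" filter) can be sketched in plain Java without any Flink dependency. This is only an illustration of the semantics, not the exercise solution: the class and method names are hypothetical, timestamps are assumed to be epoch milliseconds, and windows are assumed to be aligned to multiples of the slide interval and to cover `[start, start + size)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper class, for illustration only (not part of Flink or the exercise code).
class SlidingWindowSketch {
    static final long SIZE_MS = 15 * 60 * 1000L;  // window length: 15 minutes
    static final long SLIDE_MS = 5 * 60 * 1000L;  // a new window starts every 5 minutes

    /** Start times of all windows [start, start + SIZE_MS) that contain the timestamp. */
    static List<Long> windowStarts(long timestampMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp (assumed slide-aligned).
        long lastStart = timestampMs - Math.floorMod(timestampMs, SLIDE_MS);
        for (long start = lastStart; start > timestampMs - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    /** Counts events per window and keeps only windows with MORE than `threshold` events. */
    static Map<Long, Long> popularWindows(long[] timestampsMs, long threshold) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            for (long start : windowStarts(ts)) {
                counts.merge(start, 1L, Long::sum);
            }
        }
        counts.values().removeIf(c -> c <= threshold);
        return counts;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        // Events at minute 1, 7 and 12 all fall into the window starting at minute 0.
        long[] events = {1 * minute, 7 * minute, 12 * minute};
        System.out.println(popularWindows(events, 2));
    }
}
```

Each event belongs to three overlapping windows, which is why a single ride can contribute to the count of several result rows. In the actual exercise the counts would additionally be grouped by cell id and by the departure/arrival flag.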
You can implement a solution for the exercise with Flink’s Table API or SQL interface. For that you need to add the following dependencies to the pom.xml of your Maven project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.4.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.4.2</version>
</dependency>
Please note that the query should operate in event time.
The input data of this exercise is a Table of taxi ride events. The table is provided by the Taxi Rides Table Source. The table rows should be filtered for valid departure and arrival coordinates.
The GeoUtils class provides a set of user-defined functions (UDFs) for the Table API and SQL:
GeoUtils.IsInNYC checks if a location (longitude, latitude) is in New York City.
GeoUtils.ToCellId maps a location (longitude, latitude) to a cell id that refers to an area of approximately 100x100 meters.
GeoUtils.ToCoords converts a grid cell id back into a longitude/latitude pair.
UDFs need to be registered with a TableEnvironment before they can be used.
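To make the idea behind the cell id functions more concrete, the following plain-Java sketch maps a location to a grid cell and back. It is not the GeoUtils implementation: the class name, the bounding box, the cell sizes in degrees, and the grid dimensions are all assumptions chosen for illustration, and the reverse mapping is assumed to return the center of the cell.

```java
// Hypothetical grid mapping, for illustration only (NOT the GeoUtils implementation).
class GridCellSketch {
    // Assumed bounding box origin (north-west corner) and cell sizes in degrees.
    static final double LON_WEST = -74.05;
    static final double LAT_NORTH = 41.0;
    static final double DELTA_LON = 0.0014;
    static final double DELTA_LAT = 0.00125;
    // Assumed grid dimensions: cells per row and number of rows.
    static final int CELLS_X = 250;
    static final int CELLS_Y = 400;

    /** True if the location lies inside the assumed grid. */
    static boolean isInArea(float lon, float lat) {
        return lon >= LON_WEST && lon < LON_WEST + CELLS_X * DELTA_LON
            && lat <= LAT_NORTH && lat > LAT_NORTH - CELLS_Y * DELTA_LAT;
    }

    /** Maps a location to a cell id, numbering cells row by row from the north-west corner. */
    static int toCellId(float lon, float lat) {
        int x = (int) Math.floor((lon - LON_WEST) / DELTA_LON);
        int y = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT);
        return x + y * CELLS_X;
    }

    /** Maps a cell id back to {longitude, latitude} of the cell center. */
    static float[] toCoords(int cellId) {
        int x = cellId % CELLS_X;
        int y = cellId / CELLS_X;
        float lon = (float) (LON_WEST + (x + 0.5) * DELTA_LON);
        float lat = (float) (LAT_NORTH - (y + 0.5) * DELTA_LAT);
        return new float[] {lon, lat};
    }

    public static void main(String[] args) {
        float[] center = toCoords(12345);
        // Mapping the center of a cell back to a cell id yields the same id.
        System.out.println(toCellId(center[0], center[1]) == 12345);
    }
}
```

Grouping by a discrete cell id rather than by raw coordinates is what allows nearby rides to be counted together.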
The result of this exercise is a Table with the following schema:
coords  : (Float, Float) // pair of longitude/latitude
isStart : Boolean        // flag indicating departure or arrival count
wstart  : Timestamp      // the start time of the sliding window
wend    : Timestamp      // the end time of the sliding window
popCnt  : Long           // the number of rides departing or arriving
The resulting Table should be printed to standard out.
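Whether a row represents a departure or an arrival determines whether the cell id is computed on the startLat or on the endLat values (together with the matching longitude). The Table API and SQL support conditional expressions in their SELECT method or clause.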
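The conditional selection of coordinates can be illustrated in plain Java with a ternary expression; in a SQL query the analogous construct would be a CASE WHEN expression in the SELECT clause. The `Ride` class below is a hypothetical, simplified stand-in for a table row, and the field names `startLon` and `endLon` are assumptions modeled on the `startLat` and `endLat` names used in this exercise.

```java
// Hypothetical sketch of conditional coordinate selection (not the exercise solution).
class ConditionalCoordsSketch {
    /** Simplified ride event; field names other than startLat/endLat are assumptions. */
    static final class Ride {
        final boolean isStart;
        final float startLon, startLat, endLon, endLat;

        Ride(boolean isStart, float startLon, float startLat, float endLon, float endLat) {
            this.isStart = isStart;
            this.startLon = startLon;
            this.startLat = startLat;
            this.endLon = endLon;
            this.endLat = endLat;
        }
    }

    /** Returns {lon, lat} of the departure for start events and of the arrival otherwise. */
    static float[] relevantLocation(Ride r) {
        float lon = r.isStart ? r.startLon : r.endLon;
        float lat = r.isStart ? r.startLat : r.endLat;
        return new float[] {lon, lat};
    }

    public static void main(String[] args) {
        Ride departure = new Ride(true, -73.99f, 40.75f, -73.95f, 40.78f);
        Ride arrival = new Ride(false, -73.99f, 40.75f, -73.95f, 40.78f);
        System.out.println(relevantLocation(departure)[1]); // latitude of the start location
        System.out.println(relevantLocation(arrival)[1]);   // latitude of the end location
    }
}
```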
Reference solutions are available at GitHub: