The task of this exercise is to connect the TaxiRide Cleansing program and the Popular Places program through an Apache Kafka topic. For that, both programs need to be modified:

  1. The TaxiRide cleansing program shall write its result stream to a Kafka topic and
  2. the Popular Places program shall read its input stream from that Kafka topic.

The Kafka installation instructions explain how to set up and start Kafka. The following instructions help with the necessary modifications:

Adding the Kafka Connector dependency

Flink features connectors to several external systems. To keep the dependencies of Flink's core slim, these connectors are organized in separate modules and have to be included as needed. The connector for Kafka 0.11 can be used by adding the following dependency to your pom.xml file.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  <version>1.4.0</version>
</dependency>

Writing to Kafka

The result of the TaxiRide Cleansing program is a DataStream<TaxiRide>. The program needs to be modified to write this DataStream to a Kafka topic instead of printing it to standard out.

Flink’s Kafka Connector provides the FlinkKafkaProducer011 class to write a DataStream to a Kafka 0.11 topic. It can be used as follows:

DataStream<TaxiRide> filteredRides = ...
filteredRides.addSink(new FlinkKafkaProducer011<TaxiRide>(
    "localhost:9092",       // Kafka broker host:port
    "cleansedRides",        // Topic to write to
    new TaxiRideSchema())   // Serializer (provided as util)
);
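
The TaxiRideSchema serializer is provided as a utility class with the exercise material, so you do not have to implement it yourself. For illustration only, a minimal sketch of such a schema could look as follows, assuming that TaxiRide offers a toString() representation and a matching fromString(String) parser (the provided utility may differ in detail):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

// Sketch of a (de)serialization schema for TaxiRide records
public class TaxiRideSchema implements
        SerializationSchema<TaxiRide>, DeserializationSchema<TaxiRide> {

    @Override
    public byte[] serialize(TaxiRide ride) {
        // write the ride as its textual representation
        return ride.toString().getBytes();
    }

    @Override
    public TaxiRide deserialize(byte[] bytes) {
        // parse the ride back from its textual representation
        return TaxiRide.fromString(new String(bytes));
    }

    @Override
    public boolean isEndOfStream(TaxiRide ride) {
        // the stream of taxi rides is unbounded
        return false;
    }

    @Override
    public TypeInformation<TaxiRide> getProducedType() {
        return TypeExtractor.getForClass(TaxiRide.class);
    }
}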

When you start a program that writes to a Kafka sink, the resulting records are appended to the configured Kafka topic. You can check whether the topic is receiving data by starting Kafka’s Console Consumer, which prints the records of a topic to the console, as follows:

./bin/kafka-console-consumer.sh \
  --zookeeper localhost:2181 \
  --topic cleansedRides \
  --from-beginning

Note: Kafka topics are designed as durable logs. If you restart a program that writes to a Kafka topic, its records are appended to the data that is already in the topic, i.e., the topic is not overwritten! Check the Kafka instructions to learn how a topic can be removed.

Reading from Kafka

Once the Kafka topic has been filled with cleansed TaxiRides, the next step is to adapt the Popular Places program such that it reads its input from that topic. For that, we need to replace the TaxiRideSource with a Kafka consumer data source. The following code snippet shows how to configure and use a FlinkKafkaConsumer011 source.

// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure event-time characteristics
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// generate a Watermark every second
env.getConfig().setAutoWatermarkInterval(1000);

// configure Kafka consumer
Properties props = new Properties();
props.setProperty("zookeeper.connect", "localhost:2181"); // Zookeeper default host:port
props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
props.setProperty("group.id", "myGroup"); // Consumer group ID
props.setProperty("auto.offset.reset", "earliest"); // Always read topic from start

// create a Kafka consumer
FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
    "cleansedRides",
    new TaxiRideSchema(),
    props);

// create Kafka consumer data source
DataStream<TaxiRide> rides = env.addSource(consumer);

Note: A stream read from Kafka does not automatically have timestamps and watermarks assigned. You have to take care of this yourself in order to make the event-time windows work. Otherwise the program won’t emit any results. Please refer to the Implementation Hints or to the reference implementation below if you need help.

The resulting stream should be printed to standard out.
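
For example, assuming the windowed result is held in a DataStream called popularPlaces (the variable and job name below are only placeholders), the final lines of the program could look like this:

// print the result stream to standard out and submit the job
popularPlaces.print();
env.execute("Popular Places from Kafka");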

When you run your program, it will start reading the Kafka topic from the beginning (given that you set the auto.offset.reset property to earliest) and will wait for new records once it has reached the end of the topic. You can also run the writing and the reading program concurrently in order to send data from the TaxiRide Cleansing program through Kafka to the Popular Places program.

Implementation Hints

The FlinkKafkaConsumer011 class has a method assignTimestampsAndWatermarks() to provide a custom timestamp and watermark assigner. Flink provides the abstract BoundedOutOfOrdernessTimestampExtractor class to implement timestamp extractors with bounded out-of-orderness (watermarks follow the timestamps after a fixed time interval). You should extend this class to implement a custom timestamp and watermark assigner. The out-of-orderness of the TaxiRide events that were produced by the TaxiRideSource and written to the Kafka topic depends on the maxEventDelay parameter of the TaxiRideSource in the TaxiRide Cleansing program. The extracted timestamp should be the TaxiRide.startTime field or the TaxiRide.endTime field (depending on whether the event marks the start or the end of a ride), converted to a long. A possible extractor is sketched below.
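
As an illustration, a timestamp and watermark assigner along these lines could look as follows. The class name TaxiRideTSExtractor and the bound of 60 seconds are assumptions (the bound should match the maxEventDelay you used in the TaxiRide Cleansing program), and the sketch assumes that TaxiRide exposes an isStart flag and that startTime/endTime can be converted to epoch milliseconds via getMillis():

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Assigns event-time timestamps and emits watermarks that trail behind
// the highest seen timestamp by a fixed bound of 60 seconds.
public class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

    public TaxiRideTSExtractor() {
        super(Time.seconds(60));
    }

    @Override
    public long extractTimestamp(TaxiRide ride) {
        // start events are stamped with the start time, end events with the end time
        if (ride.isStart) {
            return ride.startTime.getMillis();
        } else {
            return ride.endTime.getMillis();
        }
    }
}

The extractor is then registered on the Kafka consumer before the source is added to the environment:

// create Kafka consumer data source with timestamps and watermarks
DataStream<TaxiRide> rides = env.addSource(
    consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor()));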

Reference Solution

Reference solutions are available on GitHub: