The task of the “Mail Count” exercise is to count the number of emails in the archive of the Flink development mailing list for each unique combination of email address and month.

Input Data

This exercise uses the Mail Data Set which was extracted from the Apache Flink development mailing list archive. The Mail Data Set instructions show how to read the data set in a Flink program using the CsvInputFormat.

The task requires two fields, Timestamp and Sender. The input data can be read as a DataSet<Tuple2<String, String>>. When printed, the data set should look similar to this:

(2014-09-26-08:49:58,Fabian Hueske <>)
(2014-09-12-14:50:38,Aljoscha Krettek <>)
(2014-09-30-09:16:29,Stephan Ewen <>)

Expected Output

The result of the task should be a DataSet<Tuple3<String, String, Integer>>. The first field specifies the month, the second field the email address, and the third field the number of emails sent to the mailing list by the given email address in the given month. When printed, the data set should looks like:


The first line of the example result indicates that 16 mails were sent to the mailing list by in September 2014.

Implementation Hints

This exercise is conceptually very similar to the WordCount program, which is the standard example to introduce MapReduce. Similar like WordCount, this task requires two transformation, Map and Reduce.
In contrast to the WordCount program, this exercise requires to group data on two fields (month and email-address) instead of a single field. Flink’s DataSet.groupBy() transformation, accepts multiple grouping keys and treats them as one composite grouping key.
The Map transformation is used for record-at-a-time processing and should be used to extract the relevant information from from the input data, i.e., the month from the timestamp field and the email address from the sender field.
The GroupReduce transformation operates on groups of records and can also be used to count the number of element in a group.

Reference Solution

Reference solutions are available at GitHub: