A couple of these hands-on sessions are about implementing and testing fault-tolerant, stateful DataStream programs. Here we describe, step-by-step, how to test that a stateful program successfully recovers from a simulated worker failure.
1. Implement a stateful program
2. Start a local Flink cluster
To demonstrate a worker failure, we have to execute the program on a local Flink cluster rather than from the IDE. The setup instructions show how to set up Flink locally. However, instead of starting a local Flink instance via ./bin/start-local.sh, we need to start a local cluster.
In contrast to a local instance, a local cluster starts two separate processes: one for the master (JobManager) and one for the worker (TaskManager). You can start them with these shell commands:
./bin/jobmanager.sh start cluster
./bin/taskmanager.sh start
You can check that the local Flink cluster is running via the web dashboard at http://localhost:8081. You should see one available TaskManager with one task slot.
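If you prefer the command line to the dashboard, the same information is available from Flink's REST API on port 8081. The sketch below assumes that the /overview endpoint returns JSON containing a "taskmanagers" count (check the REST API documentation for your Flink version) and that curl is installed. It uses sed instead of a JSON parser to avoid extra dependencies; taskmanager_count and check_cluster are hypothetical helper names of our own.

```shell
# Print the number of registered TaskManagers from /overview JSON read on stdin.
# Assumes a field of the form "taskmanagers":1 (optional spaces around the colon).
taskmanager_count() {
  sed -n 's/.*"taskmanagers"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}

# Query the local cluster on the dashboard/REST port used above.
check_cluster() {
  curl -s http://localhost:8081/overview | taskmanager_count
}

# Usage: check_cluster
# For the local cluster described above, the count is expected to be 1.
```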
3. Compile and start program
The How to do the Exercises page explains how to build and package a Flink program with Maven.
Your Flink application writes its results to the standard output of the TaskManager process on which it runs, which is redirected into a file. The file is located in the ./log directory and follows the naming pattern flink-<user>-taskmanager-<number>-<host>.out. Run the following command in the ./log directory, substituting your own user and host names, to continuously display the tail of the file.
tail -F flink-bob-taskmanager-0-localhost.out
Since we have not started the job yet, the out file does not receive any data.
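Rather than typing the file name by hand, you can build it from the naming pattern. This is a minimal sketch that assumes you run it from your Flink directory, that <user> is your login name ($USER), that <host> is what hostname prints, and that the first TaskManager has number 0; taskmanager_out is a hypothetical helper name.

```shell
# Build the path log/flink-<user>-taskmanager-<number>-<host>.out
# Arguments are optional: user, TaskManager number, host.
taskmanager_out() {
  local user="${1:-$USER}"        # defaults to the current login name
  local number="${2:-0}"          # first TaskManager on this machine
  local host="${3:-$(hostname)}"  # defaults to this machine's host name
  printf 'log/flink-%s-taskmanager-%s-%s.out\n' "$user" "$number" "$host"
}

# Usage: follow the file by name (-F), so tail keeps working if the file is replaced.
# tail -F "$(taskmanager_out)"
```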
The How to do the Exercises page explains how to execute a packaged program on a running Flink instance using the CLI client. Here’s an example of running one of these exercises, but you’ll need to make adjustments depending on where the jar and data files are:
flink run -c \
  com.dataartisans.flinktraining.exercises.datastream_scala.process.LongRides \
  ~/flink-training-exercises/target/flink-training-exercises-0.13.0.jar \
  --input ~/nycTaxiRides.gz
Once the job is running, the tail command shows the output it writes. From this output you should be able to tell whether the application started from the very beginning of its input stream or was restarted from some later point in time (via a checkpoint or savepoint).
You can also see the running job in the Flink web dashboard.
4. Stop a TaskManager (and start a new one)
Your application is now running in a single worker process (TaskManager) and producing output. Let us see what happens if we kill the worker process by stopping the TaskManager from your Flink directory.
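A minimal sketch of this step, assuming the bin/taskmanager.sh script from step 2, which accepts stop as well as start. FLINK_DIR and kill_worker are hypothetical names of our own; FLINK_DIR defaults to the current directory, so run from your Flink directory this amounts to ./bin/taskmanager.sh stop.

```shell
# Hypothetical variable: path to your Flink installation (defaults to the current directory).
FLINK_DIR="${FLINK_DIR:-.}"

# Simulate a worker failure by stopping the only TaskManager of the local cluster.
kill_worker() {
  "$FLINK_DIR/bin/taskmanager.sh" stop
}

# Usage (in the shell where kill_worker is defined):
# kill_worker
```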
You will notice that the output file no longer receives data. In the web dashboard you will also see that the TaskManager has disappeared and that the job's status has switched to restarting (you may have to wait a while for the heartbeat messages to time out). The job is now going through its configured restart strategy. In a production setup, either a standby TaskManager would pick up the work or a resource manager like YARN or Mesos would start a new TaskManager to continue processing. In our local setup, we are responsible for bringing up a new TaskManager process.
Before we start a new TaskManager, let us discuss what to expect when a new TaskManager continues to process the program.
The data source should continue from the last checkpoint taken before the TaskManager was killed. Hence, the ride IDs in the output should not start over from the beginning.
Now let’s bring up a new worker process and resume processing.
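A sketch of this step, assuming, as in step 2, that bin/taskmanager.sh start launches a worker. FLINK_DIR (defaulting to the current directory) and start_worker are hypothetical names of our own.

```shell
# Hypothetical variable: path to your Flink installation (defaults to the current directory).
FLINK_DIR="${FLINK_DIR:-.}"

# Bring up a replacement worker, the same way the first one was started in step 2.
start_worker() {
  "$FLINK_DIR/bin/taskmanager.sh" start
}

# Usage (in the shell where start_worker is defined):
# start_worker
```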
After a short time you will notice that the job continues to write output. (Note, however, that the output file will roll over; flink-david-taskmanager-0-singularity.local.out becomes flink-david-taskmanager-0-singularity.local.out.1, for example.) Looking at the data, we should see that the data source continued from the last checkpoint before the failure rather than starting over.
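To see where processing resumed, you can compare the last lines written before the failure (now in the rolled-over .out.1 file) with the first lines written after recovery. A minimal sketch, assuming the rollover naming described above; around_failure is a hypothetical helper name.

```shell
# Show the last records before the failure and the first records after recovery.
# $1: path of the current TaskManager .out file; $2: number of lines (default 5).
around_failure() {
  local out="$1" n="${2:-5}"
  printf '== last %s lines before the failure: %s ==\n' "$n" "$out.1"
  tail -n "$n" "$out.1"
  printf '== first %s lines after recovery: %s ==\n' "$n" "$out"
  head -n "$n" "$out"
}

# Usage, e.g. from your Flink directory:
# around_failure log/flink-david-taskmanager-0-singularity.local.out
```

Some overlap between the two excerpts would not be surprising, since records processed after the last completed checkpoint are replayed on recovery; what matters is that the ride IDs after recovery do not restart from the beginning of the input.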
The Flink web dashboard will also show that a new TaskManager connected and that the status of our job switched to running.
5. Disabling Checkpointing
We have seen how a program that checkpoints its operator state recovers from a worker failure. If you want to find out what happens when the program does not checkpoint its state, simply remove the env.enableCheckpointing() line from your program and continue from “3. Compile and start program”. (With the filesystem state backend, it is also possible to simply delete the checkpoint directory.)
When the program resumes processing after the new TaskManager process has been started, you will notice that
- the data source started processing from the beginning (the ride IDs are reset), and
- the operator state was lost.