You can use the Flink Web UI to monitor checkpoint operations in Flink, but in some cases S3 access logs can provide more information, and they are especially useful if you run many Flink applications.
Flink Web UI
The Flink Web UI provides easy access to the checkpoint history and details of a single job, but it is not convenient when you need to monitor many applications or perform historical or advanced analysis.
End of Checkpoint
If Amazon S3 is used as the storage for the checkpoint metadata, you can use S3 access logs to track the end of each checkpoint.
When a checkpoint completes, the Flink Job Manager writes a JobId/chk-CheckpointId/_metadata file to the specified checkpoint path, for example, /f82c9a2b7f3490f704e06936eb3a5137/chk-2271/_metadata.
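All queries in this article assume that the bucket's server access logs are exposed as an s3_access_logs table. As a minimal sketch (the table name, location and exact layout here are illustrative only; raw S3 access logs are space-delimited with quoted and bracketed values, so in practice you would parse them with a regex-based SerDe, and AWS documents a complete DDL for Athena), such a table could look like this:

-- Hypothetical, minimal schema for the s3_access_logs table used below.
-- Only the columns referenced by the queries are listed; real S3 access
-- logs contain more fields.
create external table s3_access_logs (
  request_datetime string,  -- e.g. 26/May/2020:07:02:10 +0000
  remote_ip        string,  -- IP address of the requester
  operation        string,  -- e.g. REST.PUT.OBJECT
  key              string,  -- object key of the request
  total_time       int,     -- request time in milliseconds
  turnaround_time  int      -- server turnaround time in milliseconds
)
location 's3://your-log-bucket/access-logs/';  -- illustrative location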
You can use the following query on the S3 access logs to find the log records denoting the end of Flink checkpoints:
select request_datetime, total_time, turnaround_time, key
from s3_access_logs
where key like '%/_metadata%' and operation = 'REST.PUT.OBJECT';
You can see that the output matches the Flink UI:
request_datetime      total_time  turnaround_time  key
--------------------  ----------  ---------------  ----------------------
26/May/2020:07:02:10  45          21               .../chk-2268/_metadata
26/May/2020:07:04:30  77          61               .../chk-2269/_metadata
26/May/2020:07:06:52  167         147              .../chk-2270/_metadata
26/May/2020:07:09:14  93          76               .../chk-2271/_metadata
Note that the _metadata file is created only for successfully completed checkpoints. If a checkpoint fails in the Flink application, you will not see any completion record for it in the S3 access log.
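This also means you can detect failed checkpoints from the logs alone. As a sketch, assuming checkpoint IDs grow sequentially and your SQL engine supports window functions, you can look for gaps in the sequence of completed checkpoint IDs:

-- A sketch: find gaps in the sequence of completed checkpoint IDs;
-- a missing ID suggests a checkpoint for which no _metadata was written
select prev_chk_id + 1 first_missing, chk_id - 1 last_missing
from (
  select chk_id,
         lag(chk_id) over (order by chk_id) prev_chk_id
  from (
    select distinct cast(regexp_extract(key, 'chk-(\\d+)/_metadata', 1) as bigint) chk_id
    from s3_access_logs
    where key like '%/_metadata%' and operation = 'REST.PUT.OBJECT'
  ) t
) g
where chk_id > prev_chk_id + 1;

Treat gaps only as a signal to investigate, since checkpoint IDs can be skipped for reasons other than failures.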
Checkpoint Duration
Individual subtasks of the Flink job put their own checkpoint files to S3 when they complete. Since a job typically has vertices whose subtasks checkpoint very fast, you can, with some approximation, treat the earliest subtask checkpoint as the start time of the application checkpoint, and the latest subtask checkpoint as its end time.
A subtask writes a checkpoint with the following key structure: /_entropy_/JobId/chk-CheckpointId/Guid-value.
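The queries below extract the checkpoint ID with split(key, '/')[3], so it may help to see how the array indexes (0-based in Hive) map to this key layout; the GUID here is made up for illustration:

-- key = '/_entropy_/f82c9a2b7f3490f704e06936eb3a5137/chk-2271/6b1d2c3e-0f4a-4c8e-9b7d-1a2b3c4d5e6f'
-- split(key, '/')[0] = ''                                   (before the leading slash)
-- split(key, '/')[1] = '_entropy_'
-- split(key, '/')[2] = 'f82c9a2b7f3490f704e06936eb3a5137'   (JobId)
-- split(key, '/')[3] = 'chk-2271'                           (CheckpointId)
select split('/_entropy_/f82c9a2b7f3490f704e06936eb3a5137/chk-2271/6b1d2c3e-0f4a-4c8e-9b7d-1a2b3c4d5e6f', '/')[3];
-- returns chk-2271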
For example, the following query can be used to estimate the checkpoint duration:
select
  chk,
  max(unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z')) -
    min(unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z')) duration
from (
  select
    split(key, '/')[3] chk,
    request_datetime
  from s3_access_logs
  where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
) r
group by chk;
You can see that the results of the query closely match the checkpoint durations shown in the Flink UI:
chk       duration
--------  --------
chk-2268  22
chk-2269  19
chk-2270  20
chk-2271  20
Note that this approach works only if your Flink application DAG contains vertices whose subtasks checkpoint very fast.
Checkpoint Skew
Once you know the overall checkpoint duration, it is useful to see whether there is any skew in the checkpoint times across subtasks.
I am going to get the 50% (median), 99%, 99.5%, 99.9% and 99.99% percentiles of the checkpoint duration for all subtasks of every checkpoint:
select
  r.chk,
  count(*) subtasks_cnt,
  max(req_dt - min_req_dt) chk_duration,
  percentile(cast(req_dt - min_req_dt as bigint),
             array(0.5, 0.99, 0.995, 0.999, 0.9999)) pct
from (
  select
    split(key, '/')[3] chk,
    unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z') req_dt
  from s3_access_logs
  where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
) r
join (
  select
    split(key, '/')[3] chk,
    min(unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z')) min_req_dt
  from s3_access_logs
  where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
  group by split(key, '/')[3]
) mn on r.chk = mn.chk
group by r.chk;
Here is my sample result:
chk       subtasks_cnt  chk_duration  pct - 50%, 99%, 99.5%, 99.9%, 99.99%
--------  ------------  ------------  ------------------------------------
chk-2268  1275          22            2.0, 5.0, 8.3, 17.6, 21.6
chk-2269  1275          19            2.0, 4.0, 6.0, 14.6, 18.6
chk-2270  1275          20            1.0, 3.0, 7.6, 19.0, 19.8
chk-2271  1275          20            2.0, 3.0, 4.0, 19.0, 19.8
You can see very high skew in the checkpoint time: there are 1,275 subtasks, and while the overall checkpoint time is between 19 and 22 seconds, 99.5% of subtasks (1275 - 1275 * 0.005 ≈ 1269 subtasks) complete their checkpoints within 4-8 seconds of the fastest subtask checkpoint.
It is important to eliminate this skew as the slowest subtasks delay the entire Flink checkpoint.
Top N Slowest Instances
Next, you can find the slowest instances for every checkpoint:
select chk, collect_list(remote_ip) ip
from (
  select
    r.chk,
    remote_ip,
    row_number() over (partition by r.chk order by (req_dt - min_req_dt) desc) rownum
  from (
    select
      split(key, '/')[3] chk,
      unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z') req_dt,
      remote_ip
    from s3_access_logs
    where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
  ) r
  join (
    select
      split(key, '/')[3] chk,
      min(unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z')) min_req_dt
    from s3_access_logs
    where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
    group by split(key, '/')[3]
  ) mn on r.chk = mn.chk
) t
where rownum <= 5
group by chk;
My sample output:
chk       ip
--------  -------------------------------------------------------------------------
chk-2268  ["10.77.37.123","10.77.35.33","10.77.34.13","10.77.37.174","10.77.34.5"]
chk-2269  ["10.77.34.5","10.77.36.135","10.77.37.33","10.77.37.174","10.77.35.13"]
chk-2270  ["10.77.37.130","10.77.34.31","10.77.34.31","10.77.35.254","10.77.35.17"]
chk-2271  ["10.77.37.130","10.77.33.237","10.77.37.33","10.77.35.125","10.77.35.125"]
Additionally, you can easily count how many times each instance (IP address) appears among the slowest, to see whether there is a permanent data skew or a problem with specific instances.
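For example, a follow-up aggregation over the same ranked subquery (a sketch that keeps the threshold of the 5 slowest subtasks per checkpoint from the previous query) could look like this:

-- A sketch: count how often each instance appears among the 5 slowest
-- subtask checkpoints; the inner query is the same ranking as above
select remote_ip, count(*) slow_cnt
from (
  select
    r.chk,
    remote_ip,
    row_number() over (partition by r.chk order by (req_dt - min_req_dt) desc) rownum
  from (
    select
      split(key, '/')[3] chk,
      unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z') req_dt,
      remote_ip
    from s3_access_logs
    where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
  ) r
  join (
    select
      split(key, '/')[3] chk,
      min(unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z')) min_req_dt
    from s3_access_logs
    where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
    group by split(key, '/')[3]
  ) mn on r.chk = mn.chk
) t
where rownum <= 5
group by remote_ip
order by slow_cnt desc;

Instances that keep appearing at the top across many checkpoints point to a persistent issue, such as data skew or a degraded node, rather than a one-off slowdown.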