Flink S3 Checkpoints – Monitoring Using S3 Access Logs

You can use the Flink Web UI to monitor the checkpoint operations in Flink, but in some cases S3 access logs can provide more information, and can be especially useful if you run many Flink applications.

Flink Web UI

The Flink Web UI provides easy access to the checkpoint history and details of a job.

However, it is not as convenient when you need to monitor many applications or perform historical or advanced analysis.

End of Checkpoint

If Amazon S3 is used as the storage for the checkpoint metadata, you can use S3 access logs to track the end of each checkpoint.

When a checkpoint completes, the Flink Job Manager writes a JobId/chk-CheckpointId/_metadata file to the configured checkpoint path, for example, /f82c9a2b7f3490f704e06936eb3a5137/chk-2271/_metadata
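
If you process the access logs outside of SQL, a small helper can pull the job ID and checkpoint ID out of such a key. The following is a minimal Python sketch: the names parse_metadata_key and CheckpointRef are hypothetical, and the regular expression assumes the JobId/chk-CheckpointId/_metadata layout described above with a 32-character hexadecimal job ID.

```python
import re
from typing import NamedTuple, Optional

# Assumed layout: <prefix>/<32 hex chars JobId>/chk-<CheckpointId>/_metadata
_METADATA_KEY = re.compile(r"(?:^|/)([0-9a-f]{32})/chk-(\d+)/_metadata$")


class CheckpointRef(NamedTuple):
    job_id: str
    checkpoint_id: int


def parse_metadata_key(key: str) -> Optional[CheckpointRef]:
    """Return the job and checkpoint IDs, or None if the key is not a _metadata key."""
    match = _METADATA_KEY.search(key)
    if match is None:
        return None
    return CheckpointRef(match.group(1), int(match.group(2)))


ref = parse_metadata_key("/f82c9a2b7f3490f704e06936eb3a5137/chk-2271/_metadata")
print(ref)
```

Keys that do not end in _metadata, such as the files written by individual subtasks, yield None and can be skipped.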

You can use the following query for S3 Access Logs to find the log records denoting the end of Flink checkpoints:

select request_datetime, total_time, turnaround_time, key
from s3_access_logs
where key like '%/_metadata%' and operation = 'REST.PUT.OBJECT';

You can see that the output matches the Flink UI:

request_datetime       total_time   turnaround_time   key
----------------       ----------   ---------------   ----------------------
26/May/2020:07:02:10   45           21                .../chk-2268/_metadata
26/May/2020:07:04:30   77           61                .../chk-2269/_metadata
26/May/2020:07:06:52   167          147               .../chk-2270/_metadata
26/May/2020:07:09:14   93           76                .../chk-2271/_metadata

Note that the _metadata file is created only for successfully completed checkpoints. If a checkpoint fails in the Flink application, there is no completion record for it in the S3 access log.
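
Since failed checkpoints leave no _metadata record, a gap in the sequence of checkpoint IDs can hint at a checkpoint that did not complete. The sketch below is only an illustration under two assumptions: that checkpoint IDs of a job are consecutive integers, and that the access log is complete. Amazon S3 delivers access logs on a best-effort basis, so a gap can also mean a missing log record. The function name missing_checkpoint_ids is hypothetical.

```python
from typing import Iterable, List


def missing_checkpoint_ids(completed_ids: Iterable[int]) -> List[int]:
    """Return IDs absent between the smallest and largest completed checkpoint ID."""
    seen = set(completed_ids)
    if not seen:
        return []
    return [i for i in range(min(seen), max(seen) + 1) if i not in seen]


# Made-up input: there is no _metadata record for checkpoint 2270.
print(missing_checkpoint_ids([2268, 2269, 2271]))  # [2270]
```

Treat the result as a list of candidates to check in the Flink Web UI or the job logs, not as proof of a failure.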

Checkpoint Duration

Individual subtasks of the Flink job write their own checkpoint data to S3 as they complete it. A job typically contains vertices whose subtasks checkpoint very quickly, so as an approximation you can treat the earliest subtask write as the start time of the Flink checkpoint for the application. Similarly, you can treat the latest subtask write as the end time of the Flink checkpoint.

A subtask writes a checkpoint with the following key structure: /_entropy_/JobId/chk-CheckpointId/Guid-value.

For example, the following query can be used to estimate the checkpoint duration:

-- Duration in seconds between the earliest and latest subtask PUT of each checkpoint
select chk,
  max(unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z')) - 
    min(unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z')) duration
from (
  select split(key, '/')[3] chk, request_datetime
  from s3_access_logs
  where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
) r
group by chk;

You can see that the results of the query (durations in seconds) closely match the checkpoint durations shown in the Flink UI:

chk        duration 
---        --------
chk-2268   22
chk-2269   19
chk-2270   20
chk-2271   20

Note that this approach only works if your Flink application DAG contains vertices whose subtasks checkpoint very quickly, so that the earliest subtask write is close to the actual start of the checkpoint.
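
The same estimate can be sketched in Python for log records that are already parsed into (request_datetime, key) pairs. This is a minimal sketch, not the query's exact logic: the name estimate_durations is hypothetical, the timestamp format assumes values such as "26/May/2020:07:02:10 +0000", and the sample records are made up for illustration.

```python
import re
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, Tuple

TIME_FORMAT = "%d/%b/%Y:%H:%M:%S %z"  # assumed, e.g. "26/May/2020:07:02:10 +0000"
CHK_PATTERN = re.compile(r"/(chk-\d+)/")


def estimate_durations(records: Iterable[Tuple[str, str]]) -> Dict[str, int]:
    """Map each checkpoint to (latest PUT - earliest PUT) in whole seconds."""
    put_times = defaultdict(list)
    for request_datetime, key in records:
        match = CHK_PATTERN.search(key)
        # Skip keys outside a chk- directory and the _metadata file itself,
        # so that only subtask files are counted.
        if match is None or key.endswith("/_metadata"):
            continue
        parsed = datetime.strptime(request_datetime, TIME_FORMAT)
        put_times[match.group(1)].append(parsed.timestamp())
    return {chk: int(max(ts) - min(ts)) for chk, ts in put_times.items()}


# Made-up sample records for illustration only.
sample = [
    ("26/May/2020:07:01:48 +0000", "/_entropy_/job/chk-1/0a1b-0001"),
    ("26/May/2020:07:01:50 +0000", "/_entropy_/job/chk-1/0a1b-0002"),
    ("26/May/2020:07:02:10 +0000", "/_entropy_/job/chk-1/0a1b-0003"),
    ("26/May/2020:07:02:10 +0000", "/job/chk-1/_metadata"),
]
print(estimate_durations(sample))  # {'chk-1': 22}
```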

Checkpoint Skew

Once you know the overall checkpoint duration, it is useful to check whether the checkpoint time is skewed across subtasks.

I am going to compute the 50th (median), 99th, 99.5th, 99.9th and 99.99th percentiles of the checkpoint duration across all subtasks of every checkpoint:

select r.chk,
  count(*) subtasks_cnt,
  max(req_dt - min_req_dt) chk_duration,
  percentile(cast(req_dt - min_req_dt as bigint), array(0.5, 0.99, 0.995, 0.999, 0.9999)) pct
from (
  -- PUT time of every subtask file
  select split(key, '/')[3] chk, unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z') req_dt
  from s3_access_logs
  where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
) r
join (
  -- Earliest PUT time per checkpoint, used as the checkpoint start
  select
    split(key, '/')[3] chk, 
    min(unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z')) min_req_dt
  from s3_access_logs
  where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
  group by split(key, '/')[3]
) mn
on r.chk = mn.chk
group by r.chk;

Here is my sample result:

chk         subtasks_cnt    chk_duration   pct (50%, 99%, 99.5%, 99.9%, 99.99%)
--------    ------------    ------------   ------------------------------------
chk-2268    1275            22             2.0, 5.0, 8.3, 17.6, 21.6
chk-2269    1275            19             2.0, 4.0, 6.0, 14.6, 18.6
chk-2270    1275            20             1.0, 3.0, 7.6, 19.0, 19.8
chk-2271    1275            20             2.0, 3.0, 4.0, 19.0, 19.8

You can see a very high skew in the checkpoint time: there are 1,275 subtasks, and while the overall checkpoint time is between 19 and 22 seconds, 99.5% of the subtasks (1275 - 1275 * 0.005 ≈ 1269 subtasks) complete their checkpoints within 4-8 seconds after the fastest subtask checkpoint.
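
To illustrate how a few stragglers dominate the overall duration while the median stays low, here is a small Python sketch of the same calculation. It is an assumption-laden illustration: the names percentile and skew_profile are hypothetical, the percentile uses linear interpolation between the two closest ranks (which may differ from the query engine's exact method), and the sample times are made up.

```python
import math
from typing import List, Sequence


def percentile(sorted_values: Sequence[float], p: float) -> float:
    """Percentile (0 <= p <= 1) with linear interpolation between closest ranks."""
    if not sorted_values:
        raise ValueError("no values")
    position = p * (len(sorted_values) - 1)
    lower = math.floor(position)
    upper = math.ceil(position)
    fraction = position - lower
    low_value = sorted_values[lower]
    return low_value + (sorted_values[upper] - low_value) * fraction


def skew_profile(put_times: Sequence[float], points: Sequence[float]) -> List[float]:
    """Percentiles of each subtask's delay relative to the earliest subtask PUT."""
    earliest = min(put_times)
    offsets = sorted(t - earliest for t in put_times)
    return [percentile(offsets, p) for p in points]


# Made-up PUT times in epoch seconds: nine fast subtasks and one straggler.
times = [100, 101, 101, 102, 102, 102, 103, 103, 104, 120]
profile = skew_profile(times, [0.5, 0.9, 1.0])
print(profile)
```

In this made-up sample the median delay is 2 seconds, while the single straggler stretches the total to 20 seconds, which is the same pattern as in the result above.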

It is important to eliminate this skew as the slowest subtasks delay the entire Flink checkpoint.

Top N Slowest Instances

Next, you can find the slowest instances for every checkpoint:

select chk,
  collect_list(remote_ip) ip
from (
  -- Rank the subtask PUTs of each checkpoint, slowest first
  select r.chk, remote_ip,
    row_number() over (partition by r.chk order by (req_dt - min_req_dt) desc) rownum
  from (
    select
      split(key, '/')[3] chk, 
      unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z') req_dt, 
      remote_ip
    from s3_access_logs
    where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
  ) r
  join (
    -- Earliest PUT time per checkpoint, used as the checkpoint start
    select
      split(key, '/')[3] chk, 
      min(unix_timestamp(request_datetime, 'dd/MMM/yyyy:HH:mm:ss z')) min_req_dt
    from s3_access_logs
    where key like '%/chk-%/%-%' and operation = 'REST.PUT.OBJECT'
    group by split(key, '/')[3]
  ) mn
  on r.chk = mn.chk
) t
where rownum <= 5
group by chk;

My sample output (the IP addresses are left blank here):

chk        ip
---        --
chk-2268   ["", "", "", "", ""]
chk-2269   ["", "", "", "", ""]
chk-2270   ["", "", "", "", ""]
chk-2271   ["", "", "", "", ""]

Additionally, you can count how many times each instance (IP address) appears among the slowest subtasks across checkpoints to see whether there is a persistent data skew or a problem with specific instances.
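
As a rough sketch of that count, the following Python snippet takes the slowest IP addresses per checkpoint, in the shape returned by the query above, and counts in how many checkpoints each address appears. The function name count_slow_appearances is hypothetical, and the addresses are made up (taken from the 192.0.2.0/24 documentation range).

```python
from collections import Counter
from typing import Dict, List


def count_slow_appearances(slowest_by_checkpoint: Dict[str, List[str]]) -> Counter:
    """Count in how many checkpoints each IP is among the slowest subtasks."""
    counts: Counter = Counter()
    for ips in slowest_by_checkpoint.values():
        counts.update(set(ips))  # count an IP at most once per checkpoint
    return counts


# Made-up addresses for illustration only.
sample = {
    "chk-1": ["192.0.2.10", "192.0.2.11", "192.0.2.10"],
    "chk-2": ["192.0.2.10", "192.0.2.12"],
    "chk-3": ["192.0.2.10", "192.0.2.11"],
}
for ip, appearances in count_slow_appearances(sample).most_common():
    print(ip, appearances)
```

An address that shows up in nearly every checkpoint points to a persistent problem with that instance or with the data it processes, while addresses that appear only occasionally are more likely transient.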