
Flink Streaming to Parquet Files in S3 – Massive Write IOPS on Checkpoint

It is quite common to have a streaming Flink application that reads incoming data and writes it to Parquet files with low latency (a couple of minutes), so that analysts can run both near-real-time and historical ad-hoc analysis, mostly using SQL queries.

Let’s review the write patterns and the problems that can appear for such applications at scale.

Flink Streaming File Sink

Let’s consider a scenario where you need to read data from a streaming source (one or more Apache Kafka topics, or an Amazon Kinesis data stream) and route the data to different buckets depending on the event type (product name, title or user action, for example).

You also want to roll the data files based on the event date and hour, so that you have the corresponding partitions for SQL tables.

So if you have 1,000 event types, the Flink application will shuffle and write output data to at least 1,000 Parquet files. Since there is typically data skew and some event types generate much more data than others, it makes sense to assign more buckets (and files) to large event types.
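To make the bucketing idea concrete, here is a minimal sketch in plain Python of how a bucket path could be derived from the event type, the event date and hour, and a sub-bucket for large event types. It is not the Flink API: the function name bucket_id, the SUB_BUCKETS table, the page_view event type and the path layout are all hypothetical and only illustrate the routing logic.

```python
import zlib
from datetime import datetime

# Hypothetical: event types known to be large get several sub-buckets,
# everything else gets exactly one.
SUB_BUCKETS = {"page_view": 4}

def bucket_id(event_type: str, event_time: datetime, key: str) -> str:
    """Return a partition-style path: event type, date, hour, sub-bucket."""
    n = SUB_BUCKETS.get(event_type, 1)
    # crc32 is stable across processes, unlike Python's built-in hash() for str.
    sub = zlib.crc32(key.encode("utf-8")) % n
    return (f"event={event_type}/dt={event_time:%Y-%m-%d}/"
            f"hour={event_time:%H}/bucket={sub}")

print(bucket_id("login", datetime(2021, 5, 1, 13, 37), "user-1"))
```

With 1,000 event types and a few large ones split into sub-buckets, every date and hour produces more than 1,000 open files, which is what drives the number of writes discussed below.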

Write IOPS Storm on Checkpoint

In the case of Parquet, Flink uses the bulk-encoded format: with columnar storage you cannot effectively write data row by row; instead you have to accumulate the rows into row groups. See How Parquet Files are Written – Row Groups, Pages, Required Memory and Flush Operations for more details.

That’s one of the reasons why Flink closes and rolls the Parquet files on checkpoint. The problem is that all Flink sink tasks then start writing their output Parquet files at the same time.

For example, in my application Flink started a checkpoint at 13:37:11, and I observed the following PUT.OBJECT operations in the S3 access log:

timestamp      total PUTs   total size      p50 time ms   p99.99 time ms    max time ms
---------      ----------   ----------      -----------   --------------    -----------
13:37:02       1            136,807,976     1625          1625              1625
13:37:06       1            134,217,823     1710          1710              1710
13:37:09       1            135,100,445     1586          1586              1586
13:37:12       1618         12,787,532,521  85            6640              18795

You can see a massive spike at 13:37:12, just one second after the checkpoint started, when the tasks issued 1,618 PUT operations attempting to write about 12.8 GB of data in a single second. Half of the writes completed within 85 milliseconds and 99.99% within 6,640 milliseconds, while the slowest tail write took 18.8 seconds, delaying the entire checkpoint.
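A per-second summary like the table above can be built from already parsed access log records. The sketch below assumes each PUT has been reduced to a (second, size in bytes, total time in ms) tuple; parsing the raw S3 access log is left out, and the function names are hypothetical. It uses the nearest-rank percentile, which is an assumption: the percentile definition behind the table is not stated, so tail values may differ.

```python
import math
from collections import defaultdict

def percentile(sorted_values, p):
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(p / 100 * len(sorted_values)))
    return sorted_values[rank - 1]

def summarize_puts(records):
    """records: iterable of (second, size_bytes, total_time_ms), one per PUT."""
    by_second = defaultdict(list)
    for second, size, time_ms in records:
        by_second[second].append((size, time_ms))
    rows = []
    for second in sorted(by_second):
        puts = by_second[second]
        times = sorted(t for _, t in puts)
        rows.append({
            "timestamp": second,
            "total_puts": len(puts),
            "total_size": sum(s for s, _ in puts),
            "p50_ms": percentile(times, 50),
            "max_ms": times[-1],
        })
    return rows

# First record is taken from the table; the other three are made up.
sample = [("13:37:02", 136_807_976, 1625),
          ("13:37:12", 100, 80), ("13:37:12", 200, 90), ("13:37:12", 300, 6000)]
for row in summarize_puts(sample):
    print(row)
```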

Smooth IOPS

It would be nice to spread the S3 write operations out between two checkpoints. How can we do that?

You may have already noticed the 3 single PUT operations above, made at 37:02, 37:06 and 37:09, before the checkpoint. The write size gives you a clue: each one is a single part of a multi-part upload to S3.

So some data sets were large enough that part of their data was uploaded before the checkpoint. Note that this is an internal spill in S3: the data will not be visible until the upload is committed upon the successful Flink checkpoint.
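The clue becomes easier to see when the three sizes are converted to MiB. The short calculation below uses only the byte counts from the table; reading them as parts cut at roughly 128 MiB is an inference from the numbers, since the log itself does not state the configured part size.

```python
MIB = 1024 * 1024

# Sizes of the three early PUTs from the access log table, in bytes.
early_puts = [136_807_976, 134_217_823, 135_100_445]

def to_mib(size_bytes: int) -> float:
    """Convert a byte count to MiB."""
    return size_bytes / MIB

for size in early_puts:
    print(f"{size:>13,} bytes = {to_mib(size):.2f} MiB")
```

All three values land just above 128 MiB (134,217,728 bytes), which is what you would expect if a part is uploaded as soon as the buffered output crosses that threshold.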

So how can we force more writes to happen before the checkpoint, so that we can smooth IOPS and possibly reduce the overall checkpoint latency? First, you have to reduce s3.upload.min.part.size in flink-conf.yaml or fs.s3a.multipart.size in the core-site.xml cluster configuration file.

But this is not enough. The Parquet writer holds the data in memory until its size reaches the specified row group size. Only then is the data sent to the output stream, and it can be uploaded to S3 once the multi-part size is exceeded.

After changing the row group size and the S3 multi-part size to 32 MB, I got the following results:
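The interaction between the two thresholds can be illustrated with a toy model of a single output file. This is a sketch under strong assumptions: it ignores Parquet encoding and compression (bytes held in memory and bytes sent to the stream are treated as equal), it feeds data in fixed 1 MiB chunks, and the simulate function with its parameters is hypothetical, not Flink or Parquet code.

```python
MIB = 1024 * 1024

def simulate(total_bytes, row_group_size, part_size, chunk=MIB):
    """Return (bytes uploaded before checkpoint, bytes left for checkpoint)."""
    in_row_group = 0   # held in memory by the Parquet writer
    in_stream = 0      # flushed row groups waiting to fill an S3 part
    uploaded = 0       # parts already sent to S3 before the checkpoint
    written = 0
    while written < total_bytes:
        n = min(chunk, total_bytes - written)
        written += n
        in_row_group += n
        # A full row group is flushed from memory to the output stream.
        if in_row_group >= row_group_size:
            in_stream += in_row_group
            in_row_group = 0
        # The stream uploads a part whenever enough bytes have accumulated.
        while in_stream >= part_size:
            in_stream -= part_size
            uploaded += part_size
    return uploaded, in_row_group + in_stream

for rg, part in [(128, 128), (128, 32), (32, 32)]:
    before, at_checkpoint = simulate(200 * MIB, rg * MIB, part * MIB)
    print(rg, part, before // MIB, at_checkpoint // MIB)
```

In this model, a 200 MiB file with a 128 MiB row group leaves 72 MiB for the checkpoint whether the part size is 128 MiB or 32 MiB, because nothing reaches the stream until the row group is full. Reducing both to 32 MiB leaves only 8 MiB. A 20 MiB file uploads nothing early in any of these settings.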

timestamp      total PUTs   total size      p50 time ms   p99.99 time ms    max time ms
---------      ----------   ----------      -----------   --------------    -----------
13:41:02       11           383,856,833     553           5777              5857
13:41:03       9            310,251,567     517           7117              7168
13:41:04       10           347,426,701     631           13834             13834
...
13:41:59       1506         4,387,725,346   86            7840              18553

You can see that more chunks are now written before the checkpoint at 41:59.

This solution will not help much if you write many small files. In my case I was able to significantly reduce the data size written at checkpoint, but the number of PUT operations is still large. Also, reducing the row group size to a small value is not recommended, as it can later slow down read operations on the Parquet files.

This approach also does not guarantee lower write latency. In my case the slowest tail write still took 18.5 seconds. See S3 Low Latency Writes – Using Aggressive Retries to Get Consistent Latency – Request Timeouts for how to mitigate this problem.
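To put numbers on that, the two checkpoint rows from the tables can be compared directly. The calculation below uses only the PUT counts and byte totals reported above; the variable names are just for illustration.

```python
# (total PUTs, total bytes) in the checkpoint second, from the two tables.
before = (1618, 12_787_532_521)   # checkpoint at 37:12, before the change
after = (1506, 4_387_725_346)     # checkpoint at 41:59, 32 MB row group and part size

def avg_put_size(puts: int, total_bytes: int) -> float:
    """Average bytes per PUT operation."""
    return total_bytes / puts

size_ratio = after[1] / before[1]   # share of bytes still written at checkpoint
put_ratio = after[0] / before[0]    # share of PUTs still issued at checkpoint

print(f"bytes at checkpoint: {size_ratio:.0%} of the original")
print(f"PUTs at checkpoint:  {put_ratio:.0%} of the original")
print(f"avg PUT size: {avg_put_size(*before) / 1e6:.1f} MB -> "
      f"{avg_put_size(*after) / 1e6:.1f} MB")
```

The bytes written at checkpoint drop to roughly a third, while the number of PUT operations falls by only about 7%, so each file still needs at least one write when it is closed.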