It is quite common to have a streaming Flink application that reads incoming data and writes it into Parquet files with low latency (a couple of minutes), so analysts can run both near-real-time and historical ad-hoc analysis, mostly using SQL queries.
But let’s review write patterns and problems that can appear for such applications at scale.
Flink Streaming File Sink
Let’s consider a scenario in which you need to read data from a streaming source (one or more Apache Kafka topics, or an Amazon Kinesis data stream) and route it to different buckets depending on the event type (product name, title or user action, for example):
Additionally, you want to roll the data files based on the event date and hour, so you can have the corresponding partitions for SQL tables.
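A minimal sketch of such a sink is shown below. The Event POJO, the bucket path layout (event_type/dt=…/hour=…) and the S3 path are my own illustrative assumptions, not something prescribed by Flink; the sketch only assumes a flink-parquet dependency on the classpath.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class EventParquetSink {

    // Hypothetical event POJO; Avro reflection derives the Parquet schema from its fields.
    public static class Event {
        public String type;       // product name, title, user action, ...
        public long timestamp;    // event time, epoch millis
        public String payload;
    }

    // Routes each record to a bucket (output directory) built from its event type, date and hour.
    public static class EventTypeDateHourBucketAssigner implements BucketAssigner<Event, String> {

        private static final DateTimeFormatter HOUR_FORMAT =
                DateTimeFormatter.ofPattern("'/dt='yyyy-MM-dd'/hour='HH").withZone(ZoneOffset.UTC);

        @Override
        public String getBucketId(Event event, Context context) {
            // e.g. "product_view/dt=2020-05-14/hour=13"
            return event.type + HOUR_FORMAT.format(Instant.ofEpochMilli(event.timestamp));
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    // The sink writes bulk-encoded (Parquet) part files under s3://my-bucket/events/<bucket id>/
    public static StreamingFileSink<Event> build() {
        return StreamingFileSink
                .forBulkFormat(new Path("s3://my-bucket/events"),
                        ParquetAvroWriters.forReflectRecord(Event.class))
                .withBucketAssigner(new EventTypeDateHourBucketAssigner())
                .build();
    }
}

The sink is then attached to the stream read from Kafka or Kinesis with events.addSink(EventParquetSink.build()).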
So if you have 1,000 event types, the Flink application will shuffle and write output data to at least 1,000 Parquet files. Since there is typically data skew and some event types generate much more data than others, it makes sense to assign more buckets (and files) to the large event types.
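One possible way to do that with the bucket assigner sketched above (purely an illustration of the idea; fanOutPerType is a hypothetical field holding per-type fan-out factors) is to salt the bucket id of the known-hot event types, so each of them is spread over several sub-buckets and files:

    // Variation of getBucketId() above: spread known-hot event types over several sub-buckets.
    @Override
    public String getBucketId(Event event, Context context) {
        int fanOut = fanOutPerType.getOrDefault(event.type, 1);   // e.g. {"product_view" -> 8}
        int salt = Math.floorMod(event.payload == null ? 0 : event.payload.hashCode(), fanOut);
        return event.type + "-" + salt + HOUR_FORMAT.format(Instant.ofEpochMilli(event.timestamp));
    }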
Write IOPS Storm on Checkpoint
In the case of Parquet, Flink uses the bulk-encoded format: with a columnar storage format you cannot effectively write data row by row, instead you have to accumulate rows into row groups. See How Parquet Files are Written – Row Groups, Pages, Required Memory and Flush Operations for more details.
That’s one of the reasons why Flink closes and rolls the Parquet files on checkpoint. This brings up a problem: all Flink sink tasks start writing their output Parquet files at the same time.
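In code this corresponds to the checkpoint rolling policy, which is already the default for bulk formats; in recent Flink versions the builder also lets you state it explicitly. Shown only for illustration, continuing the sketch above:

// import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
// Bulk-encoded part files are rolled (closed and committed) on every checkpoint.
StreamingFileSink<Event> sink = StreamingFileSink
        .forBulkFormat(new Path("s3://my-bucket/events"),
                ParquetAvroWriters.forReflectRecord(Event.class))
        .withBucketAssigner(new EventTypeDateHourBucketAssigner())
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();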
For example, in my application Flink started a checkpoint at 13:37:11, and I observed the following PUT.OBJECT operations in the S3 access log:
timestamp   total PUTs   total size        p50 time ms   p99.99 time ms   max time ms
---------   ----------   ----------------  -----------   --------------   -----------
13:37:02             1       136,807,976          1625             1625          1625
13:37:06             1       134,217,823          1710             1710          1710
13:37:09             1       135,100,445          1586             1586          1586
13:37:12          1618    12,787,532,521            85             6640         18795
You can see a massive spike at 13:37:12, just one second after the checkpoint, when all the tasks started 1618 PUT operations attempting to write 12 GB of data within a single second. Most writes completed within the 85-6640 millisecond range, while the slowest tail write took 18.8 seconds, delaying the entire checkpoint.
Smooth IOPS
It would be nice to smooth the S3 write operations out between two checkpoints. How can we do that?
You may have already noticed that there are 3 single PUT operations above, made at 37:02, 37:06 and 37:09, before the checkpoint. The write size gives a clue: each of them is a single part of a multipart upload to S3.
So some data sets were quite large and their data spilled to S3 before the checkpoint. Note that this is an internal spill: the uploaded parts will not be visible until the multipart upload is committed upon a successful Flink checkpoint.
So how can we force more writes to happen before the checkpoint, so we can smooth IOPS and probably reduce the overall checkpoint latency? First, you have to reduce s3.upload.min.part.size in flink-conf.yaml or fs.s3a.multipart.size in the core-site.xml cluster configuration file.
But this is not enough. The Parquet writer holds the data in memory until it reaches the specified row group size; only then is the row group sent to the output stream, from where it can be uploaded to S3 once the multipart part size is exceeded.
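The part size is a file system setting (the keys above), but the row group size is controlled by the Parquet writer itself, and Flink's stock ParquetAvroWriters helpers use Parquet's defaults. As a rough sketch of one way to set a smaller row group size, assuming Avro GenericRecord output and a schema passed as a string (the class and method names below are my own illustration, not a Flink API):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class SmallRowGroupParquetWriters {

    // Builds a writer factory with an explicit row group size (e.g. 32 * 1024 * 1024 bytes)
    // instead of the Parquet default of 128 MB.
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(String schemaString, int rowGroupSizeBytes) {
        ParquetBuilder<GenericRecord> builder = out -> AvroParquetWriter.<GenericRecord>builder(out)
                // the schema is passed as a string and parsed here because Avro Schema is not serializable
                .withSchema(new Schema.Parser().parse(schemaString))
                .withDataModel(GenericData.get())
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withRowGroupSize(rowGroupSizeBytes)
                .build();
        return new ParquetWriterFactory<>(builder);
    }
}

Passing such a factory to forBulkFormat in place of the stock ParquetAvroWriters helper means a 32 MB row group is flushed to the output stream as soon as it fills up and, together with the reduced part size, can be uploaded to S3 before the checkpoint.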
After changing the row group size and the S3 block size to 32 MB, I got the following results:
timestamp   total PUTs   total size        p50 time ms   p99.99 time ms   max time ms
---------   ----------   ----------------  -----------   --------------   -----------
13:41:02            11       383,856,833           553             5777          5857
13:41:03             9       310,251,567           517             7117          7168
13:41:04            10       347,426,701           631            13834         13834
...
13:41:59          1506     4,387,725,346            86             7840         18553
You can see that now more chunks were written before the checkpoint at 41:59.
This solution will not help much if you write many small files. You can see that in my case I was able to significantly reduce the data size written at the checkpoint, but the number of PUT operations is still large. Also, reducing the row group size to a small value is not recommended, as it can later slow down read operations on the Parquet files.
Also, this approach does not guarantee lower write latency: in my case the slowest tail write still took 18.5 seconds. See S3 Low Latency Writes – Using Aggressive Retries to Get Consistent Latency – Request Timeouts for how to mitigate this problem.