One of our Flink streaming jobs showed significant variance in the time spent writing files to S3 by the same Task Manager process.
What settings do you need to check first?
In our environment every Task Manager runs 15 slots, and due to the high parallelism of the Sink operator each slot writes data to S3, i.e. there are 15 concurrent writers per Task Manager JVM process.
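For reference, the slot count comes from the standard Flink option in flink-conf.yaml (the value of 15 reflects our setup):

taskmanager.numberOfTaskSlots: 15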
At the same time, the fs.s3a.threads.max option was set to 10, meaning that only 10 file parts (not even 10 files) could be uploaded concurrently. So we have to increase fs.s3a.threads.max to at least the number of sink slots per Task Manager.
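For example, a minimal core-site.xml excerpt raising the limit on the Hadoop side (the value of 20 is an assumption: not less than the 15 sink slots, with some headroom for multipart uploads):

<property>
  <name>fs.s3a.threads.max</name>
  <value>20</value>
</property>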
Note that Flink also supports bucketed writes to sinks, where a single sink slot can write data to multiple files concurrently (partitioning data into different buckets based on some key value). In this case you can set an even larger number of S3 write threads; see the sketch below.
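A minimal sketch of such a bucketed sink, assuming a Flink 1.x StreamingFileSink writing row-encoded strings (the S3 path and the hourly bucketing scheme are illustrative, not taken from our job):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

// DataStream<String> stream = ...; // produced earlier in the job graph

// Each sink subtask keeps one in-progress file per active bucket, so a
// single slot can have several concurrent S3 uploads at the same time.
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3a://my-bucket/output"),
                      new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
        .build();

stream.addSink(sink).setParallelism(15);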
To check how many S3 transfer threads are in use, go to the node running the Task Manager and find the PID of the YarnTaskExecutorRunner process:
$ jps
18983 YarnTaskExecutorRunner
Then you can list S3 threads as follows:
$ jstack 18983 | grep "s3a-transfer"

"s3a-transfer-shared-pool1-t9" #550 daemon prio=5 os_prio=0 tid=0x00007f8f4579b000 ...
"s3a-transfer-shared-pool1-t2" #543 daemon prio=5 os_prio=0 tid=0x00007f8f4d063800 ...
"s3a-transfer-shared-pool1-t3" #542 daemon prio=5 os_prio=0 tid=0x0000000002179000 ...
"s3a-transfer-shared-pool1-t10" #549 daemon prio=5 os_prio=0 tid=0x00007f765bcd7000 ...
"s3a-transfer-shared-pool1-t5" #546 daemon prio=5 os_prio=0 tid=0x00000000023f8000 ...
...
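To get just the number of transfer threads rather than the full listing, you can count the matches instead:

$ jstack 18983 | grep -c "s3a-transfer"

With fs.s3a.threads.max set to 10, you would expect to see at most 10 such threads.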