You can use the Flink Web UI to monitor checkpoint operations, but in some cases S3 access logs can provide more detail, and they are especially useful when you run many Flink applications.
Amazon S3 can generate a large volume of access logs, so it makes sense to have an ETL process that parses and combines them into Parquet or ORC format for better query performance. Still, there is an easy way to analyze the logs using a Hive table created directly on top of the raw S3 log directory.
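If you just want to inspect a few raw records outside Hive, a small Python sketch of the log layout can be enough. The field list below follows the documented S3 server access log format; the trailing fields (referrer, user agent, version id and so on) are simply ignored here.

```python
import re

# Rough sketch: parse a raw S3 server access log record into named fields.
# The layout follows the documented S3 access log format; only the leading
# fields are captured, the rest of the record is ignored.
LOG_PATTERN = re.compile(
    r'(\S+) (\S+) \[([^\]]+)\] (\S+) (\S+) (\S+) (\S+) (\S+) '
    r'"([^"]*)" (\S+) (\S+) (\S+) (\S+) (\S+) (\S+)'
)

FIELDS = [
    "bucket_owner", "bucket", "time", "remote_ip", "requester",
    "request_id", "operation", "key", "request_uri", "http_status",
    "error_code", "bytes_sent", "object_size", "total_time_ms",
    "turnaround_time_ms",
]

def parse_s3_log_line(line: str):
    """Return a dict of the leading S3 access log fields, or None if unparsable."""
    m = LOG_PATTERN.match(line)
    if not m:
        return None
    return dict(zip(FIELDS, m.groups()))
```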
For zero-downtime, large-scale systems, you can run multiple compute clusters located in different availability zones.
The Kinesis KCL 2.x Consumer is very helpful for building highly scalable, elastic and fault-tolerant streaming data processing pipelines for Amazon Kinesis. Let’s review some of the KCL internals related to load balancing and the response to compute node or cluster failures, and see how you can tune and monitor these activities.
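As a quick illustration of one way to monitor lease balancing, here is a rough Python sketch that scans the DynamoDB lease table KCL 2.x maintains (by default named after the KCL application) and counts how many shard leases each worker currently owns. The table name, region and the leaseKey/leaseOwner attribute names are assumptions to verify against your own lease table.

```python
import boto3
from collections import Counter

def lease_distribution(lease_table_name: str, region: str = "us-east-1"):
    """Count shard leases per worker by scanning the KCL lease table."""
    table = boto3.resource("dynamodb", region_name=region).Table(lease_table_name)
    owners = Counter()
    scan_kwargs = {"ProjectionExpression": "leaseKey, leaseOwner"}
    while True:
        page = table.scan(**scan_kwargs)
        for item in page.get("Items", []):
            # leaseOwner is the worker that currently holds the shard lease
            owners[item.get("leaseOwner", "<unassigned>")] += 1
        if "LastEvaluatedKey" not in page:
            break
        scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]
    return owners

if __name__ == "__main__":
    # Table name is a placeholder for your KCL application name
    for worker, shards in lease_distribution("my-kcl-application").items():
        print(worker, shards)
```

A heavily skewed distribution (one worker holding most leases) usually points to a recent node or cluster failure that KCL is still rebalancing from.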
You may expect the total number of vCores available to YARN to limit the number of containers you can run concurrently, but in some cases that is not true. Let’s consider one of them: the Capacity Scheduler with DefaultResourceCalculator (memory only).
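As an illustration of what “memory only” means in practice, here is a tiny Python sketch with made-up numbers: with DefaultResourceCalculator the per-node container count is driven by yarn.nodemanager.resource.memory-mb alone, so it can exceed the number of vCores.

```python
# Illustrative numbers only, not taken from a real cluster.
node_memory_mb = 53248        # yarn.nodemanager.resource.memory-mb (assumed)
node_vcores = 16              # yarn.nodemanager.resource.cpu-vcores (assumed)
container_memory_mb = 2048    # memory requested per container
container_vcores = 1          # vCores requested per container (ignored by
                              # DefaultResourceCalculator)

# DefaultResourceCalculator allocates by memory only
max_containers = node_memory_mb // container_memory_mb
print(f"Containers per node: {max_containers} "
      f"(would be {node_vcores // container_vcores} if vCores were enforced)")
```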
Let’s review how EC2 vCPUs correspond to YARN vCores in Amazon EMR and Qubole Hadoop clusters. As an example, I will use the c4.4xlarge EC2 instance type.
An EC2 vCPU is a thread of a CPU core (typically there are two threads per core). Does this mean the number of YARN vCores should equal the number of EC2 vCPUs? That is not always the case.
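One quick way to check what your cluster actually reports is the ResourceManager REST API. The sketch below is a rough illustration; the host name and the exact JSON field names are assumptions to verify against your cluster’s response.

```python
import json
import urllib.request

# Sketch: list the vCores YARN reports per node and compare them with the
# EC2 vCPU count of the instance type (c4.4xlarge has 16 vCPUs).
RM_URL = "http://resource-manager-host:8088/ws/v1/cluster/nodes"  # placeholder host

with urllib.request.urlopen(RM_URL) as resp:
    nodes = json.load(resp)["nodes"]["node"]

for node in nodes:
    total_vcores = node.get("usedVirtualCores", 0) + node.get("availableVirtualCores", 0)
    print(f'{node["nodeHostName"]}: {total_vcores} YARN vCores '
          f'(c4.4xlarge has 16 EC2 vCPUs)')
```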
Let’s review the major REST API requests for uploading files to S3 (PutObject, CreateMultipartUpload, UploadPart and CompleteMultipartUpload) that you can observe in S3 access logs.
This can be helpful for monitoring S3 write performance. See also S3 Multipart Upload – S3 Access Log Messages.
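If you want to pull just these requests out of raw log files, a rough Python sketch like the following can help. The operation names correspond to the documented access log values for these API calls; verify them against your own logs, and treat the log path as a placeholder.

```python
import glob

# Access log operation names for the upload-related S3 API calls
UPLOAD_OPS = ("REST.PUT.OBJECT",    # PutObject
              "REST.POST.UPLOADS",  # CreateMultipartUpload
              "REST.PUT.PART",      # UploadPart
              "REST.POST.UPLOAD")   # CompleteMultipartUpload

for path in glob.glob("/data/s3_logs/2019-*"):   # placeholder path
    with open(path) as f:
        for line in f:
            fields = line.split()
            # the bracketed timestamp splits into two tokens, so the
            # operation is token 8 and the object key is token 9 (0-based 7 and 8)
            if len(fields) > 8 and fields[7] in UPLOAD_OPS:
                print(fields[7], fields[8])
```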
On one of my clusters I got my favorite YARN error, although this time it came from a Flink application:
Container is running beyond physical memory limits. Current usage: 99.5 GB of 99.5 GB physical memory used; 105.1 GB of 227.8 GB virtual memory used. Killing container.
Why did the container consume so much physical memory and get killed? Let’s investigate in detail.
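Before digging in, it helps to keep the rough memory budget of a TaskManager container in mind. The sketch below uses the pre-1.10 Flink settings containerized.heap-cutoff-ratio and containerized.heap-cutoff-min with their default values; treat the config names and numbers as assumptions for your Flink version.

```python
# Sketch: rough memory budget for a Flink TaskManager in a YARN container.
# YARN kills the container when total process RSS (JVM heap plus all native
# and off-heap allocations, e.g. RocksDB) exceeds the container size.

container_mb = 101888          # ~99.5 GB YARN container, as in the error above
heap_cutoff_ratio = 0.25       # containerized.heap-cutoff-ratio (pre-1.10 default)
heap_cutoff_min_mb = 600       # containerized.heap-cutoff-min (pre-1.10 default)

cutoff_mb = max(int(container_mb * heap_cutoff_ratio), heap_cutoff_min_mb)
jvm_heap_mb = container_mb - cutoff_mb

print(f"JVM heap: {jvm_heap_mb} MB, budget left for off-heap/native: {cutoff_mb} MB")
print("If native allocations grow past that budget, YARN kills the container "
      "with 'running beyond physical memory limits'.")
```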
Most applications writing data to S3 use the S3 multipart upload API to upload data in parts. First, you initiate the upload, then you upload the parts, and finally you complete the multipart upload.
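As a rough client-side illustration of these three steps (using boto3, which is not necessarily the client the application itself uses; the bucket, key and part size below are placeholders):

```python
import boto3

s3 = boto3.client("s3")
bucket, key = "my-bucket", "data/data.gz"   # placeholders
part_size = 16 * 1024 * 1024                # 16 MB parts

# 1. CreateMultipartUpload
upload = s3.create_multipart_upload(Bucket=bucket, Key=key)
parts, part_number = [], 1

# 2. UploadPart for each chunk of the local file
with open("data.gz", "rb") as f:
    while True:
        chunk = f.read(part_size)
        if not chunk:
            break
        resp = s3.upload_part(Bucket=bucket, Key=key, UploadId=upload["UploadId"],
                              PartNumber=part_number, Body=chunk)
        parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
        part_number += 1

# 3. CompleteMultipartUpload
s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload["UploadId"],
                             MultipartUpload={"Parts": parts})
```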
Let’s see how this operation is reflected in the S3 access log. My application uploaded the file data.gz into S3, and I can view it as follows:
One of our Flink streaming jobs showed significant variance in the time spent writing files to S3, even within the same Task Manager process.
What settings do you need to check first?
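Before touching any settings, it can help to quantify the variance straight from the S3 access logs using the Total Time field of PUT requests. The sketch below is a rough illustration; the token positions assume the standard access log layout, and the log path is a placeholder.

```python
import shlex
import statistics

times_ms = []
with open("/data/s3_logs/2019-02-06") as f:    # placeholder path
    for line in f:
        try:
            fields = shlex.split(line)         # keeps quoted request URI as one token
        except ValueError:
            continue
        # the bracketed timestamp splits into two tokens, so the operation is
        # token index 7 and Total Time (milliseconds) is token index 14
        if len(fields) > 14 and fields[7].startswith("REST.PUT") and fields[14].isdigit():
            times_ms.append(int(fields[14]))

if times_ms:
    print(f"requests: {len(times_ms)}, median: {statistics.median(times_ms)} ms, "
          f"max: {max(times_ms)} ms, stdev: {statistics.pstdev(times_ms):.1f} ms")
```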
On one of the clusters I noticed an increased rate of shuffle errors, and restarting the job did not help; it still failed with the same error.
The error was as follows:
Error: Error while running task ( failure ) :
org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$ShuffleError: error in shuffle in Fetcher
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$RunShuffleCallable.callInternal(Shuffle.java:301)
Caused by: java.io.IOException: Shuffle failed with too many fetch failures and insufficient progress!
failureCounts=1, pendingInputs=1, fetcherHealthy=false, reducerProgressedEnough=true, reducerStalled=true