
Kinesis Client Library (KCL 2.x) Consumer – Load Balancing, Rebalancing – Taking, Renewing and Stealing Leases

For zero-downtime, large-scale systems you can have multiple compute clusters located in different availability zones.

The Kinesis KCL 2.x Consumer is very helpful for building highly scalable, elastic and fault-tolerant streaming data processing pipelines on top of Amazon Kinesis. Let's review some of the KCL internals related to load balancing and the response to compute node or cluster failures, and see how you can tune and monitor these activities.

Kinesis Shards

In this article we will focus only on consuming data from an Amazon Kinesis stream composed of hundreds or thousands of shards. Multiple clusters are launched to process the streaming data:

There is no explicit coordination, or even awareness, between clusters or cluster nodes to decide how to share and load balance the data processing, or how to recover from failures such as the loss of a node or an entire cluster. All of this is done by the KCL Consumer. Let's see how this is possible.

Application Name and Lease Table

With KCL all workers (you can also call them application instances or nodes) have to use the same application name to work together on the same data stream.

The KCL Consumer uses this application name to create a table in DynamoDB to track the overall processing state (worker-shard assignments, worker state and progress).
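For reference, here is a minimal KCL 2.x bootstrap sketch showing where the application name is supplied. The stream name, the clients and MyRecordProcessorFactory are placeholders for this illustration, not part of the original setup:

// Minimal KCL 2.x bootstrap sketch: the application name passed to ConfigsBuilder
// becomes the name of the DynamoDB lease table shared by all workers.
KinesisAsyncClient kinesis = KinesisAsyncClient.create();
DynamoDbAsyncClient dynamoDb = DynamoDbAsyncClient.create();
CloudWatchAsyncClient cloudWatch = CloudWatchAsyncClient.create();

ConfigsBuilder configsBuilder = new ConfigsBuilder(
    "my-stream",                       // Kinesis stream name (placeholder)
    "ApplicationName",                 // application name = lease table name
    kinesis, dynamoDb, cloudWatch,
    "cluster1-" + UUID.randomUUID(),   // worker identifier, stored as leaseOwner in the lease table
    new MyRecordProcessorFactory());   // your ShardRecordProcessorFactory (placeholder)

Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordinatorConfig(),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig());

new Thread(scheduler).start();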

When you start the workers for the first time, the first worker creates the DynamoDB table:

INFO software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator - Created new lease table
 for coordinator with initial read capacity of 1000 and write capacity of 1000.

If the lease table is empty, the workers get the list of shards and create an item for each lease key, without any worker assignment yet:

Lease Renewer – Cold Start

Each worker reads the lease information for all shards.

During the cold start, when the lease table does not have any worker assignments yet (see the image above), each worker cannot find its own leases, so it has nothing to refresh.

Anyway, each worker starts the Lease Renewer thread:

// DynamoDBLeaseCoordinator.java

// Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event
// of degradation.
leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(), 0L,
  renewerIntervalMillis, TimeUnit.MILLISECONDS);

The time interval for refreshes is defined as follows:

renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis;

leaseDurationMillis comes from the failoverTimeMillis option of the KCL Consumer and has a default value of 10 seconds. Together with the default epsilonMillis of 25 ms, this results in a lease refresh every 3308 milliseconds (about 3.3 seconds).
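You cannot set the renewer interval directly; it is derived from failoverTimeMillis. A sketch of tuning it, assuming the consumer is built with the configsBuilder from the bootstrap example above (the 30-second value is just an example):

// Sketch: a larger failoverTimeMillis slows down both lease renewal and lease
// takeover after a worker failure: 30000 / 3 - 25 = 9975 ms between renewals.
LeaseManagementConfig leaseManagementConfig = configsBuilder.leaseManagementConfig()
    .failoverTimeMillis(30000L);   // default is 10000 ms

The modified LeaseManagementConfig is then passed to the Scheduler in place of the default one.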

Lease Taker – Cold Start

Each worker has a Lease Taker, a thread that monitors the state and progress of the other workers' leases and tries to take or steal them if a lease is not assigned or not being updated, or if another worker has too many leases and some load balancing would be helpful.

// DynamoDBLeaseCoordinator.java

// Taker runs with fixed DELAY because we want it to run slower in the event of
// performance degradation.
takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(),
   0L, takerIntervalMillis, TimeUnit.MILLISECONDS);

The Lease Taker runs every takerIntervalMillis, measured from the end of the previous run:

takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;

With the defaults, this means the Lease Taker runs every 20050 milliseconds (about 20 seconds). You can view the major lease coordinator parameters in the KCL log:

INFO software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator - With failover time 10000 ms 
and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, take leases every 20050 ms, 
process maximum of 2147483647 leases and steal 1 lease(s) at a time.
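As a quick sanity check, the same numbers fall out of the two formulas above when you plug in the default values (failoverTimeMillis = 10000 ms, epsilon = 25 ms):

long leaseDurationMillis = 10000L;   // failoverTimeMillis default
long epsilonMillis = 25L;            // epsilon default

long renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis;  // 3308 ms
long takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;  // 20050 ms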

The worker defines its target, the number of leases it eventually wants to acquire:

 target = numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1)

 if (target > maxLeasesForWorker) 
     target = maxLeasesForWorker;

maxLeasesForWorker is the KCL configuration parameter that defines the maximum number of leases each worker can acquire. It has a very high default value of Integer.MAX_VALUE (2147483647), which you should consider lowering.

So if the current number of leases is less than the target, the Lease Taker makes an attempt to obtain more leases.
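For example, with 500 leases, 100 recently active workers and maxLeasesForWorker lowered to 10, the target comes out as 5. This is a standalone sketch of the same formula; the numbers are illustrative:

int numLeases = 500;             // leases (shards) in the lease table
int numWorkers = 100;            // workers that renewed their leases recently
int maxLeasesForWorker = 10;     // example of a lowered limit (default is Integer.MAX_VALUE)

int target = numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1);  // 5
if (target > maxLeasesForWorker) {
    target = maxLeasesForWorker;
}

In KCL 2.x the limit itself is set through the lease management configuration, for example configsBuilder.leaseManagementConfig().maxLeasesForWorker(10).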

During the cold start the Lease Taker gets the list of all available leases, shuffles it to reduce contention with other workers doing the same thing, and tries to acquire the first leases from the shuffled list.

To acquire a lease, the Lease Taker tries to conditionally update the leaseCounter from 0 to 1 and set the leaseOwner:

Since many workers can still run this request concurrently, only the first one takes the lease for the specific shard (KCL relies on the atomic conditional update feature of DynamoDB).
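The exact update expression is internal to the KCL lease refresher, but the idea can be sketched with a plain conditional UpdateItem call against the lease table (AWS SDK for Java v2; the attribute set is reduced to the fields relevant for this discussion):

// Sketch: take an unassigned lease only if nobody else changed leaseCounter yet.
// If two workers race, DynamoDB lets exactly one conditional update succeed.
DynamoDbClient ddb = DynamoDbClient.create();
try {
    ddb.updateItem(UpdateItemRequest.builder()
        .tableName("ApplicationName")
        .key(Map.of("leaseKey", AttributeValue.builder().s("shardId-000000000000").build()))
        .updateExpression("SET leaseOwner = :me, leaseCounter = :new")
        .conditionExpression("leaseCounter = :expected")
        .expressionAttributeValues(Map.of(
            ":me",       AttributeValue.builder().s("cluster1-worker42").build(),
            ":new",      AttributeValue.builder().n("1").build(),
            ":expected", AttributeValue.builder().n("0").build()))
        .build());
    // lease acquired
} catch (ConditionalCheckFailedException e) {
    // another worker took the lease first
}

Lease renewal, described below, follows the same pattern: the leaseCounter is incremented with a condition on the previously observed value, so only one writer can win.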

Note that during the cold start each worker does not know about any other workers, so numWorkers = 1 and each worker tries to take all the leases, up to maxLeasesForWorker.

After a restart, when other workers already have their items in the lease table, each worker can estimate its target properly because it knows the approximate number of workers.

Lease Renewer – Restarts and Updates

When a worker restarts, it immediately tries to find the leases it owned before and refreshes them so that no other worker can take them. Of course, if the worker's downtime is long and exceeds takerIntervalMillis, it is unlikely that it can get its leases back.

Then the Lease Renewer thread updates leases owned by the worker every renewerIntervalMillis.

The worker process internally tracks the current leaseCounter for its own leases and increments it by 1, but only if the leaseCounter has not been changed by another worker. If it has changed, the current worker considers the lease lost, stolen by another worker.

Updating Target

The target number of leases for a worker is constantly updated by Lease Taker:

target = min(numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1), maxLeasesForWorker)

numWorkers includes only workers that updated their leases since the last check.

Lease Taker – Expired Leases

Every worker constantly monitors the leaseCounters of all other workers, checking whether they change within leaseDurationMillis.

If a lease counter was not incremented within this period, the lease is considered expired, and the current worker will try to acquire it if its target is higher than the number of leases it currently holds. Note that there is no last-update timestamp in the lease table; every worker relies on its own internal timer.
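Conceptually, each worker only needs to remember the last counter value it saw for every lease and when that value changed, as in the simplified sketch below (an illustration of the idea, not the actual KCL classes):

// Simplified sketch: detect an expired lease using only the local clock.
class LeaseView {
    long lastObservedCounter;
    long lastCounterChangeNanos;

    // Called after every scan of the lease table
    void observe(long counterFromScan, long nowNanos) {
        if (counterFromScan != lastObservedCounter) {   // the owner renewed the lease
            lastObservedCounter = counterFromScan;
            lastCounterChangeNanos = nowNanos;
        }
    }

    // The counter has not moved for longer than the lease duration -> candidate to take
    boolean isExpired(long leaseDurationMillis, long nowNanos) {
        return (nowNanos - lastCounterChangeNanos) / 1_000_000L > leaseDurationMillis;
    }
}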

Remember that the Lease Renewer of every worker runs in a separate thread, so it can successfully update the owned leases even when processRecords takes a long time and occasionally exceeds the lease duration.

Lease Taker – Stealing Leases

If all leases are renewed in a timely manner, but a worker still needs more leases to meet its target, it tries to steal leases from other workers.

The worker finds the most loaded worker. If that worker has more leases than the current target, its excess leases can be stolen. If the most loaded worker holds exactly the target number of leases, one of its leases can still be stolen, but only if the stealing worker needs more than one lease to meet its own target.

maxLeasesToStealAtOneTime defines how many leases can be stolen at a time, but note that during a single Taker cycle (20 seconds by default) a worker can steal leases only from a single worker, the most loaded one.

So even if you set maxLeasesToStealAtOneTime = 10, for example, but no single worker exceeds its target by 10 leases, you may still need many cycles to balance the leases, and the maxLeasesToStealAtOneTime option will not have much effect.
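If you still want faster rebalancing, the limit is raised through the lease management configuration. A sketch, reusing the configsBuilder from the bootstrap example above (5 is just an example value):

// Sketch: allow stealing up to 5 leases per taker cycle instead of the default 1.
// Per cycle they can still only come from the single most loaded worker.
configsBuilder.leaseManagementConfig()
    .maxLeasesToStealAtOneTime(5);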

Stable Imbalance

The KCL Consumer is very powerful at rebalancing the workload between workers, but let's consider a scenario in which an imbalance appears and cannot be resolved.

Assume we add workers not one by one, but in clusters of 100 workers. For example, we have a Kinesis stream with 500 shards, add the first cluster and then add the second one.

When the first cluster is added, its workers take all 500 shards, 5 shards per worker:

When we add the second cluster of 100 workers, we have 500/200 = 2.5 shards per worker. But a worker can process only a whole number of shards, so every worker sets its target to 3 shards.

The second cluster starts stealing shards from the first one, but the rebalancing eventually settles with every node of the first cluster holding 3 shards while the nodes of the second cluster hold only 2.

So the first cluster will process 300 shards, the second just 200.

The code below shows that if a worker has 2 leases and needs just 1 more to meet its target of 3, it cannot steal from a worker that already has 3 leases and whose target is also 3.

But if a worker has only 1 lease while another worker has 3, it can steal a lease even though the target of the other worker is also 3:

// DynamoDBLeaseTaker.java

int numLeasesToSteal = 0;
if ((mostLoadedWorker.getValue() >= target) && (needed > 0)) {
    int leasesOverTarget = mostLoadedWorker.getValue() - target;
    numLeasesToSteal = Math.min(needed, leasesOverTarget);
    // steal 1 if we need > 1 and max loaded worker has target leases.
    if ((needed > 1) && (numLeasesToSteal == 0)) {
        numLeasesToSteal = 1;
    }
    numLeasesToSteal = Math.min(numLeasesToSteal, maxLeasesToStealAtOneTime);
}
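Plugging the numbers from our two-cluster example into this logic shows why the imbalance is stable (a standalone sketch reusing the expressions from the snippet above):

// A cluster 2 worker holds 2 leases and its target is 3, so needed = 1.
// The most loaded worker (from cluster 1) holds exactly 3 leases; its target is also 3.
int target = 3;
int needed = 1;                          // target - leases currently held (3 - 2)
int mostLoadedWorkerLeases = 3;
int maxLeasesToStealAtOneTime = 1;       // default

int numLeasesToSteal = 0;
if ((mostLoadedWorkerLeases >= target) && (needed > 0)) {
    int leasesOverTarget = mostLoadedWorkerLeases - target;   // 0
    numLeasesToSteal = Math.min(needed, leasesOverTarget);    // 0
    if ((needed > 1) && (numLeasesToSteal == 0)) {            // false: needed == 1
        numLeasesToSteal = 1;
    }
    numLeasesToSteal = Math.min(numLeasesToSteal, maxLeasesToStealAtOneTime);
}
// numLeasesToSteal == 0 -> nothing is stolen and the 300/200 shard split persists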

It may be fine for some individual workers to process 3 shards while others process just 2, but at the scope of entire clusters this matters: the two clusters end up with a noticeably uneven load.

When planning how many workers are added in clusters, take into account that the number of shards divided by the total number of workers should be close to a whole number.

Check Work Distribution

You can use the following Python script to check how the shards are distributed between workers and clusters:

import boto3
import pandas as pd

# The lease table has the same name as the KCL application
table_name = "ApplicationName"

# Derive the cluster name from the leaseOwner string: cluster_name-worker_id
def getClusterName(leaseOwner):
    return leaseOwner.split('-')[0]

ddb = boto3.resource("dynamodb", region_name = "us-east-1")
table = ddb.Table(table_name)

last_evaluated_key = None
lease_table = []

# Scan the entire lease table (it may span multiple 1 MB result pages)
while True:
    if last_evaluated_key:
        response = table.scan(ExclusiveStartKey = last_evaluated_key)
    else:
        response = table.scan()

    for i in response["Items"]:
        lease_table.append(i)

    last_evaluated_key = response.get("LastEvaluatedKey")
    if not last_evaluated_key:
        break

df = pd.DataFrame(lease_table)

# Skip leases that are not assigned to any worker yet
df = df[df["leaseOwner"].notna()]
df["clusterName"] = df["leaseOwner"].apply(getClusterName)

# Number of leases (shards) held by each cluster
print(df.groupby("clusterName")["clusterName"].count())