Amazon, AWS, RDS

Sqoop Import from Amazon RDS Read Replica – ERROR: Canceling Statement due to Conflict with Recovery

Apache Sqoop is widely used to import data from relational databases into the cloud. One of our databases runs on Amazon RDS for PostgreSQL to store sales data, and its Sqoop import periodically failed with the following error:

Error: java.io.IOException: SQLException in nextKeyValue
Caused by: org.postgresql.util.PSQLException: ERROR: canceling statement due to conflict with recovery
  Detail: User query might have needed to see row versions that must be removed.

In this article I will describe a solution that helped resolve the problem in our specific case.

PostgreSQL – Conflict with Recovery

Amazon RDS provides a read replica to serve read-only requests to the database, and it uses asynchronous replication to keep the read replica up to date with the master database instance.

If you run a long query on the read replica, it can be canceled partway through when applying replicated changes from the master conflicts with the running query, for example when old row versions that the query still needs have already been vacuumed away on the master.
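You can confirm that recovery conflicts are the cause by checking PostgreSQL's per-database conflict counters on the replica. This is a diagnostic sketch; the endpoint, user, and database names are placeholders, not our actual values:

```shell
# Count queries canceled on the replica due to recovery conflicts.
# pg_stat_database_conflicts is only populated on standby servers.
psql -h mydb-replica.example.us-east-1.rds.amazonaws.com -U sqoop_user -d sales -c "
  SELECT datname, confl_snapshot, confl_lock, confl_tablespace
  FROM pg_stat_database_conflicts
  WHERE datname = current_database();"
```

A growing confl_snapshot counter matches the "row versions that must be removed" detail in the error message above.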

In my case, Sqoop was used to export 200 million rows (77 GB) from the orders table, and the job failed after running for about 30 minutes.

What options do we have to optimize this process without changing any settings at the PostgreSQL server side?

Increase Number of Mappers

As the first step, I increased the --num-mappers Sqoop option from 50 to 150, hoping that the increased parallelism would help finish the queries faster. It did not help; moreover, all 150 mappers failed after running for the same 30 minutes.
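For reference, the import looked roughly like this. The JDBC endpoint, credentials, and directories below are placeholders, not our actual values; only --num-mappers was changed at this step:

```shell
# Sketch of the failing import against the RDS read replica.
sqoop import \
  --connect jdbc:postgresql://mydb-replica.example.us-east-1.rds.amazonaws.com:5432/sales \
  --username sqoop_user \
  --password-file /user/hadoop/sqoop.password \
  --table orders \
  --split-by created_at \
  --num-mappers 150 \
  --target-dir /data/sales/orders
```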

Split by Indexed Column

Increasing the number of mappers and getting the same extract time for every mapper was a clue: every mapper was still scanning all the data, so something might be wrong with the split column and its indexing.

This concern was confirmed by the duration of the split range query executed by Sqoop:

19/10/18 09:00:40 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: 
  SELECT MIN("created_at"), MAX("created_at") FROM "orders"
19/10/18 09:02:25 INFO mapreduce.JobSubmitter: number of splits:150

It took 105 seconds just to get the MIN/MAX range. Looking at the table definition in PostgreSQL, I noticed that the table did not have an index on the created_at column, so every split had to read the entire table despite filtering on created_at, for example:

2019-10-18 09:02:51,335 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Working on split: 
  "created_at" >= '2017-03-26 08:20:19.873' AND "created_at" < '2017-04-07 23:54:55.832'

2019-10-18 09:02:51,337 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Executing query: 
  SELECT ... FROM "orders" 
  WHERE "created_at" >= '2017-03-26 08:20:19.873' AND "created_at" < '2017-04-07 23:54:55.832'

2019-10-18 09:30:42,265 ERROR [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Top level exception:
  org.postgresql.util.PSQLException: ERROR: canceling statement due to conflict with recovery
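
You can list the existing indexes on the table directly from the catalog to check which columns are usable as an efficient split key (connection settings are placeholders, as before):

```shell
# List indexes defined on the "orders" table.
psql -h mydb-replica.example.us-east-1.rds.amazonaws.com -U sqoop_user -d sales -c "
  SELECT indexname, indexdef
  FROM pg_indexes
  WHERE tablename = 'orders';"
```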

Fortunately, the orders table had an index on the updated_at column, so I tried to use it as the --split-by column in Sqoop. Now it took just 1 second to get the MIN/MAX range:

19/10/18 15:00:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: 
  SELECT MIN("updated_at"), MAX("updated_at") FROM "orders"
19/10/18 15:00:47 INFO mapreduce.JobSubmitter: number of splits:150
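
The only change to the Sqoop command was the split column (same placeholder endpoint and paths as in the earlier sketch):

```shell
# Same import, now splitting on the indexed updated_at column.
sqoop import \
  --connect jdbc:postgresql://mydb-replica.example.us-east-1.rds.amazonaws.com:5432/sales \
  --username sqoop_user \
  --password-file /user/hadoop/sqoop.password \
  --table orders \
  --split-by updated_at \
  --num-mappers 150 \
  --target-dir /data/sales/orders
```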

With the new split column, the longest map task took 12 minutes to export its data:

2019-10-18 15:01:01,065 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Working on split:
  "updated_at" >= '2019-09-23 09:18:11.859' AND "updated_at" < '2019-10-06 00:09:29.38'

2019-10-18 15:01:01,066 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Executing query: 
  SELECT ... FROM "orders" 
  WHERE ("updated_at" >= '2019-09-23 09:18:11.859') AND ("updated_at" < '2019-10-06 00:09:29.38')

2019-10-18 15:12:54,069 INFO [Thread-15] org.apache.sqoop.mapreduce.AutoProgressMapper: 
  Auto-progress thread is finished. keepGoing=false

While the fastest task took only 3 seconds:

2019-10-18 15:01:01,198 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Working on split: 
  "updated_at" >= '2014-12-16 23:19:53.961' AND "updated_at" < '2014-12-29 14:11:11.482'

2019-10-18 15:01:01,200 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Executing query:
  SELECT ... FROM "orders" 
  WHERE ("updated_at" >= '2014-12-16 23:19:53.961') AND ("updated_at" < '2014-12-29 14:11:11.482')

2019-10-18 15:01:04,925 INFO [Thread-15] org.apache.sqoop.mapreduce.AutoProgressMapper: 
  Auto-progress thread is finished. keepGoing=false

This shows that each map task no longer forces PostgreSQL to scan all data pages; instead, it performs only an index range scan, which runs much faster and finishes before a conflict with recovery can cancel it.
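
You can verify the access path with EXPLAIN on one of the split predicates (connection settings are placeholders; the exact plan depends on the table statistics):

```shell
# The plan should show an Index Scan (or Bitmap Index Scan) on the
# updated_at index instead of a sequential scan over the whole table.
psql -h mydb-replica.example.us-east-1.rds.amazonaws.com -U sqoop_user -d sales -c "
  EXPLAIN
  SELECT * FROM orders
  WHERE updated_at >= '2019-09-23 09:18:11.859'
    AND updated_at < '2019-10-06 00:09:29.38';"
```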