
How Map Column is Written to Parquet – Converting JSON to Map to Increase Read Performance

It is quite common today to convert incoming JSON data into Parquet format to improve the performance of analytical queries.

When JSON data has an arbitrary schema, i.e. different records can contain different key-value pairs, it is common to parse such JSON payloads into a map column in Parquet.

How is it stored? What read performance can you expect? Will json_map["key"] read only the data for that key, or the entire JSON?

Data

Consider an example of input JSON records:

{ "Type": "Enter", "Store": "Orange", "State": "NY", ... }
{ "Type": "New Product", "Name": "Box", "Category": "Goods", ... }
{ "Type": "Purchase", "Product": "Apple", "Color": "Red", ... } 
...

So there is a stream of events whose structure differs depending on Type. Consumers select the required data based on the event type. For example, if you store the JSON in a string column, you can use the following Hive query to get the number of purchases per product:

select 
  get_json_object(json, '$.Product'),
  count(*)
from events
where get_json_object(json, '$.Type') = 'Purchase'
group by get_json_object(json, '$.Product')

With this approach you have to read the entire JSON and parse it every time. What if you convert the data to a map column in Parquet?

select 
  json_map['Product'],
  count(*)
from events
where json_map['Type'] = 'Purchase'
group by json_map['Product']

We no longer need to parse the JSON in every query. But will the query read data only for the Type and Product keys?

Map in Parquet

A map column has the following definition in the Avro schema created for a Parquet file:

{ "name": "json_map", "type": { "type": "map", "value": "string"} }

Or the following definition in the corresponding Protobuf message:

repeated group json_map {
  required string key;
  required string value;
}

That leads to the following definition in Parquet (you can see it using parquet-tools):

json_map:  REQUIRED F:1
.map:      REPEATED F:2
..key:     REQUIRED BINARY O:UTF8 R:1 D:1
..value:   REQUIRED BINARY O:UTF8 R:1 D:1

Briefly speaking, there is no Map concept internally in Parquet; it is implemented as a repeated group of keys and values.
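
You can reproduce this mapping by converting the Avro schema to a Parquet message type and printing it. A minimal sketch (the class and variable names are illustrative; the map field matches the json_map definition above):

import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

public class PrintMapSchema {
    public static void main(String[] args) {
        // An Avro record with a single map<string, string> field
        String avro =
            "{ \"type\": \"record\", \"name\": \"Event\", \"fields\": [" +
            "  { \"name\": \"json_map\", \"type\": { \"type\": \"map\", \"values\": \"string\" } } ] }";

        Schema avroSchema = new Schema.Parser().parse(avro);

        // Convert to the Parquet representation: the map becomes
        // a repeated group holding the key and value fields
        MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
        System.out.println(parquetSchema);
    }
}

The printed message type shows the same structure as the parquet-tools output above: a repeated group with the key and value columns.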

So what is the problem?

From the internal representation of a map column in Parquet (and the Dremel paper it is based on) you can see that all keys are stored in one column and all values in another, not in a separate column of values per key (it does not look possible to define that in the internal schema).

So you can read all the map keys without reading the map values, but reading the values for one key requires reading the values for all other keys as well. This is far from optimal if the JSON data has hundreds of key-value pairs in every row.

Sample Data in Parquet

Let’s generate some sample data in Parquet and see how it is stored. I will use the following Avro schema for my test:

{
  "type": "record",
  "namespace": "parquetapp",
  "name": "ParquetAppRecord",
  "fields": [
    { "name": "colA", "type": "string" },
    { "name": "colB", "type": "string" },
    { "name": "colC", "type": { "type": "map", "values": "string"} }
  ]
}

There are 3 columns: colA string, colB string and colC map[string, string].

A Java code snippet to write data:

import java.util.HashMap;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

Schema avroSchema = ParquetAppRecord.getClassSchema();

// Parquet schema derived from the Avro schema (useful for inspection; the writer itself only needs the Avro schema)
MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);

Path filePath = new Path("./example.parquet");
int blockSize = 10240;
int pageSize = 5000;

AvroParquetWriter<ParquetAppRecord> parquetWriter = new AvroParquetWriter<>(
    filePath,
    avroSchema,
    CompressionCodecName.UNCOMPRESSED,
    blockSize,
    pageSize);

// Write 1,000 records, each with a 3-key map in colC
// (assumes the generated ParquetAppRecord uses java.lang.String for its string fields)
for (int i = 0; i < 1000; i++) {
    HashMap<String, String> mapValues = new HashMap<>();
    mapValues.put("CCC", "CCC" + i);
    mapValues.put("DDD", "DDD" + i);
    mapValues.put("EEE", "EEE" + i);

    ParquetAppRecord rec = new ParquetAppRecord("AAA" + i, "BBB" + i, mapValues);
    parquetWriter.write(rec);
}

// Close the writer to flush row groups and write the file footer
parquetWriter.close();

So the code writes the following rows to Parquet:

AAA1,   BBB1,   {"CCC":"CCC1", "DDD":"DDD1", "EEE": "EEE1"}
AAA2,   BBB2,   {"CCC":"CCC2", "DDD":"DDD2", "EEE": "EEE2"}
AAA3,   BBB3,   {"CCC":"CCC3", "DDD":"DDD3", "EEE": "EEE3"}
...
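
In a real pipeline the map values would come from the incoming JSON payloads rather than from a loop counter. A minimal sketch of that step using Jackson, assuming flat string-to-string payloads (the class name and helper are hypothetical, not part of the original example):

import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonToMap {
    private static final ObjectMapper JSON = new ObjectMapper();

    // Parse one JSON event into the Map<String, String> used for the colC column
    static Map<String, String> toMap(String json) throws IOException {
        return JSON.readValue(json, new TypeReference<Map<String, String>>() {});
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> mapValues =
            toMap("{ \"Type\": \"Purchase\", \"Product\": \"Apple\", \"Color\": \"Red\" }");
        System.out.println(mapValues);   // e.g. {Type=Purchase, Product=Apple, Color=Red}
    }
}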

Let’s open the generated Parquet file in a text editor to see how the rows were written. You can see that the colA and colB values are stored in columnar form: the first chunk contains values for colA, the second for colB.

The values for the map column, in contrast, are interleaved and stored together (although they may be reordered based on the hash value of the key); there is no separate chunk of values per key.
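
Instead of a text editor you can also list the column chunks from the file footer with the Parquet Java API. A sketch along these lines (class name is illustrative) prints one entry per physical column and its size, and shows that the whole map is backed by just two columns, the key path and the value path, no matter how many distinct keys the data contains:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ListColumnChunks {
    public static void main(String[] args) throws Exception {
        Path filePath = new Path("./example.parquet");

        try (ParquetFileReader reader =
                 ParquetFileReader.open(HadoopInputFile.fromPath(filePath, new Configuration()))) {
            // One line per column chunk in each row group:
            // expect colA, colB and only two entries for the colC map (key and value)
            for (BlockMetaData block : reader.getFooter().getBlocks()) {
                for (ColumnChunkMetaData column : block.getColumns()) {
                    System.out.println(column.getPath().toDotString() + " : "
                        + column.getTotalSize() + " bytes");
                }
            }
        }
    }
}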

So storing JSON values in a map column in Parquet avoids parsing JSON in every query, but it still requires reading chunks of data for all map values, not just for the selected keys.

To get better performance you can consider moving JSON keys into individual columns, although this may complicate ETL: it requires a separate table for every JSON type, and more effort to maintain these tables when the schemas change and keys are added, modified or removed.
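
For example, a dedicated Avro schema for the Purchase events could promote the keys from the sample data above into regular columns, each of which then becomes its own Parquet column. A sketch with Avro's SchemaBuilder (the record name and the chosen fields are hypothetical):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class PurchaseSchema {
    public static void main(String[] args) {
        // One schema (and one table) per event type instead of a generic map column
        Schema purchaseSchema = SchemaBuilder.record("PurchaseEvent")
            .namespace("parquetapp")
            .fields()
              .requiredString("Type")
              .requiredString("Product")
              .requiredString("Color")
            .endRecord();

        System.out.println(purchaseSchema.toString(true));   // pretty-print the Avro schema
    }
}

With this layout a query on Product reads only that column, at the cost of maintaining one schema per event type.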