Parquet is one of the most popular columnar file formats, supported by many tools including Apache Hive, Spark, Presto and Flink.
To tune Parquet file writes for various workloads and scenarios, let's look at how the Parquet writer works in detail (as of Parquet 1.10, although most of the concepts apply to later versions as well).
Parquet File Structure
A Parquet file consists of one or more Row Groups; a Row Group consists of one data chunk (column chunk) per column, stored one after another; and every data chunk consists of one or more Pages with the column data:
So a single Row Group contains data for all columns for some number of rows (the number of rows can vary even within a single Parquet file; more on this below). If you look at the content of a Row Group, first you will see the data for the first column, then the data for the second column, and so on.
If later you need to read a single column from a Parquet file, you have to read the corresponding column chunks from all Row Groups, but you do not need to read the full content of all Row Groups.
Writing a Row
Although Parquet is a columnar format, that is only its internal representation; you still have to write data row by row: InternalParquetRecordWriter.write(row)
Every row is immediately split into columns, and each column value is appended to its column's individual in-memory Column store. The min/max statistics and the number of NULL values are also updated immediately for every column.
Everything is still in memory at this point.
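For illustration, here is a minimal sketch of row-by-row writing using the parquet-hadoop example API (ExampleParquetWriter with SimpleGroupFactory); the output path and the two-column schema are made up for this example:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class RowByRowWriteSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical schema and output path, just for the example
    MessageType schema = MessageTypeParser.parseMessageType(
        "message event { required int64 id; required binary name (UTF8); }");
    SimpleGroupFactory groups = new SimpleGroupFactory(schema);

    try (ParquetWriter<Group> writer = ExampleParquetWriter
        .builder(new Path("/tmp/events.parquet"))
        .withType(schema)
        .build()) {
      for (long i = 0; i < 1_000; i++) {
        Group row = groups.newGroup().append("id", i).append("name", "row-" + i);
        writer.write(row);   // each write() goes through InternalParquetRecordWriter.write(row)
      }
    }   // close() flushes the last row group and writes the footer
  }
}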
Page
After writing the first 100 values for a column (i.e., after 100 rows), the Parquet writer checks whether the content of this column exceeds the specified page size (1 MB by default).
If the raw data size for the column does not exceed the page size threshold, the point of the next check is adjusted based on the actual column size, so the check runs neither after every column value nor after every 100 values. Thus the page size is not a strict limit.
If the raw data size exceeds the page size, the column content is compressed (if compression is specified for the Parquet file) and flushed into the Page store for the column.
Every page contains metadata (page header) that includes uncompressed size, compressed size, number of values, and statistics: minimum and maximum values for the column in this page and the number of NULL values.
Everything is still in memory, but unlike the Column store, the data is now compressed.
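To make the adaptive check more concrete, here is a simplified, illustrative sketch of the idea only (not the actual Parquet source): the buffered size of a column is checked occasionally, and the current average value size is used to estimate when the next check is worth doing.

// Simplified sketch of the adaptive page-size check described above.
class AdaptivePageCheckSketch {
  private static final long PAGE_SIZE_THRESHOLD = 1024 * 1024; // 1 MB default
  private long valueCount = 0;
  private long valueCountForNextCheck = 100;   // first check after 100 values

  void valueWritten(long bufferedBytes) {
    valueCount++;
    if (valueCount < valueCountForNextCheck) {
      return;                                  // skip the relatively expensive size check
    }
    if (bufferedBytes >= PAGE_SIZE_THRESHOLD) {
      flushPage();                             // compress and hand the values over to the Page store
      valueCount = 0;
      valueCountForNextCheck = 100;
    } else {
      // Estimate how many values fit into one page at the current average
      // value size, and schedule the next check roughly halfway there.
      long estimatedTotal = valueCount * PAGE_SIZE_THRESHOLD / bufferedBytes;
      valueCountForNextCheck = (valueCount + estimatedTotal) / 2;
    }
  }

  private void flushPage() { /* compress and write the buffered values as a page */ }
}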
Row Group (Block Size)
After writing the first 100 rows (to memory), the Parquet writer checks if the data size exceeds the specified row group size (block size) for the Parquet file (default is 128 MB).
This size includes the uncompressed size of the data in the Column stores (not yet flushed to the Page stores) as well as the compressed size of the data that is already in the Page stores for every column.
If the data size does not exceed the specified row group size, the Parquet writer estimates when to perform the next check based on the average row size. The next check can happen after another 100 or even 10,000 rows, so the row group size is not a strict limit either.
If the data size exceeds the specified row group size, the Parquet writer flushes the content of the Column stores into the Page stores for every column, and then flushes the content of all Page stores into the output stream, column by column.
This is the first time the data is written to the output stream (HadoopPositionOutputStream) and can potentially become visible to outside components (not end users); for example, S3 Multipart Upload transfer threads can start uploading data to S3 in the background.
When the row group is flushed, the memory held by the Column and Page stores is released, although it may take some garbage collector cycles before it becomes available again.
Note that the Row Group content does not contain any metadata (statistics, offsets, etc.); the Row Group metadata is added to the Parquet file footer instead.
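The row group size and the page size can be tuned when the writer is created. Here is a minimal sketch using the ParquetWriter builder (the output path and schema are made up; engines that write through ParquetOutputFormat expose the same settings as the parquet.block.size, parquet.page.size and parquet.compression properties):

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class WriterTuningSketch {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message event { required int64 id; required binary name (UTF8); }");

    ParquetWriter<Group> writer = ExampleParquetWriter
        .builder(new Path("/tmp/events.parquet"))     // hypothetical output path
        .withType(schema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withRowGroupSize(256 * 1024 * 1024)   // corresponds to parquet.block.size (128 MB by default)
        .withPageSize(1024 * 1024)             // corresponds to parquet.page.size (1 MB by default)
        .build();
    // ... write rows, then close() to flush the last row group and write the footer
    writer.close();
  }
}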
File Footer
When all row groups have been written, and before closing the file, the Parquet writer adds the footer to the end of the file.
The footer includes the file schema (column names and their types) as well as details about every row group: total size, number of rows, and min/max statistics and the number of NULL values for every column. Note that these column statistics are per row group, not for the entire file.
Writing all the metadata to the footer means the Parquet writer does not have to keep the entire file in memory or on local disk, which is why row groups can be safely flushed to the output stream as soon as they are complete.
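To see what ends up in the footer, you can open an existing file and print its per-row-group metadata. A minimal sketch using ParquetFileReader (the file path is made up for this example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FooterInspectSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("/tmp/events.parquet");   // hypothetical file
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      ParquetMetadata footer = reader.getFooter();
      System.out.println(footer.getFileMetaData().getSchema());   // file schema
      for (BlockMetaData rowGroup : footer.getBlocks()) {          // one entry per row group
        System.out.println("rows=" + rowGroup.getRowCount()
            + " totalByteSize=" + rowGroup.getTotalByteSize());
        for (ColumnChunkMetaData column : rowGroup.getColumns()) {
          // statistics are per row group, not per file
          System.out.println("  " + column.getPath() + " stats=" + column.getStatistics());
        }
      }
    }
  }
}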
Logging
You can monitor how the Parquet writer works by looking at the application logs; the most important messages are logged at the INFO level.
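How these messages surface depends on your logging setup. The JUL-style timestamps in the samples below suggest the slf4j-jdk14 binding; assuming that setup, a sketch like this ensures the writer's logger is at the INFO level (with log4j or logback, set the level for org.apache.parquet.hadoop in their configuration instead):

import java.util.logging.Level;
import java.util.logging.Logger;

public class EnableParquetWriterLogging {
  public static void main(String[] args) {
    // Raise (or confirm) the level for the class that logs the messages below.
    Logger.getLogger("org.apache.parquet.hadoop.InternalParquetRecordWriter").setLevel(Level.INFO);
  }
}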
When the current row group size exceeds the row group threshold, checkBlockSizeReached() logs:
LOG.info("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount);
A sample from an application log:
May 29, 2020 1:58:35 PM org.apache.parquet.hadoop.InternalParquetRecordWriter checkBlockSizeReached INFO: mem size 268641769 > 268435456: flushing 324554 records to disk.
The log message above says that the current content is 268,641,769 bytes while the row group size threshold is 268,435,456 bytes (256 MB), so 324,554 rows are flushed to the output stream (not necessarily to a disk).
When a row group is flushed, you can see the following log message from flushRowGroupToStore():
LOG.info("Flushing mem columnStore to file. allocated memory: {}", columnStore.getAllocatedSize());
Note that the columnStore size here includes the size of the Page stores as well.
A sample from an application log:
May 29, 2020 1:58:35 PM org.apache.parquet.hadoop.InternalParquetRecordWriter flushRowGroupToStore INFO: Flushing mem columnStore to file. allocated memory: 199496450