Details
- New Feature
- Status: Resolved
- Major
- Resolution: Later
- 2.3.2
- None
- None
Description
Problem Statement
The Spark WriteSupport class for Parquet is hardcoded to org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport and cannot be configured. This class currently does not carry the field metadata in StructType over to the Parquet MessageType. Parquet column encryption (PARQUET-1396, PARQUET-1178), however, requires the field metadata to be present in the MessageType so that it can be used to control column encryption.
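For context, the following is a minimal sketch of how field metadata is attached to a StructType on the Spark side. It uses only the public Spark SQL types API. The metadata keys ("encrypt", "columnKeyMetaData") and their values are hypothetical placeholders, not keys defined by this issue or by Parquet; the actual keys depend on the crypto WriteSupport in use.

```scala
import org.apache.spark.sql.types.{Metadata, MetadataBuilder, StringType, StructField, StructType}

object FieldMetadataSketch {
  // Hypothetical metadata keys and values, for illustration only.
  val encryptedMeta: Metadata = new MetadataBuilder()
    .putString("encrypt", "true")
    .putString("columnKeyMetaData", "key-id-1")
    .build()

  // "ssn" carries metadata; "name" uses the default empty metadata.
  val schema: StructType = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("ssn", StringType, nullable = true, encryptedMeta)
  ))
}
```

With the built-in ParquetWriteSupport, this metadata stays on the Spark schema and never reaches the Parquet MessageType, which is the gap described above.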
Technical Solution
1. Extend the SparkToParquetSchemaConverter class and override its convert() method so that the field metadata is carried over.
2. Extend ParquetWriteSupport and use the extended converter from step 1. Extending the built-in WriteSupport rather than changing it limits the risk.
3. Change the Spark code to make the WriteSupport class configurable, so that users can select the extended WriteSupport from step 2. The default WriteSupport remains org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.
Technical Details
Note: The code below may not be formatted correctly here. The link below shows the correct format.
Extend SparkToParquetSchemaConverter class
class SparkToParquetMetadataSchemaConverter(conf: Configuration)
  extends SparkToParquetSchemaConverter(conf) {

  // Build the Parquet MessageType, converting each field together with its metadata.
  override def convert(catalystSchema: StructType): MessageType = {
    Types
      .buildMessage()
      .addFields(catalystSchema.map(convertFieldWithMetadata): _*)
      .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
  }

  // Wrap the converted Parquet type in ExtType and attach the Spark field metadata.
  private def convertFieldWithMetadata(field: StructField): Type = {
    val extField = new ExtType[Any](convertField(field))
    val metaBuilder = new MetadataBuilder().withMetadata(field.metadata)
    val metaData = metaBuilder.getMap
    extField.setMetadata(metaData)
    extField
  }
}
Extend ParquetWriteSupport
class CryptoParquetWriteSupport extends ParquetWriteSupport {
  // Use the metadata-preserving converter when building the write context.
  override def init(configuration: Configuration): WriteContext = {
    val converter = new SparkToParquetMetadataSchemaConverter(configuration)
    createContext(configuration, converter)
  }
}
Make WriteSupport configurable
class ParquetFileFormat {
  override def prepareWrite(...) {
    …
    // Only fall back to the built-in WriteSupport when the user has not configured one.
    if (conf.get(ParquetOutputFormat.WRITE_SUPPORT_CLASS) == null) {
      ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
    }
    ...
  }
}
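The fallback rule in prepareWrite can be sketched in isolation as follows. This is an illustration under stated assumptions, not the actual Spark change: the object and method names are hypothetical, and the key string is the value of ParquetOutputFormat.WRITE_SUPPORT_CLASS in parquet-hadoop, written out here so the sketch depends only on Hadoop's Configuration class.

```scala
import org.apache.hadoop.conf.Configuration

object WriteSupportConfigSketch {
  // Value of ParquetOutputFormat.WRITE_SUPPORT_CLASS in parquet-hadoop.
  val WriteSupportKey = "parquet.write.support.class"

  // Built-in default named in this issue.
  val DefaultWriteSupport =
    "org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport"

  // Hypothetical helper: return the user-configured class name if present,
  // otherwise the built-in default. Configuration.get returns null for unset keys.
  def resolveWriteSupport(conf: Configuration): String =
    Option(conf.get(WriteSupportKey)).getOrElse(DefaultWriteSupport)
}
```

Under this rule, a user would opt in by setting the key on the Hadoop configuration before writing, for example to the fully qualified name of an extended class such as CryptoParquetWriteSupport; jobs that set nothing keep the existing behavior.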
Verification
ParquetHelloWorld.java in the GitHub repository parquet-writesupport-extensions contains a sample that verifies passing down the field metadata and performing column encryption.
Dependency
- PARQUET-1178
- PARQUET-1396
- PARQUET-1397
Issue Links
- Blocked
  - PARQUET-1178 Parquet modular encryption (Resolved)
  - PARQUET-1396 EncryptionPropertiesFactory and DecryptionPropertiesFactory (Resolved)
  - PARQUET-1397 Sample of usage Parquet-1396 and Parquet-1178 for column level encryption with pluggable key access (Open)
- links to