Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 3.0.2, 3.1.1
Fix Version/s: None
Component/s: None
Description
The code of "MyPartitionReaderFactory" is as follows:
// Implementation class
package com.lynn.spark.sql.v2

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import com.lynn.spark.sql.v2.MyPartitionReaderFactory.{MY_VECTORIZED_READER_BATCH_SIZE, MY_VECTORIZED_READER_ENABLED}
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.internal.SQLConf.buildConf

case class MyPartitionReaderFactory(sqlConf: SQLConf,
                                    dataSchema: StructType,
                                    readSchema: StructType)
  extends PartitionReaderFactory with Logging {

  // sqlConf is non-null when the factory is built on the driver,
  // but null after the RDD is deserialized on the executor (see below).
  val enableVectorized = sqlConf.getConf(MY_VECTORIZED_READER_ENABLED, false)
  val batchSize = sqlConf.getConf(MY_VECTORIZED_READER_BATCH_SIZE, 4096)

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    MyRowReader(batchSize, dataSchema, readSchema)
  }

  override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
    if (!supportColumnarReads(partition))
      throw new UnsupportedOperationException("Cannot create columnar reader.")
    MyColumnReader(batchSize, dataSchema, readSchema)
  }

  override def supportColumnarReads(partition: InputPartition) = enableVectorized
}

object MyPartitionReaderFactory {

  val MY_VECTORIZED_READER_ENABLED =
    buildConf("spark.sql.my.enableVectorizedReader")
      .doc("Enables vectorized my source scan.")
      .version("1.0.0")
      .booleanConf
      .createWithDefault(false)

  val MY_VECTORIZED_READER_BATCH_SIZE =
    buildConf("spark.sql.my.columnarReaderBatchSize")
      .doc("The number of rows to include in a my source vectorized reader batch. The number should " +
        "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
      .version("1.0.0")
      .intConf
      .createWithDefault(4096)
}
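For context, a minimal sketch of how a factory like this is typically constructed on the driver side in DSv2. The MyBatch class below is hypothetical and not part of the reported source; partition planning is omitted:

// Hypothetical driver-side construction (sketch, not part of the report):
// in DSv2 the reader factory is usually created from a Batch implementation,
// using the active session's SQLConf on the driver.
package com.lynn.spark.sql.v2

import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

class MyBatch(dataSchema: StructType, readSchema: StructType) extends Batch {

  // Partition planning omitted for brevity.
  override def planInputPartitions(): Array[InputPartition] =
    Array.empty[InputPartition]

  // Runs on the driver, where SQLConf.get returns the session's conf,
  // so the factory is constructed with a non-null sqlConf.
  override def createReaderFactory(): PartitionReaderFactory =
    MyPartitionReaderFactory(SQLConf.get, dataSchema, readSchema)
}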
The driver constructs an RDD instance (DataSourceRDD), and the sqlConf parameter passed to MyPartitionReaderFactory is not null.
But when the executor deserializes the RDD, the sqlConf parameter is null.
The relevant task-deserialization code is as follows:
// RunTask.scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // the rdd
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  func(context, rdd.iterator(partition, context))
}
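Not a fix for the underlying behaviour, but a minimal workaround sketch that sidesteps it: resolve the config values from SQLConf on the driver and keep only plain serializable fields in the factory, so nothing on the executor depends on the deserialized sqlConf. The variant class name below is hypothetical; MyRowReader and MyColumnReader are the same classes referenced in the report.

// Hypothetical workaround sketch: the config values are read from SQLConf on
// the driver before construction, and only primitives are serialized to executors.
package com.lynn.spark.sql.v2

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

case class MyPrimitivePartitionReaderFactory(
    enableVectorized: Boolean, // read from SQLConf on the driver
    batchSize: Int,            // read from SQLConf on the driver
    dataSchema: StructType,
    readSchema: StructType)
  extends PartitionReaderFactory with Logging {

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    MyRowReader(batchSize, dataSchema, readSchema)

  override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
    if (!supportColumnarReads(partition))
      throw new UnsupportedOperationException("Cannot create columnar reader.")
    MyColumnReader(batchSize, dataSchema, readSchema)
  }

  override def supportColumnarReads(partition: InputPartition): Boolean = enableVectorized
}

On the driver, such a factory would then be built with the values already resolved, e.g. MyPrimitivePartitionReaderFactory(sqlConf.getConf(MY_VECTORIZED_READER_ENABLED, false), sqlConf.getConf(MY_VECTORIZED_READER_BATCH_SIZE, 4096), dataSchema, readSchema).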
Issue Links
- relates to: SPARK-38328 SQLConf.get flaky causes NON-default spark session settings being lost (Open)