Description
Below is a sample code snippet used to fetch data from HBase. It worked fine with spark-3.1.1.
However, after upgrading to spark-3.2.0 it no longer works. The issue is that it does not throw any exception; it simply returns an empty RDD.
def getInfo(sc: SparkContext, startDate: String, cachingValue: Int, sparkLoggerParams: SparkLoggerParams, zkIP: String, zkPort: String): RDD[(String)] = {
  val scan = new Scan
  scan.addFamily(Bytes.toBytes("family"))
  scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("time"))
  val rdd = getHbaseConfiguredRDDFromScan(sc, zkIP, zkPort, "myTable", scan, cachingValue, sparkLoggerParams)
  val output: RDD[(String)] = rdd.map { row =>
    Bytes.toString(row._2.getRow)
  }
  output
}

def getHbaseConfiguredRDDFromScan(sc: SparkContext, zkIP: String, zkPort: String, tableName: String,
    scan: Scan, cachingValue: Int, sparkLoggerParams: SparkLoggerParams): NewHadoopRDD[ImmutableBytesWritable, Result] = {
  scan.setCaching(cachingValue)
  // Serialize the Scan so it can be passed to TableInputFormat via the Hadoop configuration
  val scanString = Base64.getEncoder.encodeToString(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(scan).toByteArray)
  val hbaseContext = new SparkHBaseContext(zkIP, zkPort)
  val hbaseConfig = hbaseContext.getConfiguration()
  hbaseConfig.set(TableInputFormat.INPUT_TABLE, tableName)
  hbaseConfig.set(TableInputFormat.SCAN, scanString)
  sc.newAPIHadoopRDD(
    hbaseConfig,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result]
  ).asInstanceOf[NewHadoopRDD[ImmutableBytesWritable, Result]]
}
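Based on the linked HBASE-28219, the behaviour change appears to come from Spark 3.2.0 switching the default of spark.hadoopRDD.ignoreEmptySplits to true, which silently drops HBase TableInputFormat splits because they report a length of 0. Below is a minimal sketch of a possible workaround; only the configuration key comes from the linked issue, the application name and the rest of the setup are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: keep the pre-3.2.0 behaviour so that splits reporting a length of 0
// (as HBase TableInputFormat splits do) are not filtered out.
val sparkConf = new SparkConf()
  .setAppName("hbase-newAPIHadoopRDD-repro") // illustrative name
  .set("spark.hadoopRDD.ignoreEmptySplits", "false")

val sc = new SparkContext(sparkConf)
// getInfo(sc, startDate, cachingValue, sparkLoggerParams, zkIP, zkPort) should then return rows again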
If we fetch the data using the Scan directly, without going through newAPIHadoopRDD, it works.
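For clarity, here is a sketch of what that working "direct scan" path could look like using the plain HBase client API (this is an assumption of what is meant by fetching with the Scan directly; the table and column names are the same placeholders as above, and hbaseConfig is assumed to be the configuration produced by SparkHBaseContext):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ArrayBuffer

// Sketch: read the same rows through the HBase client directly, bypassing newAPIHadoopRDD.
def scanDirectly(hbaseConfig: Configuration): Seq[String] = {
  val connection = ConnectionFactory.createConnection(hbaseConfig)
  try {
    val table = connection.getTable(TableName.valueOf("myTable"))
    val scan = new Scan
    scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("time"))
    val scanner = table.getScanner(scan)
    val rowKeys = ArrayBuffer[String]()
    var result = scanner.next()
    while (result != null) { // ResultScanner.next() returns null when exhausted
      rowKeys += Bytes.toString(result.getRow)
      result = scanner.next()
    }
    scanner.close()
    rowKeys.toSeq
  } finally {
    connection.close()
  }
}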
Issue Links
- relates to
  - HBASE-28219 Document spark.hadoopRDD.ignoreEmptySplits issue for Spark Connector (Open)
  - PHOENIX-7065 Spark3 connector tests fail with Spark 3.4.1 (Resolved)