Details
- Type: Question
- Status: Resolved
- Priority: Minor
- Resolution: Invalid
- Affects Version/s: 3.3.0
- Fix Version/s: None
Description
Hi team,
I was testing out Hive bucketed table features. One of the benefits, as most documentation suggests, is that a bucketed Hive table can be used for filter/predicate pushdown to improve query performance.
However, through my exploration, that doesn't seem to be true. Can you please clarify whether Spark SQL supports these query optimizations when reading a Hive bucketed table?
How to reproduce the issue:
Create a Hive 3 table using the following DDL:
create table test_db.bucket_table (user_id int, key string)
comment 'A bucketed table'
partitioned by (country string)
clustered by (user_id) sorted by (key) into 10 buckets
stored as ORC;
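As a quick sanity check (not part of the original repro, and assuming a Hive-enabled Spark session), the bucket spec should then be visible when describing the table from PySpark:

from pyspark.sql import SparkSession

# Minimal sketch: confirm Spark sees the bucket metadata (rows such as
# "Num Buckets" and "Bucket Columns" should appear for this Hive table).
spark = SparkSession.builder \
    .appName('Describe bucketed table') \
    .master('local') \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql('describe formatted test_db.bucket_table').show(100, truncate=False)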
And then insert into this table using the following PySpark script:
from pyspark.sql import SparkSession

appName = "PySpark Hive Bucketing Example"
master = "local"

# Create a Spark session with Hive support.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Prepare sample data for inserting into the Hive table.
data = []
countries = ['CN', 'AU']
for i in range(0, 1000):
    data.append([int(i), 'U' + str(i), countries[i % 2]])

df = spark.createDataFrame(data, ['user_id', 'key', 'country'])
df.show()

# Save df to Hive table test_db.bucket_table.
df.write.mode('append').insertInto('test_db.bucket_table')
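After the insert, DataFrame.inputFiles gives a best-effort list of the files that back the table, which makes it easy to see how many files landed under each partition folder (a side check I added, reusing the session from the script above):

# Best-effort list of the files backing the table after the insert.
for path in sorted(spark.table('test_db.bucket_table').inputFiles()):
    print(path)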
Then query the table using the following script:
from pyspark.sql import SparkSession

appName = "PySpark Hive Bucketing Example"
master = "local"

# Create a Spark session with Hive support.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.sql("""select * from test_db.bucket_table t1
where country = 'AU' and t1.user_id = 101""")
df.show()
df.explain(extended=True)
I expected Spark to read only one bucket file from HDFS, but instead it scanned all bucket files in the partition folder country=AU:
== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('country = AU) AND ('t1.user_id = 101))
   +- 'SubqueryAlias t1
      +- 'UnresolvedRelation [test_db, bucket_table], [], false

== Analyzed Logical Plan ==
user_id: int, key: string, country: string
Project [user_id#20, key#21, country#22]
+- Filter ((country#22 = AU) AND (user_id#20 = 101))
   +- SubqueryAlias t1
      +- SubqueryAlias spark_catalog.test_db.bucket_table
         +- Relation test_db.bucket_table[user_id#20,key#21,country#22] orc

== Optimized Logical Plan ==
Filter (((isnotnull(country#22) AND isnotnull(user_id#20)) AND (country#22 = AU)) AND (user_id#20 = 101))
+- Relation test_db.bucket_table[user_id#20,key#21,country#22] orc

== Physical Plan ==
*(1) Filter (isnotnull(user_id#20) AND (user_id#20 = 101))
+- *(1) ColumnarToRow
   +- FileScan orc test_db.bucket_table[user_id#20,key#21,country#22] Batched: true, DataFilters: [isnotnull(user_id#20), (user_id#20 = 101)], Format: ORC, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/bucket_table/coun..., PartitionFilters: [isnotnull(country#22), (country#22 = AU)], PushedFilters: [IsNotNull(user_id), EqualTo(user_id,101)], ReadSchema: struct<user_id:int,key:string>
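For contrast, below is a sketch (my own, not from the report) of the same check against a Spark-native bucketed table written with the DataFrameWriter bucketBy API; the table name test_db.spark_bucket_table is made up for illustration. When bucket pruning kicks in, the FileScan node of the physical plan reports a SelectedBucketsCount such as "1 out of 10".

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('Native bucketing comparison') \
    .master('local') \
    .enableHiveSupport() \
    .getOrCreate()

# Rewrite the same rows as a Spark-managed bucketed table
# (bucketBy only works together with saveAsTable).
spark.table('test_db.bucket_table').write \
    .mode('overwrite') \
    .bucketBy(10, 'user_id') \
    .sortBy('key') \
    .saveAsTable('test_db.spark_bucket_table')

# With spark.sql.sources.bucketing.enabled left at its default (true),
# the FileScan in this plan should show a SelectedBucketsCount line.
spark.sql(
    "select * from test_db.spark_bucket_table where user_id = 101"
).explain(extended=True)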
Am I doing something wrong, or is it that Spark doesn't support this optimization? Your guidance and help would be appreciated.