Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Won't Fix
- Affects Version/s: 2.0.0
- Fix Version/s: None
- Component/s: None
Description
I'm trying to stream items from Storm into Hive using the HiveBolt, but Hive does not seem to see the records at all.
Test program:
package com.datto.hivetest;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hive.bolt.HiveBolt;
import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.streams.StreamBuilder;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;

import java.util.Map;
import java.util.Random;

public class MainStorm {

    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        // First constructor argument is the Hive metastore URI (redacted here).
        HiveOptions hiveOptions = new HiveOptions(
                "<url>", "default", "test_table",
                new JsonRecordHiveMapper()
                        .withColumnFields(new Fields("value")))
                .withAutoCreatePartitions(true);

        // Lowercase each word and sink the stream into Hive via the HiveBolt.
        StreamBuilder builder = new StreamBuilder();
        builder.newStream(new TestSpout())
                .map(tup -> tup.getStringByField("word").toLowerCase())
                .to(new HiveBolt(hiveOptions));

        Config config = new Config();
        config.setMessageTimeoutSecs(30);
        config.setMaxSpoutPending(1024);
        config.setClasspath("/etc/hadoop/conf/");

        StormSubmitter.submitTopology("hive-test", config, builder.build());
    }

    // Simple spout that emits a random word roughly every 100 ms.
    public static class TestSpout extends BaseRichSpout {
        private transient SpoutOutputCollector out;
        private transient Random random;

        @Override
        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
            out = collector;
            random = new Random();
        }

        @Override
        public void nextTuple() {
            try {
                Time.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            final String[] words = new String[]{"nathan", "mike", "jackson", "golda", "bertels"};
            final String word = words[random.nextInt(words.length)];
            out.emit(new Values(word));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
}
Table creation:
CREATE TABLE test_table (value string)
CLUSTERED BY (value) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('orc.compress' = 'ZLIB', 'transactional' = 'true');

GRANT ALL ON test_table TO USER storm;
Setting the ACL:
sudo -u hdfs hdfs dfs -setfacl -m user:storm:rwx /warehouse/tablespace/managed/hive/test_table
sudo -u hdfs hdfs dfs -setfacl -m default:user:storm:rwx /warehouse/tablespace/managed/hive/test_table
Hive results after running for around 10 minutes:
> SELECT COUNT(*) FROM test_table;
INFO : Compiling command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403): SELECT COUNT(*) FROM test_table
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, type:bigint, comment:null)], properties:null)
INFO : Completed compiling command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403); Time taken: 1.138 seconds
INFO : Executing command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403): SELECT COUNT(*) FROM test_table
INFO : Completed executing command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403); Time taken: 0.013 seconds
INFO : OK
+------+
| _c0  |
+------+
| 0    |
+------+
So Hive thinks there are no rows, which isn't good. But if I look at HDFS, there are some files there:
# sudo -u hdfs hdfs dfs -ls -R -h /warehouse/tablespace/managed/hive/test_table
drwxrwx---+  - storm hadoop       0 2019-07-22 19:15 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100
-rw-rw----+  3 storm hadoop       1 2019-07-22 19:15 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/_orc_acid_version
-rw-rw----+  3 storm hadoop  74.4 K 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00001
-rw-rw----+  3 storm hadoop     376 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00001_flush_length
-rw-rw----+  3 storm hadoop  73.4 K 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00002
-rw-rw----+  3 storm hadoop     376 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00002_flush_length
-rw-rw----+  3 storm hadoop  84.9 K 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00003
-rw-rw----+  3 storm hadoop     376 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00003_flush_length
And they seem to have valid rows:
❯❯❯ ./orc-contents /tmp/bucket_00002 | head
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 0, "currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 1, "currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 2, "currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 3, "currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 4, "currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 5, "currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 6, "currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 7, "currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 8, "currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 9, "currentTransaction": 1, "row": {"value": "bertels"}}
I can insert into the table manually, and I've also written a standalone Java test program that uses the Hive streaming API to write one row (a sketch of it is below); Hive sees those inserts just fine. I don't see any errors in the Storm logs; the tuples appear to be flushed and acked OK. I don't think I've seen any errors in the metastore logs either.
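The standalone test is roughly the following. This is a minimal sketch, not the exact program: the class name, metastore URI, and delimiter are placeholders, and it uses the classic hcatalog streaming API (which, as far as I can tell, is what storm-hive wraps) with a delimited writer rather than the JSON mapper the topology uses.

package com.datto.hivetest;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class StreamingSmokeTest {
    public static void main(String[] args) throws Exception {
        // Metastore URI is a placeholder; point it at the same metastore the topology uses.
        // The table is unpartitioned, so partition values are null.
        HiveEndPoint endPoint = new HiveEndPoint(
                "thrift://<metastore-host>:9083", "default", "test_table", null);

        StreamingConnection connection = endPoint.newConnection(true);
        DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"value"}, ",", endPoint);

        // One small transaction batch, one record, one explicit commit.
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        txnBatch.write("bertels".getBytes());
        txnBatch.commit();
        txnBatch.close();
        connection.close();
    }
}

After running that once, SELECT COUNT(*) on the table does reflect the new row, which is what makes the HiveBolt behaviour above so confusing.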
Anyone know what's up? I can get more info if needed.