Details
Type: Question
Status: Closed
Priority: Blocker
Resolution: Won't Fix
Description
I am using storm-kafka-client 1.2.2.
When I ran my topology, I got an NPE, as described in https://issues.apache.org/jira/browse/STORM-3046.
So I modified KafkaTridentSpoutEmitter.java#seek with the patch mentioned in STORM-3046. Below is the modified code.
    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
        if (isFirstPoll(tp, transactionId)) {
            if (firstPollOffsetStrategy == EARLIEST) {
                LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
                kafkaConsumer.seekToBeginning(Collections.singleton(tp));
            } else if (firstPollOffsetStrategy == LATEST) {
                LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
                kafkaConsumer.seekToEnd(Collections.singleton(tp));
            } else if (lastBatchMeta != null) {
                LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
                kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
                kafkaConsumer.seekToBeginning(Collections.singleton(tp));
            } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
                kafkaConsumer.seekToEnd(Collections.singleton(tp));
            }
            firstPollTransaction.put(tp, transactionId);
        } else if (lastBatchMeta != null) {
            kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
            LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
        } else {
            long initialFetchOffset = firstPollTransaction.get(tp);
            OffsetAndMetadata lastCommittedOffset = kafkaConsumer.committed(tp);
            kafkaConsumer.seek(tp, lastCommittedOffset.offset());
            LOG.debug("First poll for topic partition [{}], no last batch metadata present."
                + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
        }
        final long fetchOffset = kafkaConsumer.position(tp);
        LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
        return fetchOffset;
    }
After this change, when the offset strategy is UNCOMMITTED_EARLIEST and it is the spout's very first poll, lastBatchMeta is null and the Kafka consumer seeks to the beginning of the partition. Should I instead check for the last committed offset and start from there? Likewise, on subsequent fetches (when lastBatchMeta is not null), the Kafka consumer seeks to lastBatchMeta.getLastOffset() + 1. Should I do the same there, that is, check for the last committed offset and start from it?
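To make the question concrete, here is a rough, dependency-free sketch of what "check the last commit first" could look like on the first poll. It is only a model of the decision, not Storm's code: SeekSketch, Target, and chooseFirstPollTarget are hypothetical names, and the two nullable Long parameters stand in for lastBatchMeta.getLastOffset() and kafkaConsumer.committed(tp). It assumes something actually commits offsets to Kafka for this consumer group, which may not hold for a Trident spout. KafkaConsumer.committed(tp) returns null when nothing has been committed, so the fallback branch is needed (the final else branch in my code above does not guard against that).

```java
/** Hypothetical model of the first-poll seek decision; not Storm's implementation. */
class SeekSketch {
    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    enum Kind { BEGINNING, END, OFFSET }

    /** Where the consumer would seek; offset is meaningful only when kind == OFFSET. */
    static final class Target {
        final Kind kind;
        final long offset;

        private Target(Kind kind, long offset) {
            this.kind = kind;
            this.offset = offset;
        }

        static Target beginning() { return new Target(Kind.BEGINNING, -1L); }
        static Target end() { return new Target(Kind.END, -1L); }
        static Target at(long offset) { return new Target(Kind.OFFSET, offset); }

        @Override
        public String toString() {
            return kind == Kind.OFFSET ? "OFFSET(" + offset + ")" : kind.name();
        }
    }

    /**
     * @param lastBatchLastOffset stand-in for lastBatchMeta.getLastOffset(); null if no metadata
     * @param committedOffset     stand-in for committed(tp).offset(); null if nothing is committed
     */
    static Target chooseFirstPollTarget(Strategy strategy, Long lastBatchLastOffset, Long committedOffset) {
        if (strategy == Strategy.EARLIEST) {
            return Target.beginning();
        }
        if (strategy == Strategy.LATEST) {
            return Target.end();
        }
        if (lastBatchLastOffset != null) {
            // Resume right after the last offset emitted in the previous batch.
            return Target.at(lastBatchLastOffset + 1);
        }
        if (committedOffset != null) {
            // A committed Kafka offset already points at the next record to read, so no +1.
            return Target.at(committedOffset);
        }
        // Nothing to resume from: fall back to the UNCOMMITTED_* strategy.
        return strategy == Strategy.UNCOMMITTED_EARLIEST ? Target.beginning() : Target.end();
    }

    public static void main(String[] args) {
        System.out.println(chooseFirstPollTarget(Strategy.UNCOMMITTED_EARLIEST, null, null));
        System.out.println(chooseFirstPollTarget(Strategy.UNCOMMITTED_EARLIEST, null, 42L));
        System.out.println(chooseFirstPollTarget(Strategy.UNCOMMITTED_LATEST, 9L, 42L));
    }
}
```

In this sketch the batch metadata still wins over the committed offset when both are present; whether that ordering is right is part of what I am asking.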