Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
None
-
None
-
None
Description
hi,
I now with an example of a website, run our Kafka log, the log of our Kafka has 128 partitions, I use 128 mapper to run task, our data is very big, so I set the execution time 15 minutes of each task, ,but I found ths every time our get hdfs result is not integrity. for example ,frist time ,I got frist time partition 10 offset from 100000 to 90000,but after 15 minutes ,I got hdfs partitions 10 records is:23000; So I think the second time the offset is should begin 123000,but when I run the second time the begin offset is from 160000,I want to konw why ?
The following is the configuration file information:
-
-
- job.name=ZeusLogOnlineJob
-
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
kafka.brokers=####
topic.whitelist=zeus
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
data.publisher.type=gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=128
metrics.reporting.file.enabled=true
metrics.log.dir=/gobblin-kafka/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
fs.uri=hdfs://
writer.fs.uri=hdfs://
state.store.fs.uri=hdfs://
mr.job.root.dir=/gobblin-kafka/working
state.store.dir=/gobblin-kafka/state-store
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/gobblintest/job-output
extract.limit.enabled=true
extract.limit.type=time
extract.limit.time.limit=15
extract.limit.time.limit.timeunit=minutes
_
Github Url : https://github.com/linkedin/gobblin/issues/751
Github Reporter : zhenglu696
Github Created At : 2016-02-24T03:18:12Z
Github Updated At : 2016-03-04T17:23:57Z
Comments
stakiar wrote on 2016-02-25T03:11:44Z : Hey @zhenglu696 I am not sure I fully understand the problem, can you elaborate a little more and provide a few more details? Thanks
Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-188582567
zhenglu696 wrote on 2016-02-25T07:59:35Z : hey,@sahilTakiar,I have the following questions:
1, In the configuration file, I have configured the execution time of each task, such as configuration for fifteen minutes,eg:
extract.limit.enabled=true
extract.limit.type=time
extract.limit.time.limit=15
extract.limit.time.limit.timeunit=minutes
I configure the Kafka pull strategy is: earliest;
The task is not performed in 15 minutes, When I run the second time, When the second run, I want to know how to pull from which offset? whether to pull data from the last offset last time?
2, when I pull data with 128 partitions, with 128 map, using the above strategies that sometimes generated total files numbers is not 128? Why is this?
thank you
Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-188661097
stakiar wrote on 2016-02-25T18:33:52Z : @zliu41 correct me if I am wrong
1: The first run will pull from the earliest offsets. Let's say it pulled data from `[earliest, x]`. assuming the first run completed with no failures, the second run will pull from offsets `(x, y]` automatically.
2: Is it possible that some of the Kafka partitions have no data? This could explain why some map tasks don't write any files.
Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-188921010
zhenglu696 wrote on 2016-02-26T00:57:36Z : @sahilTakiar @zliu41
The first run will pull from the earliest offsets. this is Ok. I think the frist time offset is [earlist,X],
X is the current time this task read the newest offset,but in I run this task 15minites,in this times,the task is not performed ,eg :this task pull[earliest,Y],I want to konw the secnond run this task ,it begins
X or Y ????
thinks !!!!!
Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-189059186
stakiar wrote on 2016-02-26T18:24:35Z : Kafka only deals with offsets, not with timestamps. Each offset corresponds to a single record. So you pull data from offset `x` to offset `y`. Kafka is agnostic to the timestamp of each record. The `extract.limit.time.limit` only controls how long Gobblin Tasks will spend trying to consume data from a topic.
The first run will pull data from the earliest offset to some offset `x` where `x` is `>` the earliest offset. The value of `x` is determined at runtime. Gobblin will start to pull from the earliest offset. It contacts Kafka and says give the record corresponding to the lastest offset. It processes that record and then requests the earliest offset + 1, then the earliest offset + 2, etc. It will do this until 15 minutes have elapsed. At which point it will be at some offset we call `x`.
The second run will pull data from `x` to `y` where `y` is `>` `x`
I also suggest taking a look at the Kafka documentation.
Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-189412094
zhenglu696 wrote on 2016-02-29T07:31:42Z : Hi,@sahilTakiar @zliu41,
Thank you for your answer!
I also have some questions as follows :
1. if I set mapper.nums equals to kafka.topic.partition.nums, then whether each mapper consume a specific partition of the topic? if mapper.nums bigger than partitions.nums, for example 256 mappers ,I want to konw what will other mappers do?
2. Where is the consumption offset storage in? If my job failed , I want to manually modify offset spending again, is that possible? if possible , how to operate? I think it stoarge in the state.store.dir this config, I open this file ,I found this file is error code ,How to open this file and manually modify offset?
3、Since the data is pushed into Kafka streamingly and continuously during the Gobblin is running, Will a mapper records the latest offset of a partition before it starts consume the topic and just consumed to the recorded offset and finish the mapper? If not , how does a mapper judge a data of the partition is all consumed?
thanks a lot!
Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-190075318