Description
I tested KafkaChannel in the following scenario:
- With a Flume sink, but no source
My flume-conf.properties is very simple:
a1.channels = channel1
a1.sinks = sink1
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.brokerList=10.142.90.28:9092,10.142.90.29:9092,10.142.90.30:9092
a1.channels.channel1.topic=test4
a1.channels.channel1.zookeeperConnect=10.142.90.63:2181,10.142.90.64:2181,10.142.90.65:2181
a1.channels.channel1.kafka.consumer.timeout.ms = 1000000
a1.sinks.sink1.type = logger
a1.sinks.sink1.channel= channel1
My command:
./bin/flume-ng agent -n a1 -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console
The agent then throws an exception:
2015-09-28 17:24:30,735 (flume_j2a1-1443432269768-3f6340d1-leader-finder-thread) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] [ConsumerFetcherManager-1443432269913] Added fetcher for partitions ArrayBuffer([[test4,0], initOffset 6 to broker id:2,host:cs4n3.ecld.com,port:9092] )
2015-09-28 17:24:46,427 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:332)] Error while getting events from Kafka
java.lang.IndexOutOfBoundsException
at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:341)
at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:255)
at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:243)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:317)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
2015-09-28 17:24:46,430 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to process transaction
at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
... 3 more
Caused by: java.lang.IndexOutOfBoundsException
at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:341)
at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:255)
at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:243)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:317)
... 6 more
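A possible explanation, offered as an assumption rather than a confirmed diagnosis: the readRecord/readMap/readString frames under KafkaChannel$KafkaTransaction.doTake suggest the channel is trying to decode each Kafka message as an Avro-serialized Flume event. If the messages already in test4 were written by a producer other than a Flume source, that decode could fail in this way. The Flume 1.6.0 user guide documents a parseAsFlumeEvent channel property for this case. A minimal sketch of the setting, assuming the Flume version in use supports it:

```properties
# Assumption: messages in topic test4 are raw payloads from a non-Flume
# producer, not Avro datums with the FlumeEvent schema.
a1.channels.channel1.parseAsFlumeEvent = false
```

If the topic is only ever written by a Flume source through this channel, this setting should stay at its default and the cause lies elsewhere.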