Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
3.4.0
-
None
-
None
Description
When I register the following listener class, then open the Streaming Query tab in the UI and click on Sort by Latest Batch, I get the error below.
/**
 * Streaming query listener that, on every progress event, commits the end offsets reported
 * by the query's first source to a Kafka consumer group.
 *
 * The query name is used both as the topic name of the committed partitions and as the
 * consumer group id. NOTE(review): this presumably relies on each query being named after
 * the topic it reads; confirm against the job that registers this listener.
 */
class MyListener extends StreamingQueryListener {
  import scala.collection.JavaConverters._
  import scala.util.control.NonFatal

  /** Admin client, or `None` when it could not be created; fatal errors are not swallowed. */
  private val kafkaAdminClient: Option[Admin] =
    try Some(Admin.create(getKafkaProp()))
    catch {
      case NonFatal(_) => None
    }

  /**
   * Ids of the consumer groups that existed when this listener was constructed (empty when
   * no admin client is available). This is a one-time snapshot and is never refreshed.
   *
   * The ids are extracted from the listings so that the later membership test compares
   * strings with strings; testing a group name against the raw listing objects never matches.
   */
  private val registeredConsumerGroups: Set[String] =
    kafkaAdminClient.fold(Set.empty[String]) { admin =>
      val listings = admin.listConsumerGroups().all().get().asScala
      listings.map(_.groupId()).toSet
    }

  /** No-op: only progress events are of interest. */
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()

  /**
   * Commits the end offset of the first source of the reported batch.
   *
   * Events from unnamed queries (null name), queries without sources, or sources without an
   * end offset are skipped, because there is nothing meaningful to commit for them.
   */
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit =
    for {
      queryName <- Option(event.progress.name)
      source <- event.progress.sources.headOption
      endOffset <- Option(source.endOffset)
    } commitConsumerOffset(endOffset, queryName)

  /** No-op: only progress events are of interest. */
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()

  /**
   * Builds the Kafka client properties from the application configuration.
   *
   * @param eventName value used as `group.id`; defaults to the id used by the admin client
   * @return a fresh `Properties` instance
   */
  private def getKafkaProp(eventName: String = "dummy-admin"): Properties = {
    val props: Properties = new Properties()
    props.put("bootstrap.servers", Configuration.configurationMap("kafka.broker"))
    props.put("group.id", eventName)
    props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("security.protocol", Configuration.configurationMap("kafka.security.protocol"))
    props.put("sasl.mechanism", Configuration.configurationMap("kafka.sasl.mechanism"))
    props.put("sasl.jaas.config", Configuration.configurationMap("kafka.sasl.jaas.config"))
    props
  }

  /**
   * Converts the partition -> offset map of the first entry of `jsonOffsets` into the
   * structure expected by the Kafka commit APIs.
   *
   * @param topic       topic name used for every resulting `TopicPartition`
   * @param jsonOffsets parsed end-offset JSON; only its first entry is used
   * @return offsets keyed by topic partition; empty when `jsonOffsets` is empty
   * @throws IllegalArgumentException if an offset value is not numeric
   */
  private def getTopicPartitionMap(
      topic: String,
      jsonOffsets: Map[String, Map[String, Long]]): util.HashMap[TopicPartition, OffsetAndMetadata] = {
    // Widen the value type to Any before touching the values: the map was deserialized from
    // an erased class token, so a value may be any boxed Number and must not be unboxed as Long.
    val partitionOffsets: Map[String, Any] =
      jsonOffsets.values.headOption.getOrElse(Map.empty[String, Long])

    val topicPartitionMap = new util.HashMap[TopicPartition, OffsetAndMetadata]()
    partitionOffsets.foreach { case (partition, rawOffset) =>
      val offset = rawOffset match {
        case n: Number => n.longValue()
        case other =>
          throw new IllegalArgumentException(
            s"Non-numeric offset for partition $partition: $other")
      }
      topicPartitionMap.put(new TopicPartition(topic, partition.toInt), new OffsetAndMetadata(offset))
    }
    topicPartitionMap
  }

  /**
   * Commits the offsets encoded in `endOffset` for the consumer group `eventName`.
   *
   * Groups present in the construction-time snapshot are updated through the admin client;
   * any other group is committed through a short-lived `KafkaConsumer`, which is always
   * closed, even when the commit fails.
   *
   * @param endOffset JSON string of end offsets, parsed with the shared `mapper`
   * @param eventName query name, used as both topic name and consumer group id
   */
  private def commitConsumerOffset(endOffset: String, eventName: String): Unit = {
    val jsonOffsets = mapper.readValue(endOffset, classOf[Map[String, Map[String, Long]]])
    val topicPartitionMap = getTopicPartitionMap(eventName, jsonOffsets)

    if (!topicPartitionMap.isEmpty) {
      kafkaAdminClient match {
        case Some(admin) if registeredConsumerGroups.contains(eventName) =>
          print("committing cg offsets using admin")
          admin.alterConsumerGroupOffsets(eventName, topicPartitionMap).all().get()
        case _ =>
          print("topic consumer not created! creating")
          val kafkaConsumer = new KafkaConsumer[String, String](getKafkaProp(eventName))
          try kafkaConsumer.commitSync(topicPartitionMap)
          finally kafkaConsumer.close()
      }
    }
  }
}
HTTP ERROR 500 java.lang.NullPointerException
URI: | /StreamingQuery/active |
---|---|
STATUS: | 500 |
MESSAGE: | java.lang.NullPointerException |
SERVLET: | org.apache.spark.ui.JettyUtils$$anon$1-522a82dd |
CAUSED BY: | java.lang.NullPointerException |
Caused by:
java.lang.NullPointerException at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9(StreamingQueryPage.scala:258) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9$adapted(StreamingQueryPage.scala:258) at scala.math.Ordering$$anon$5.compare(Ordering.scala:253) at java.base/java.util.TimSort.binarySort(TimSort.java:296) at java.base/java.util.TimSort.sort(TimSort.java:239) at java.base/java.util.Arrays.sort(Arrays.java:1441) at scala.collection.SeqLike.sorted(SeqLike.scala:659) at scala.collection.SeqLike.sorted$(SeqLike.scala:647) at scala.collection.AbstractSeq.sorted(Seq.scala:45) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.<init>(StreamingQueryPage.scala:223) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.dataSource(StreamingQueryPage.scala:149) at org.apache.spark.ui.PagedTable.table(PagedTable.scala:101) at org.apache.spark.ui.PagedTable.table$(PagedTable.scala:100) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.table(StreamingQueryPage.scala:114) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.queryTable(StreamingQueryPage.scala:101) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.generateStreamingQueryTable(StreamingQueryPage.scala:60) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.render(StreamingQueryPage.scala:38) at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626) at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at 
org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:185) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) at org.sparkproject.jetty.server.Server.handle(Server.java:516) at org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:380) at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at 
org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:386) at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) at java.base/java.lang.Thread.run(Thread.java:829)