  1. Spark
  2. SPARK-36534

No way to check whether a SparkSession was created successfully, without exceptions, and is ready to execute tasks

Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.0.1
    • Fix Version/s: None
    • Component/s: Java API, Kubernetes, Scheduler
    • Labels: None
    • Environment: Spark 3.0.1

    Description

      I am running Spark on Kubernetes in client mode. The Spark driver is started programmatically (without spark-submit). Below is sample code that creates a SparkSession with the Kubernetes API server as the master.

       

      import org.apache.spark.sql.SparkSession;

      // Shared state; these declarations were not shown in the original snippet.
      private static SparkSession.Builder mySparkSessionBuilder;
      private static SparkSession mySession;

      // Builds a client-mode SparkSession that uses the Kubernetes API server as master.
      private static SparkSession getSparkSession()
      {
          mySparkSessionBuilder = SparkSession.builder()
                    .master("k8s://http://<some_IP>:6443")
                    .appName("spark-K8sDemo")
                    .config("spark.kubernetes.container.image", "spark:3.0")
                    .config("spark.jars", "/tmp/jt/database-0.0.1-SNAPSHOT-jar-with-dependencies.jar")
                    .config("spark.kubernetes.executor.podTemplateFile", "/tmp/jt/sparkExecutorPodTemplate.yaml")
                    .config("spark.kubernetes.container.image.pullPolicy", "Always")
                    .config("spark.kubernetes.namespace", "my_namespace")
                    .config("spark.driver.host", "spark-driver-example")
                    .config("spark.driver.port", "29413")
                    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
                    .config("spark.extraListeners", "K8sPoc.MyHealthCheckListener");
          // Helper defined elsewhere in the application (not shown); adds further settings.
          setAditionalConfig();
          mySession = mySparkSessionBuilder.getOrCreate();
          return mySession;
      }
      

       

      The problem is that in certain scenarios, for example when the K8s master is not reachable, the master URL is incorrect, or the spark.kubernetes.container.image config is missing, Spark throws the exceptions shown below (Exception 1 and Exception 2).

      These exceptions are never propagated to the Spark driver program, which leaves the Spark application stuck forever.

      There should be a way to find out, via the SparkSession or SparkContext object, whether the session was created successfully without any such exceptions and can run Spark tasks.
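
Until such an API exists, one possible workaround is to bound the time the driver waits for a trivial probe job. The sketch below is an assumption-laden illustration, not a Spark feature: `StartupProbe` and `runWithTimeout` are hypothetical names, and the code uses only the Java standard library so that it stays independent of the Spark version.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not part of Spark): bounds how long a startup probe may run. */
final class StartupProbe {
    private StartupProbe() {}

    /**
     * Runs the probe on a daemon thread and waits at most the given timeout.
     * Returns the probe's result, or throws IllegalStateException if the probe
     * times out, fails, or the waiting thread is interrupted.
     */
    static <T> T runWithTimeout(Callable<T> probe, Duration timeout) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "startup-probe");
            t.setDaemon(true); // a hung probe must not keep the JVM alive
            return t;
        });
        try {
            return pool.submit(probe).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Probe did not finish within " + timeout, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Probe failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for probe", e);
        } finally {
            pool.shutdownNow(); // interrupts the probe thread if it is still running
        }
    }
}
```

Assumed usage with the session from the snippet above: `StartupProbe.runWithTimeout(() -> mySession.range(1).count(), Duration.ofMinutes(2))`. If the call throws, the caller can report the error and call `mySession.stop()` instead of hanging. Interrupting the probe thread does not by itself guarantee that the Spark job is cancelled, so stopping the session (or calling `sparkContext().cancelAllJobs()`) is still advisable.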

      I have looked at the SparkSession and SparkContext API documentation and at SparkListeners, but found no way to check whether the SparkSession is ready to run tasks. If it is not ready, the Spark application should not be left hanging; a proper error or warning message should be returned to the calling API instead.
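
A listener can approximate a readiness signal by counting executor registrations. The sketch below shows only the waiting side, using the Java standard library; `ExecutorReadinessGate` is a hypothetical class. It assumes that a listener such as the `K8sPoc.MyHealthCheckListener` configured above extends `org.apache.spark.scheduler.SparkListener` and calls `executorAdded()` from its `onExecutorAdded` callback, and that the gate is shared through a static field because Spark instantiates listeners named in `spark.extraListeners` itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical gate (not part of Spark): opens once enough executors have registered. */
final class ExecutorReadinessGate {
    private final CountDownLatch latch;

    ExecutorReadinessGate(int expectedExecutors) {
        this.latch = new CountDownLatch(expectedExecutors);
    }

    /** To be called by the listener once per registered executor. */
    void executorAdded() {
        latch.countDown();
    }

    /** Returns true if the expected number of executors registered within the timeout. */
    boolean awaitReady(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

The driver would call `awaitReady(...)` after `getOrCreate()` and treat a `false` result as a startup failure. This only detects that executors never arrived; it does not surface the underlying exception, which still appears only in the logs.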

       

      Exception 1 (if the spark.kubernetes.container.image config is missing):

       

      21/08/16 16:27:07 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
      21/08/16 16:27:07 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes.
      21/08/16 16:27:07 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
      org.apache.spark.SparkException: Must specify the executor container image
          at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.$anonfun$executorContainerImage$1(BasicExecutorFeatureStep.scala:41)
          at scala.Option.getOrElse(Option.scala:189)
          at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.<init>(BasicExecutorFeatureStep.scala:41)
          at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:43)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$16(ExecutorPodsAllocator.scala:216)
          at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:208)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:82)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1$adapted(ExecutorPodsAllocator.scala:82)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$callSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:110)
          at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:107)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:71)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
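
Because this failure only surfaces on a background allocator thread, a caller may prefer to validate the configuration before building the session. The following is a minimal sketch under stated assumptions: `K8sConfPreflight` is a hypothetical helper, and it expects the settings to be collected in a plain `Map` (including `spark.master`) before they are passed to the builder. It relies on the documented behavior that `spark.kubernetes.executor.container.image` falls back to `spark.kubernetes.container.image`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check (not part of Spark) for settings collected in a Map. */
final class K8sConfPreflight {
    private K8sConfPreflight() {}

    /** Returns a list of human-readable problems; an empty list means no problem was found. */
    static List<String> findProblems(Map<String, String> conf) {
        List<String> problems = new ArrayList<>();

        String master = conf.getOrDefault("spark.master", "");
        if (!master.startsWith("k8s://")) {
            problems.add("spark.master must start with k8s:// but was '" + master + "'");
        }

        // The executor image may be set specifically or through the shared image key.
        boolean hasExecutorImage = isSet(conf, "spark.kubernetes.executor.container.image")
                || isSet(conf, "spark.kubernetes.container.image");
        if (!hasExecutorImage) {
            problems.add("Must specify the executor container image");
        }
        return problems;
    }

    private static boolean isSet(Map<String, String> conf, String key) {
        String value = conf.get(key);
        return value != null && !value.trim().isEmpty();
    }
}
```

If the list is non-empty, the application can fail fast with those messages rather than calling `getOrCreate()`. The check covers only the two conditions shown here; it is not a complete validation of a Kubernetes deployment.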
      
      

      Exception 2 (if the K8s master is not reachable or the master URL is wrong):

      21/08/16 16:45:07 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
      io.fabric8.kubernetes.client.KubernetesClientException: Operation: [create] for kind: [Pod] with name: [null] in namespace: [default] failed.
          at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
          at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
          at io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:338)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$16(ExecutorPodsAllocator.scala:222)
          at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:208)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:82)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1$adapted(ExecutorPodsAllocator.scala:82)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$callSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:110)
          at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:107)
          at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:71)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: java.net.SocketTimeoutException: connect timed out
          at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
          at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)
          at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
          at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
          at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
          at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
          at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
          at java.net.Socket.connect(Socket.java:606)
          at okhttp3.internal.platform.Platform.connectSocket(Platform.java:129)
          at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:247)
          at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:167)
          at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:258)
          at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
          at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
          at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
          at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
          at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
          at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:127)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
          at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:134)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
          at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
          at io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createHttpClient$3(HttpClientUtils.java:111)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
          at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
          at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:257)
          at okhttp3.RealCall.execute(RealCall.java:93)
          at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:469)
          at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:430)
          at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:251)
          at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:815)
          at io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:333)
          ... 16 more
      21/08/16 16:45:08 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes.
      21/08/16 16:45:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
      21/08/16 16:45:11 WARN WatchConnectionManager: Exec Failure
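
The root cause in this trace is a plain TCP connect timeout, so an unreachable or mistyped master can be detected before the session is built. The sketch below is one way to do that with the Java standard library; `K8sMasterPreflight` and its methods are hypothetical names. It assumes the master string has the `k8s://[scheme://]host:port` form used above and applies the Spark convention that a missing scheme means https. It checks only that a TCP connection can be opened, not that the endpoint is a Kubernetes API server or that credentials are valid.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

/** Hypothetical pre-flight check (not part of Spark) for the Kubernetes master URL. */
final class K8sMasterPreflight {
    private K8sMasterPreflight() {}

    /** Strips the k8s:// prefix and parses the remainder; assumes https when no scheme is given. */
    static URI apiServerUri(String master) {
        if (!master.startsWith("k8s://")) {
            throw new IllegalArgumentException("Not a Kubernetes master URL: " + master);
        }
        String rest = master.substring("k8s://".length());
        if (!rest.contains("://")) {
            rest = "https://" + rest;
        }
        return URI.create(rest);
    }

    /** Returns true if a TCP connection to the API server can be opened within the timeout. */
    static boolean isReachable(String master, int timeoutMillis) {
        URI uri = apiServerUri(master);
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("No host in master URL: " + master);
        }
        int port = uri.getPort() != -1
                ? uri.getPort()
                : ("http".equals(uri.getScheme()) ? 80 : 443);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(uri.getHost(), port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // covers connect timeouts, refused connections, and unknown hosts
        }
    }
}
```

A caller could run `K8sMasterPreflight.isReachable(masterUrl, 5000)` before `getOrCreate()` and return an error to the calling API when it yields `false`.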

          People

            Assignee: Unassigned
            Reporter: Tyagi Jahar