Description
The following code can hang if the PairFunction logic throws an exception:
    public class Example {
        public static void main(String[] args) {
            String configPath = "/home/andrei/BDP/big-data-accelerator/modules/gridgain-spark-loader-examples/config/client.xml";

            IgniteSparkSession igniteSession = IgniteSparkSession.builder()
                .appName("Spark Ignite catalog example")
                .master("local")
                .config("ignite.disableSparkSQLOptimization", true)
                .igniteConfig(configPath)
                .getOrCreate();

            JavaSparkContext sparkCtx = new JavaSparkContext(igniteSession.sparkContext());

            final JavaRDD<Row> records = sparkCtx.parallelize(Arrays.asList(new GenericRow()));

            JavaPairRDD<Integer, Integer> rdd_records = records.mapToPair(new PairFunction<Row, Integer, Integer>() {
                @Override public Tuple2<Integer, Integer> call(Row row) throws Exception {
                    throw new IllegalStateException("some error");
                }
            });

            JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<>(sparkCtx, configPath);

            JavaIgniteRDD<Integer, Integer> igniteRdd = igniteContext.<Integer, Integer>fromCache("Person");

            igniteRdd.savePairs(rdd_records);

            igniteContext.close(true);
        }
    }

It looks like the following internal code (in the saveValues method) should also close the IgniteContext when an unexpected exception is thrown, not only the data streamer:

    try {
        it.foreach(value => {
            val key = affinityKeyFunc(value, node.orNull)
            streamer.addData(key, value)
        })
    }
    finally {
        streamer.close()
    }
    })
    }
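A minimal self-contained sketch of the cleanup pattern proposed above: wrap the work in try/finally so the context is closed even when the user-supplied function throws. FakeStreamer and FakeContext below are hypothetical stand-ins, not Ignite APIs, used only to make the pattern runnable without a cluster.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseOnFailureExample {
    // Stand-in for the data streamer that saveValues already closes.
    static class FakeStreamer implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        void addData(int key, int value) { /* no-op stand-in */ }
        @Override public void close() { closed.set(true); }
    }

    // Stand-in for the IgniteContext that the report suggests should
    // also be closed on failure so the job does not hang.
    static class FakeContext implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        @Override public void close() { closed.set(true); }
    }

    public static void main(String[] args) {
        FakeContext ctx = new FakeContext();
        FakeStreamer streamer = new FakeStreamer();
        try {
            // Simulates the PairFunction throwing during savePairs.
            throw new IllegalStateException("some error");
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        } finally {
            // Close both resources, mirroring the suggested fix:
            // not only the streamer but also the context.
            streamer.close();
            ctx.close();
        }
        System.out.println("streamer closed: " + streamer.closed.get());
        System.out.println("context closed: " + ctx.closed.get());
    }
}
```

With this shape, the exception still propagates information to the caller (here via the catch), but no resource is left open to block shutdown.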
Attachments
Issue Links