Details
-
Bug
-
Status: Closed
-
Minor
-
Resolution: Won't Do
-
1.11.0
-
None
-
None
-
Mac OS 10.13.6
Kubernetes 1.16.8
Flink 1.11.0
Description
I found a DateSet problem. In the case of FlatMap linking map, if map returns null, an exception will be thrown in FlatMap.I think it's a problem with the operator chain.I will post a screenshot of the corresponding stack call in the attachment.
text.filter(value -> value.f0.contains("any")).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() { @Override public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception { Pattern pattern = Pattern.compile("\".*\""); Matcher matcher = pattern.matcher(value.f0); if(matcher.find()){ String match = matcher.group(0); out.collect(match); // here throw Exception } } }).map(value -> { try { String jsonS = value.replace("\"\"","\""); jsonS = jsonS.substring(1,jsonS.length()-1); JSONObject json = JSONObject.parseObject(jsonS); String result = json.getJSONObject("body").getJSONObject("message").getString("data"); return result; // this is null }catch (Exception e){ return value; } }).print(); Caused by: java.lang.NullPointerException: The system does not support records that are null. Null values are only supported as fields inside other objects.Caused by: java.lang.NullPointerException: The system does not support records that are null. Null values are only supported as fields inside other objects. at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:76) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at com.lemonbox.Test$1.flatMap(Test.java:42) at com.lemonbox.Test$1.flatMap(Test.java:35) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:58) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748)