Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- None
Description
We encountered the following exception when restarting a job from a savepoint in our production environment:
2018-12-04 20:28:25,091 INFO 10595 org.apache.flink.runtime.taskmanager.Task :917 - _OurCustomOperator_ -> select: () -> to: Tuple2 -> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
    ... 5 more
Caused by: java.lang.RuntimeException: The class 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' is not instantiable: The class has no (implicit) public nullary constructor, i.e. a constructor without arguments.
    at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
    ... 7 more
I added the following tests to CRowSerializerTest to confirm that this is a bug:
    @Test
    def testDefaultConstructor(): Unit = {
      new CRowSerializer.CRowSerializerConfigSnapshot()

      // This would fail the test
      val serializerConfigSnapshotClass =
        Class.forName("org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot")
      InstantiationUtil.instantiate(serializerConfigSnapshotClass)
    }

    @Test
    def testStateRestore(): Unit = {

      class IKeyedProcessFunction extends KeyedProcessFunction[Integer, Integer, Integer] {
        var state: ListState[CRow] = _

        override def open(parameters: Configuration): Unit = {
          val stateDesc = new ListStateDescriptor[CRow]("CRow",
            new CRowTypeInfo(new RowTypeInfo(Types.INT)))
          state = getRuntimeContext.getListState(stateDesc)
        }

        override def processElement(
            value: Integer,
            ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context,
            out: Collector[Integer]): Unit = {
          state.add(new CRow(Row.of(value), true))
        }
      }

      val operator = new KeyedProcessOperator[Integer, Integer, Integer](new IKeyedProcessFunction)

      var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer](
        operator,
        new KeySelector[Integer, Integer] {
          override def getKey(value: Integer): Integer = -1
        },
        Types.INT, 1, 1, 0)
      testHarness.setup()
      testHarness.open()

      testHarness.processElement(new StreamRecord[Integer](1, 1L))
      testHarness.processElement(new StreamRecord[Integer](2, 1L))
      testHarness.processElement(new StreamRecord[Integer](3, 1L))

      assertEquals(1, numKeyedStateEntries(operator))

      val snapshot = testHarness.snapshot(0L, 0L)
      testHarness.close()

      testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer](
        operator,
        new KeySelector[Integer, Integer] {
          override def getKey(value: Integer): Integer = -1
        },
        Types.INT, 1, 1, 0)
      testHarness.setup()

      // This would throw the same exception as our production app does.
      testHarness.initializeState(snapshot)

      testHarness.open()

      assertEquals(1, numKeyedStateEntries(operator))

      testHarness.close()
    }
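For readers unfamiliar with the "no public nullary constructor" message in the trace, here is a minimal sketch of that failure mode using plain JVM reflection only. It does not use Flink, and it is neither Flink's InstantiationUtil nor the fix applied for this issue. The names `SnapshotWithoutNullaryCtor`, `SnapshotWithNullaryCtor` and `NullaryConstructorDemo` are hypothetical stand-ins invented for illustration.

```scala
import scala.util.Try

// Hypothetical stand-in for a config snapshot class: its only constructor takes an argument.
class SnapshotWithoutNullaryCtor(val fieldCount: Int)

// Hypothetical stand-in that also offers a no-argument constructor for reflective use.
class SnapshotWithNullaryCtor(val fieldCount: Int) {
  // Auxiliary constructor; a restore path would fill in real state after instantiation.
  def this() = this(0)
}

object NullaryConstructorDemo {
  // Looks up the public no-argument constructor and invokes it.
  // Yields a Failure (wrapping NoSuchMethodException) when no such constructor exists.
  def instantiate(cls: Class[_]): Try[Any] =
    Try(cls.getConstructor().newInstance())

  def main(args: Array[String]): Unit = {
    println(instantiate(classOf[SnapshotWithoutNullaryCtor]).isSuccess) // false
    println(instantiate(classOf[SnapshotWithNullaryCtor]).isSuccess)    // true
  }
}
```

The first class can be constructed normally with `new SnapshotWithoutNullaryCtor(3)`, yet cannot be created reflectively without arguments. That is the situation the restore path reports in the trace above.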