Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-9336

Queryable state fails with Exception after task manager restore

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 1.5.0
    • 1.5.0
    • None

    Description

       

      The following steps can be used to reproduce the Exception:

      1. Run an app with a FlatMap which exposes its MapState through Queryable State with RocksDB
      2. Run a queryable state client: ✔️
      3. Kill the TM
      4. Start a new TM
      5. Run a queryable state client: ❌ (StackTrace below)

       

      This happens if we run our queryable state client after the new TM has started and the app is running but before the FlatMap has processed elements again

       

      With a little help and some debugging I found out that very likely the reason is that in the RocksDBMapState there is a private field userKeyOffset, which is initialised through the code path of RocksDBMapState::serializeCurrentKeyAndNamespace. This will only be called when processing an element and therefore accessing the state. If now the state is accessed before that through queryable state, this value will be 0 and therefore the deserialization of the user key will fail as seen below. 

       

      The observed stacktrace

       

      Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0. Caused by: java.lang.RuntimeException: Failed request 0. Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: java.lang.RuntimeException: Error while deserializing the user key. at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414) at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220) at org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288) at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197) at java.io.DataInputStream.readUTF(DataInputStream.java:609) at java.io.DataInputStream.readUTF(DataInputStream.java:564) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381) at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341) at org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61) at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412) ... 11 more at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122) at org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75) Caused by: java.lang.RuntimeException: Failed request 0. Caused by: java.lang.RuntimeException: Failed request 0. Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: java.lang.RuntimeException: Error while deserializing the user key. at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414) at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220) at org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288) at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197) at java.io.DataInputStream.readUTF(DataInputStream.java:609) at java.io.DataInputStream.readUTF(DataInputStream.java:564) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381) at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341) at org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61) at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412) ... 11 more at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)
      

       

      Attachments

        Issue Links

          Activity

            People

              sihuazhou Sihua Zhou
              florianschmidt Florian Schmidt
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: