Beam / BEAM-8109

Encoder exception for a structure that contains an Iterable of KV

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.14.0, 2.15.0
    • Fix Version/s: None
    • Component/s: extensions-java-sorter

    Description

      When I do a group-by followed by a sort, the sort step should receive input with this structure:

      PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>>

      However, for every SecondaryKeyT I have tried, I get a coder exception caused by an EOFException. This happens with both long and string coders.

      This happens on the Dataflow runner.
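
To make the expected shape concrete, here is a plain-Java analogue (no Beam dependency) of "group by primary key, then sort each group by secondary key". The class name, the `Event` type and the sample data are hypothetical. In the real pipeline the grouping would be done by a Beam transform and the sorting by the sorter extension, neither of which this sketch calls; it also uses the natural ordering of `Long` for the secondary key, which is a simplification.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogue (no Beam): group by primary key, then sort each group by secondary key.
// Map<String, List<Map.Entry<Long, String>>> mirrors the shape
// PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>>.
public class GroupThenSortSketch {

  // Hypothetical input element: (primaryKey, secondaryKey, value).
  static final class Event {
    final String primaryKey;
    final long secondaryKey;
    final String value;

    Event(String primaryKey, long secondaryKey, String value) {
      this.primaryKey = primaryKey;
      this.secondaryKey = secondaryKey;
      this.value = value;
    }
  }

  static Map<String, List<Map.Entry<Long, String>>> groupThenSort(List<Event> events) {
    // Insertion-ordered map so the printed result is deterministic.
    Map<String, List<Map.Entry<Long, String>>> grouped = new LinkedHashMap<>();
    // "Group by": collect (secondaryKey, value) pairs under each primary key.
    for (Event e : events) {
      grouped.computeIfAbsent(e.primaryKey, k -> new ArrayList<>())
          .add(new SimpleEntry<>(e.secondaryKey, e.value));
    }
    // "Sort": order each group's pairs by the natural ordering of the secondary key.
    for (List<Map.Entry<Long, String>> group : grouped.values()) {
      group.sort(Map.Entry.comparingByKey());
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Event> events = List.of(
        new Event("user-a", 30L, "c"),
        new Event("user-b", 5L, "x"),
        new Event("user-a", 10L, "a"),
        new Event("user-a", 20L, "b"));
    // Prints {user-a=[10=a, 20=b, 30=c], user-b=[5=x]}
    System.out.println(groupThenSort(events));
  }
}
```

Each primary key ends up with an ordered list of (secondary key, value) pairs, which is the per-key view the sort step works on.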

       

      Here is the full exception:

      java.lang.IllegalStateException: Unable to decode tag list using WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,IterableCoder(KvCoder(StringUtf8Coder,com.moonactive.data.processor.beam.coders.JsonNodeCoder@20df8330))),IntervalWindow$IntervalWindowCoder)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.bagPageValues(WindmillStateReader.java:575)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeBag(WindmillStateReader.java:597)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeResponse(WindmillStateReader.java:504)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:420)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:313)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$2.get(Futures.java:542)
        at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.fetchData(WindmillStateInternals.java:503)
        at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.read(WindmillStateInternals.java:536)
        at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:60)
        at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:307)
        at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:231)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:36)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1295)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1028)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
        at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:104)
        at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
        at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:81)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:76)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
        at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
        at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.bagPageValues(WindmillStateReader.java:573)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeBag(WindmillStateReader.java:597)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeResponse(WindmillStateReader.java:504)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:420)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:313)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$2.get(Futures.java:542)
        at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.fetchData(WindmillStateInternals.java:503)
        at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.read(WindmillStateInternals.java:536)
        at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:60)
        at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:307)
        at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:231)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:36)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1295)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1028)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
      Caused by: java.io.EOFException
        at org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:73)
        at org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:56)
        at org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:55)
        at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100)
        at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
        at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:81)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:76)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
        at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
        at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.bagPageValues(WindmillStateReader.java:573)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeBag(WindmillStateReader.java:597)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeResponse(WindmillStateReader.java:504)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:420)
        at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:313)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$2.get(Futures.java:542)
        at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.fetchData(WindmillStateInternals.java:503)
        at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.read(WindmillStateInternals.java:536)
        at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:60)
        at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:307)
        at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:231)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:36)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1295)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1028)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
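
The innermost cause in the trace is an EOFException thrown from VarInt.decodeLong, called from StringUtf8Coder.readString: the decoder was reading the varint length prefix of a string and the byte stream ended. The sketch below is a simplified, stdlib-only re-implementation of a length-prefixed UTF-8 string layout; it is not Beam's code, and the class and method names are hypothetical. It only shows that asking a decoder for more data than was written fails at exactly that point. It does not identify the root cause of this issue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Simplified sketch (NOT Beam's implementation) of a varint-length-prefixed UTF-8 string coder.
public class LengthPrefixedStringSketch {

  // Unsigned varint: 7 payload bits per byte, high bit set on every byte except the last.
  static void writeVarInt(int value, OutputStream out) throws IOException {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Reads an unsigned varint; throws EOFException if the stream ends before it is complete.
  static int readVarInt(InputStream in) throws IOException {
    int result = 0;
    int shift = 0;
    while (true) {
      int b = in.read();
      if (b < 0) {
        throw new EOFException(shift == 0 ? "no bytes left for varint" : "varint truncated");
      }
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
      shift += 7;
      if (shift >= 35) {
        throw new IOException("varint too long for an int");
      }
    }
  }

  static void encode(String s, OutputStream out) throws IOException {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    writeVarInt(bytes.length, out); // length prefix
    out.write(bytes);               // payload
  }

  static String decode(InputStream in) throws IOException {
    int length = readVarInt(in); // this is where a missing string surfaces as EOFException
    byte[] bytes = in.readNBytes(length);
    if (bytes.length < length) {
      throw new EOFException("string payload truncated");
    }
    return new String(bytes, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    encode("primary", buf);
    encode("secondary", buf);

    InputStream in = new ByteArrayInputStream(buf.toByteArray());
    System.out.println(decode(in)); // primary
    System.out.println(decode(in)); // secondary
    try {
      decode(in); // the reader expects a third string, but the bytes have run out
    } catch (EOFException e) {
      System.out.println("EOFException: " + e.getMessage());
    }
  }
}
```

In this simplified model, an EOFException while decoding means the bytes being read are shorter than the layout the coder expects, for example because they were written with a different coder or were truncated.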
      

          People

            Assignee: Unassigned
            Reporter: Brachi Packter (brachi_packter)
            Votes: 0
            Watchers: 4

            Dates

              Created:
              Updated: