  1. Apache Hudi
  2. HUDI-5992

Kryo SerDe error when performing compaction with GenericData$Fixed types on Hudi-Flink

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.13.1
    • None

    Description

      Background

      The hudi-flink module uses Avro 1.10.0, while most of the other Hudi-* modules use Avro 1.8.2.

       

      Avro introduced several changes after version 1.8.2. One of them is that the props field of JsonProperties (org.apache.avro.JsonProperties#props) is now implemented as an anonymous inner class. Kryo fails to deserialize it: as the stacktrace below shows, the NullPointerException is thrown inside that anonymous class (JsonProperties$2) when Kryo's MapSerializer repopulates the map.
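
To make the failure mode concrete, here is a minimal JDK-only sketch. It rests on two assumptions that are not stated in this issue: that the anonymous map keeps an extra field set by a field initializer, and that the deserializer allocates the map without running its constructor, as Objenesis-style instantiators do. OrderTrackingMap, allocateWithoutConstructor and putThrowsNpe are hypothetical names, and sun.misc.Unsafe is used through reflection only to reproduce a constructor-less allocation; it is not necessarily what Kryo does internally.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

final class SkippedInitializerSketch {

  /** Hypothetical stand-in for the order-tracking map behind JsonProperties#props. */
  static class OrderTrackingMap extends ConcurrentHashMap<String, String> {
    // Assigned by a field initializer, so it stays null if no constructor runs.
    private final Queue<String> insertionOrder = new ConcurrentLinkedQueue<>();

    @Override
    public String putIfAbsent(String key, String value) {
      String previous = super.putIfAbsent(key, value);
      if (previous == null) {
        insertionOrder.add(key); // NullPointerException if insertionOrder was never set
      }
      return previous;
    }

    @Override
    public String put(String key, String value) {
      return putIfAbsent(key, value);
    }
  }

  /** Allocates an instance without running constructors or field initializers. */
  static <T> T allocateWithoutConstructor(Class<T> type) {
    try {
      Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
      Field holder = unsafeClass.getDeclaredField("theUnsafe");
      holder.setAccessible(true);
      Object unsafe = holder.get(null);
      Method allocate = unsafeClass.getMethod("allocateInstance", Class.class);
      return type.cast(allocate.invoke(unsafe, type));
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("constructor-less allocation is unavailable", e);
    }
  }

  /** Returns true if put(...) on the given map throws a NullPointerException. */
  static boolean putThrowsNpe(OrderTrackingMap map) {
    try {
      map.put("logicalType", "decimal");
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    // Built through the constructor: the initializer ran, put works.
    System.out.println(putThrowsNpe(new OrderTrackingMap()));
    // Allocated without the constructor: the extra field is null, put fails.
    System.out.println(putThrowsNpe(allocateWithoutConstructor(OrderTrackingMap.class)));
  }
}
```

Running main should print false for the normally constructed map and true for the one allocated without its constructor, which mirrors the put -> putIfAbsent -> NullPointerException frames in the stacktrace.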

       

      The error shown in the stacktrace below is triggered when a record whose precombineField is of type DECIMAL(p, s) is written to a BitCaskDiskMap and read back. This happens during compaction, or during a merge, whenever the SpillableDiskMap has to spill values to a *DiskMap.

      Other Apache projects have hit the same issue

      More details can be found in the Avro Jira issue AVRO-3438.

      Spark has also encountered this issue: SPARK-34477

       

      A workaround is to copy Spark's implementation of GenericAvroSerializer into Hudi and simplify it a little.
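
The general idea behind such a serializer can be sketched with the JDK alone: instead of letting a field-by-field serializer walk the internals of the schema object, write the schema as text followed by the raw value, and rebuild both on read. The sketch below is only an illustration under that assumption. FixedValue, serialize and deserialize are hypothetical names, the schema is treated as an opaque JSON string, and neither Kryo nor Avro is involved, so this is not Spark's or Hudi's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class SchemaAwareSerDeSketch {

  /** Hypothetical stand-in for GenericData.Fixed: a schema (as JSON text) plus raw bytes. */
  static final class FixedValue {
    final String schemaJson;
    final byte[] bytes;

    FixedValue(String schemaJson, byte[] bytes) {
      this.schemaJson = schemaJson;
      this.bytes = bytes;
    }
  }

  /** Writes a length-prefixed schema string followed by the length-prefixed payload. */
  static byte[] serialize(FixedValue value) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      byte[] schemaBytes = value.schemaJson.getBytes(StandardCharsets.UTF_8);
      out.writeInt(schemaBytes.length);
      out.write(schemaBytes);
      out.writeInt(value.bytes.length);
      out.write(value.bytes);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  /** Reads the schema text and payload back and rebuilds the value through its constructor. */
  static FixedValue deserialize(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      byte[] schemaBytes = new byte[in.readInt()];
      in.readFully(schemaBytes);
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      return new FixedValue(new String(schemaBytes, StandardCharsets.UTF_8), payload);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // Illustrative schema text only; it is never parsed in this sketch.
    String schemaJson = "{\"type\":\"fixed\",\"name\":\"fixed\",\"size\":9,"
        + "\"logicalType\":\"decimal\",\"precision\":20,\"scale\":0}";
    byte[] payload = {0, 0, 0, 1, -122, -16, -116, -90, -32};
    FixedValue copy = deserialize(serialize(new FixedValue(schemaJson, payload)));
    System.out.println(copy.schemaJson.equals(schemaJson) && Arrays.equals(copy.bytes, payload));
  }
}
```

Because the value is rebuilt through an ordinary constructor from plain text and bytes, the reader never has to populate a partially initialized internal map, which is the step that fails in the stacktrace.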

      Minimal example (Flink-SQL + Streaming mode)

      A minimal Flink-SQL snippet to trigger this issue under STREAMING MODE:

      CREATE TEMPORARY TABLE src_table (
          id bigint, 
          full_name ROW<first_name STRING, last_name STRING>,
          country string,
          age INT,
          update_time decimal(20,0)
      ) WITH (
          'connector' = 'datagen',
          'rows-per-second' = '50',
          'fields.age.min' = '0',
          'fields.age.max' = '2',
          'fields.country.length' = '1'
      );

      -- Hudi table to write to
      CREATE TEMPORARY TABLE dst_table
      (
          id bigint,
          full_name ROW<first_name STRING, last_name STRING>,
          country string,
          age INT,
          update_time decimal(20,0)
      ) PARTITIONED BY (age)
      WITH
      (
          -- Hudi settings
          'connector' = 'hudi',
          'hoodie.datasource.write.recordkey.field' = 'id',
          'path' = 'hdfs://path/to/dst_table',
          'write.operation' = 'upsert',
          'table.type' = 'MERGE_ON_READ', 
          'hoodie.table.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
          'write.precombine.field' = 'update_time',
          'index.type' = 'BUCKET',
          'hoodie.bucket.index.num.buckets' = '4',
          'hoodie.bucket.index.hash.field' = 'id',
          'compaction.max_memory' = '0', -- ensure that all records are spilled to disk
          
          -- Hive sync settings
          'hive_sync.enable' = 'false'
      );

      -- Insert into Hudi sink
      INSERT INTO dst_table
      SELECT id, full_name, country, age, update_time
      FROM src_table;

       

      Minimal example (Unit test)

      To reproduce this error, create the following test file:

       

      hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestBitCaskDiskMapFromFlink.java
      

       

      package org.apache.hudi.sink.utils;
      
      import java.io.IOException;
      import org.apache.avro.LogicalTypes;
      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.generic.GenericFixed;
      import org.apache.hudi.common.model.EventTimeAvroPayload;
      import org.apache.hudi.common.model.HoodieAvroRecord;
      import org.apache.hudi.common.model.HoodieKey;
      import org.apache.hudi.common.model.HoodieRecord;
      import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
      import org.apache.hudi.common.util.collection.BitCaskDiskMap;
      import org.junit.jupiter.api.Test;
      
      public class TestBitCaskDiskMapFromFlink extends HoodieCommonTestHarness {
      
        @Test
        public void testPutDecimal() throws IOException {
          // The hudi-flink module uses Avro 1.10.0, so placing the test in this module
          // exercises the anonymous classes in JsonProperties that cause the error.
          BitCaskDiskMap<String, HoodieRecord> records = new BitCaskDiskMap<>(basePath, true);
          Schema precombineFieldSchema = LogicalTypes.decimal(20, 0)
              .addToSchema(Schema.createFixed("fixed", null, "record.precombineField", 9));
      
          byte[] decimalFieldBytes = new byte[] {0, 0, 0, 1, -122, -16, -116, -90, -32};
          GenericFixed genericFixed = new GenericData.Fixed(precombineFieldSchema, decimalFieldBytes);
      
          HoodieRecord avroRecord = new HoodieAvroRecord<>(new HoodieKey("recordKey", "partitionPath"),
              new EventTimeAvroPayload(null, (Comparable) genericFixed));
      
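          // put() serializes the record to disk with Kryo; get() deserializes it and throws the KryoException
          records.put("a", avroRecord);
          records.get("a");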
        }
      
      } 
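
As a side note on the test data: Avro's decimal logical type stores the unscaled value as big-endian two's-complement bytes, so the nine bytes used in the test can be decoded with the JDK alone. The sketch below is a hedged illustration; DecimalFixedSketch, decode and minBytesForPrecision are hypothetical helper names and are not part of Hudi or Avro.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

final class DecimalFixedSketch {

  /** Decodes big-endian two's-complement bytes into a decimal with the given scale. */
  static BigDecimal decode(byte[] unscaledBytes, int scale) {
    return new BigDecimal(new BigInteger(unscaledBytes), scale);
  }

  /** Smallest number of bytes whose signed range covers every value of the given precision. */
  static int minBytesForPrecision(int precision) {
    BigInteger maxUnscaled = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
    int bitsWithSign = maxUnscaled.bitLength() + 1; // one extra bit for the sign
    return (bitsWithSign + 7) / 8;                  // round up to whole bytes
  }

  public static void main(String[] args) {
    byte[] decimalFieldBytes = {0, 0, 0, 1, -122, -16, -116, -90, -32};
    System.out.println(decode(decimalFieldBytes, 0)); // 1679072995040
    System.out.println(minBytesForPrecision(20));     // 9
  }
}
```

The decoded value, 1679072995040, reads naturally as an epoch-millisecond timestamp, which fits a precombine field named update_time, and a precision of 20 needs at least 9 bytes, which matches the fixed size used in the test schema.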

       

      Stacktrace

      Stacktrace for the error:

      com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
      Serialization trace:
      props (org.apache.avro.Schema$FixedSchema)
      schema (org.apache.avro.generic.GenericData$Fixed)
      orderingVal (org.apache.hudi.common.model.EventTimeAvroPayload)
      data (org.apache.hudi.common.model.HoodieAvroRecord)
          at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
          at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
          at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
          at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
          at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
          at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
          at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
          at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
          at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
          at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
          at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
          at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
          at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:106)
          at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:80)
          at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:210)
          at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:203)
          at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:199)
          at org.apache.hudi.sink.utils.TestBitCaskDiskMapFromFlink.testPutDecimal(TestBitCaskDiskMapFromFlink.java:51)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
          at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
          at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
          at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
          at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
          at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
          at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
          at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
          at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
          at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
          at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
          at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
          at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
          at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
          at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
          at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
          at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
          at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
          at java.util.ArrayList.forEach(ArrayList.java:1259)
          at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
          at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
          at java.util.ArrayList.forEach(ArrayList.java:1259)
          at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
          at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
          at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
          at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
          at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
          at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
          at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
          at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
          at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
          at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
          at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
          at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
          at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
          at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
          at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
          at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
      Caused by: java.lang.NullPointerException
          at org.apache.avro.JsonProperties$2.putIfAbsent(JsonProperties.java:159)
          at org.apache.avro.JsonProperties$2.put(JsonProperties.java:166)
          at org.apache.avro.JsonProperties$2.put(JsonProperties.java:151)
          at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
          at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
          at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
          at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
          ... 84 more 

            People

              Assignee: voonhous voon
              Reporter: voonhous voon