  1. Apache Hudi
  2. HUDI-625

Address performance concerns on DiskBasedMap.get() during upsert of thin records


Details

    Description

      https://github.com/apache/incubator-hudi/issues/1328

       

       Each entry (a record with a single data field) is estimated at roughly 500-750 bytes in memory, so the in-memory map fills up quickly and most entries spill to disk:

      20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, currentLocation='HoodieRecordLocation {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', newLocation='HoodieRecordLocation {instantTime=20200220225921, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} 

       

      INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
      Total size in bytes of MemoryBasedMap => 83886580
      Number of entries in DiskBasedMap => 2849125
      Size of file spilled to disk => 1067101739 
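The figures in the log above can be turned into per-entry sizes and a spill ratio. The following is a minimal sketch of that arithmetic only; the class and method names (`SpillStats`, `avgBytesPerEntry`, `spilledFraction`) are hypothetical and not part of Hudi. With these numbers, the in-memory map averages about 556 bytes per entry and about 95% of the 3,000,000 entries end up on disk.

```java
// Back-of-the-envelope arithmetic on the HoodieMergeHandle log figures.
// Class and method names are illustrative, not Hudi APIs.
final class SpillStats {

    // Average size of one entry, in bytes.
    static double avgBytesPerEntry(long totalBytes, long entries) {
        return (double) totalBytes / entries;
    }

    // Fraction of all entries that live in the disk-based map.
    static double spilledFraction(long inMemoryEntries, long onDiskEntries) {
        return (double) onDiskEntries / (inMemoryEntries + onDiskEntries);
    }

    public static void main(String[] args) {
        long memEntries = 150_875L;
        long memBytes = 83_886_580L;
        long diskEntries = 2_849_125L;
        long diskBytes = 1_067_101_739L;

        System.out.printf("in-memory bytes/entry: %.1f%n", avgBytesPerEntry(memBytes, memEntries));
        System.out.printf("on-disk bytes/entry:   %.1f%n", avgBytesPerEntry(diskBytes, diskEntries));
        System.out.printf("spilled fraction:      %.1f%%%n", 100.0 * spilledFraction(memEntries, diskEntries));
    }
}
```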

      Steps to reproduce

       

      export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
      ${SPARK_HOME}/bin/spark-shell \
          --executor-memory 6G \
          --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
          --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
      

       

      val HUDI_FORMAT = "org.apache.hudi"
      val TABLE_NAME = "hoodie.table.name"
      val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
      val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
      val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
      val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
      val UPSERT_OPERATION_OPT_VAL = "upsert"
      val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
      val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
      val config = Map(
      "table_name" -> "example_table",
      "target" -> "file:///tmp/example_table/",
      "primary_key" ->  "id",
      "sort_key" -> "id"
      )
      val readPath = config("target") + "/*"

      val json_data = (1 to 4000000).map(i => "{\"id\":" + i + "}")
      val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
      val df1 = spark.read.json(jsonRDD)
      
      println(s"${df1.count()} records in source 1")
      
      df1.write.format(HUDI_FORMAT).
        option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
        option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
        option(TABLE_NAME, config("table_name")).
        option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
        option(BULK_INSERT_PARALLELISM, 1).
        mode("Overwrite").
        save(config("target"))

      println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
      
      // Runs very slowly
      df1.limit(3000000).write.format(HUDI_FORMAT).
        option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
        option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
        option(TABLE_NAME, config("table_name")).
        option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
        option(UPSERT_PARALLELISM, 20).
        mode("Append").
        save(config("target"))
      
      // Runs very slowly
      df1.write.format(HUDI_FORMAT).
        option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
        option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
        option(TABLE_NAME, config("table_name")).
        option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
        option(UPSERT_PARALLELISM, 20).
        mode("Append").
        save(config("target"))

      println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
      

       

       

       

      Analysis

      Upsert (4000000 entries)

      WARN HoodieMergeHandle: 
      Number of entries in MemoryBasedMap => 150875 
      Total size in bytes of MemoryBasedMap => 83886580 
      Number of entries in DiskBasedMap => 3849125 
      Size of file spilled to disk => 1443046132
      

      Hang stack trace (DiskBasedMap#get)

       

      "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
          at java.util.zip.ZipFile.getEntry(Native Method)
          at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
          -  locked java.util.jar.JarFile@1fc27ed4
          at java.util.jar.JarFile.getEntry(JarFile.java:240)
          at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
          at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
          at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
          at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
          at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
          at java.security.AccessController.doPrivileged(Native Method)
          at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
          -  locked java.lang.Object@28f65251
          at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
          -  locked scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
          at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
          -  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
          at com.esotericsoftware.reflectasm.AccessClassLoader.loadClass(AccessClassLoader.java:92)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
          at com.esotericsoftware.reflectasm.ConstructorAccess.get(ConstructorAccess.java:59)
          -  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
          at org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase.lambda$newInstantiator$0(SerializationUtils.java:151)
          at org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase$$Lambda$265/1458915834.newInstance(Unknown Source)
          at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1139)
          at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:562)
          at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:538)
          at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
          at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
          at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
          at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
          at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:112)
          at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:86)
          at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:217)
          at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
          at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:207)
          at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:173)
          at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:55)
          at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:280)
          at org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:434)
          at org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:424)
          at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
          at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
          at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor$$Lambda$76/1412692041.call(Unknown Source)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
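In the stack trace above, the thread is inside `ConstructorAccess.get`, reached from the `newInstantiator` lambda while Kryo creates an object, and that call ends in class loading and a JAR entry lookup under a lock. One general way to avoid repeating such a lookup is to resolve the constructor once per class and reuse it. The following is only a sketch of that caching idea, using plain `java.lang.reflect` as a stand-in for ReflectASM; `CachedInstantiators` is a hypothetical name, and this ticket does not state that this is the change made in Hudi.

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustrative per-class cache of object factories (not Hudi or Kryo code).
final class CachedInstantiators {

    private final ConcurrentHashMap<Class<?>, Supplier<Object>> cache = new ConcurrentHashMap<>();

    // Counts how many times the (expensive) constructor lookup actually ran.
    final AtomicInteger lookups = new AtomicInteger();

    // Creates a new instance, resolving the constructor only on first use per class.
    Object newInstance(Class<?> type) {
        return cache.computeIfAbsent(type, this::buildFactory).get();
    }

    // Stand-in for the costly step; requires an accessible no-arg constructor.
    private Supplier<Object> buildFactory(Class<?> type) {
        lookups.incrementAndGet();
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            return () -> {
                try {
                    return ctor.newInstance();
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Cannot instantiate " + type, e);
                }
            };
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No no-arg constructor: " + type, e);
        }
    }

    public static void main(String[] args) {
        CachedInstantiators instantiators = new CachedInstantiators();
        for (int i = 0; i < 1000; i++) {
            instantiators.newInstance(java.util.ArrayList.class);
        }
        System.out.println("constructor lookups: " + instantiators.lookups.get());
    }
}
```

With the cache in place, the lookup counter stays at one per class no matter how many objects are created, which is the property the hot path in the trace lacks.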
      

       

      Average time of DiskBasedMap#get

       

      $ monitor *DiskBasedMap get -c 12
      
      Affect(class-cnt:1 , method-cnt:4) cost in 221 ms.
       timestamp            class         method  total  success  fail  avg-rt(ms)  fail-rate
      ----------------------------------------------------------------------------------------
       2020-02-20 18:13:36  DiskBasedMap  get     5814   5814     0     6.12        0.00%
      
      
       timestamp            class         method  total  success  fail  avg-rt(ms)  fail-rate
      ----------------------------------------------------------------------------------------
       2020-02-20 18:13:48  DiskBasedMap  get     9117   9117     0     3.89        0.00%
      
      
       timestamp            class         method  total  success  fail  avg-rt(ms)  fail-rate
      ----------------------------------------------------------------------------------------
       2020-02-20 18:14:16  DiskBasedMap  get     8490   8490     0     4.10        0.00%
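To put these samples in context, the three windows can be combined into a call-weighted average and projected onto the 3,849,125 spilled entries reported for the 4,000,000-entry upsert. This is a rough sketch under loud assumptions: every spilled entry is fetched exactly once, by a single thread, at the sampled latency. The class and method names are hypothetical. Under those assumptions the weighted average is about 4.5 ms per call, which projects to somewhere between 4 and 5 hours spent in `get` alone.

```java
// Rough projection from the monitor samples; names are illustrative.
final class GetLatencyEstimate {

    // Call-count-weighted mean of the per-window average latencies, in ms.
    static double weightedAvgMs(long[] calls, double[] avgMs) {
        double totalMs = 0.0;
        long totalCalls = 0L;
        for (int i = 0; i < calls.length; i++) {
            totalMs += calls[i] * avgMs[i];
            totalCalls += calls[i];
        }
        return totalMs / totalCalls;
    }

    // Hours needed for `lookups` sequential calls at `msPerLookup` each.
    static double projectedHours(long lookups, double msPerLookup) {
        return lookups * msPerLookup / 3_600_000.0;
    }

    public static void main(String[] args) {
        long[] calls = {5814L, 9117L, 8490L};
        double[] avgMs = {6.12, 3.89, 4.10};
        double avg = weightedAvgMs(calls, avgMs);

        // Assumption: one get() per spilled entry, single-threaded.
        long spilledEntries = 3_849_125L;
        System.out.printf("weighted avg: %.2f ms%n", avg);
        System.out.printf("projected:    %.1f h%n", projectedHours(spilledEntries, avg));
    }
}
```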
      

       

      Call time trace:

      thread-2;id=194;is_daemon=false;priority=5;TCCL=org.apache.spark.repl.ExecutorClassLoader@7a47bc29
          `---[4.361707ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
              +---[0.001704ms] java.util.Map:get()
              `---[4.344261ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
                  `---[4.328981ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
                      +---[0.00122ms] org.apache.hudi.common.util.collection.DiskBasedMap:getRandomAccessFile()
                      `---[4.313586ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
                          `---[4.283509ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
                              +---[0.001169ms] org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getOffsetOfValue()
                              +---[7.1E-4ms] java.lang.Long:longValue()
                              +---[6.97E-4ms] org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getSizeOfValue()
                              +---[0.036483ms] org.apache.hudi.common.util.SpillableMapUtils:readBytesFromDisk()
                              `---[4.201996ms] org.apache.hudi.common.util.SerializationUtils:deserialize()
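The trace above shows where the time in a single `get` goes. A small sketch of the arithmetic follows (the `TraceBreakdown` name is hypothetical): deserialization accounts for about 96% of the call, while reading the bytes from disk is under 1%.

```java
// Share of one traced DiskBasedMap.get() call spent in each step.
final class TraceBreakdown {

    // Percentage of totalMs taken by partMs.
    static double percentOf(double partMs, double totalMs) {
        return 100.0 * partMs / totalMs;
    }

    public static void main(String[] args) {
        double totalMs = 4.361707;        // outermost DiskBasedMap:get()
        double deserializeMs = 4.201996;  // SerializationUtils:deserialize()
        double readBytesMs = 0.036483;    // SpillableMapUtils:readBytesFromDisk()

        System.out.printf("deserialize:       %.1f%%%n", percentOf(deserializeMs, totalMs));
        System.out.printf("readBytesFromDisk: %.2f%%%n", percentOf(readBytesMs, totalMs));
    }
}
```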

      Kryo deserialize performance test

       

      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.hudi.common.util.SerializationUtils;
      import java.util.LinkedList;
      import java.util.List;
      import java.util.Random;
      
      /**
       * Test serialization.
       */
      public class TestSerializationUtils {
      
          public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
                  + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
                  + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
                  + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
                  + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
                  + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
                  + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
                  + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
      
          public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
      
          public static GenericRecord generateGenericRecord() {
              Random RAND = new Random(46474747);
      
              GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
              rec.put("_row_key", "rowKey");
              rec.put("timestamp", 0.0); // schema declares "timestamp" as a double
              rec.put("rider", "riderName");
              rec.put("driver", "driverName");
              rec.put("begin_lat", RAND.nextDouble());
              rec.put("begin_lon", RAND.nextDouble());
              rec.put("end_lat", RAND.nextDouble());
              rec.put("end_lon", RAND.nextDouble());
              rec.put("_hoodie_is_deleted", false);
              return rec;
          }
      
          public static void main(String[] args) throws Exception {
      
              GenericRecord genericRecord = generateGenericRecord();
              byte[] serializedObject = SerializationUtils.serialize(genericRecord);
      
              List<Object> datas = new LinkedList<>();
      
              long t1 = System.currentTimeMillis();
              for (int i = 0; i < 1000; i++) {
                  datas.add(SerializationUtils.<GenericRecord>deserialize(serializedObject));
              }
              long t2 = System.currentTimeMillis();
      
              System.out.println("dese times: " + datas.size());
              System.out.println("dese cost: " + (t2 - t1) + "ms");
      
          }
      
      }
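The test above times 1000 calls with `System.currentTimeMillis()` and no warm-up, so class loading and JIT compilation are included in the measurement. A possible refinement is a small harness that runs untimed warm-up iterations first and reports the mean time per call. This is a sketch only: `MicroTimer` is a hypothetical name, and the array copy in `main` is a placeholder workload standing in for the `SerializationUtils.deserialize` call.

```java
import java.util.Arrays;

// Minimal timing helper with a warm-up phase (illustrative, not a full benchmark framework).
final class MicroTimer {

    // Runs task `warmup` times untimed, then `iterations` times timed,
    // and returns the mean nanoseconds per timed call.
    static double meanNanos(Runnable task, int warmup, int iterations) {
        for (int i = 0; i < warmup; i++) {
            task.run();
        }
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            task.run();
        }
        return (System.nanoTime() - start) / (double) iterations;
    }

    public static void main(String[] args) {
        // Placeholder workload: copy a buffer about the size of the estimated payload.
        byte[] payload = new byte[760];
        double ns = meanNanos(() -> Arrays.copyOf(payload, payload.length), 10_000, 100_000);
        System.out.printf("mean per call: %.1f ns%n", ns);
    }
}
```

Comparing the per-call time with and without the warm-up phase helps separate one-off start-up cost from a cost that is paid on every call.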

       


          People

            lamber-ken lamber-ken
            vinoth Vinoth Chandar