Bahir (Retired) / BAHIR-303

AbstractSingleOperationMapper.createOperations fails with IllegalArgumentException

Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Flink-1.0
    • Not Applicable
    • None
    • Environment: centos7, flink1.13

    Description

      When I use Flink to write data (from Kafka and Hive) to Kudu, some rows (not every row) fail with an error like:

      java.lang.IllegalArgumentException: record_time cannot be set to null
          at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:986)
          at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:968)
          at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1077)
          at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1042)
          at org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper.createOperations(AbstractSingleOperationMapper.java:103)
          at org.apache.flink.connectors.kudu.connector.writer.KuduWriter.write(KuduWriter.java:97)
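
The trace is consistent with a null value reaching `PartialRow.addObject` for a column that the Kudu schema declares non-nullable (`record_time` here). The sketch below models that check with plain Java stand-ins so the failure mode can be reproduced in isolation. `NullCheckSketch`, `Column`, and `Row` are hypothetical names for illustration only; they are not the Kudu client classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullCheckSketch {
    /** Hypothetical stand-in for a column definition: name plus nullability. */
    static final class Column {
        final String name;
        final boolean nullable;

        Column(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }
    }

    /** Hypothetical stand-in for a row that rejects null for non-nullable columns. */
    static final class Row {
        private final Map<String, Column> schema = new LinkedHashMap<>();
        private final Map<String, Object> values = new LinkedHashMap<>();

        Row(Column... columns) {
            for (Column c : columns) {
                schema.put(c.name, c);
            }
        }

        void addObject(String columnName, Object value) {
            Column column = schema.get(columnName);
            if (column == null) {
                throw new IllegalArgumentException("Unknown column " + columnName);
            }
            // Same kind of guard the stack trace points at: null is only legal for nullable columns.
            if (value == null && !column.nullable) {
                throw new IllegalArgumentException(columnName + " cannot be set to null");
            }
            values.put(columnName, value);
        }

        Object get(String columnName) {
            return values.get(columnName);
        }
    }

    public static void main(String[] args) {
        Row row = new Row(
                new Column("id", false),
                new Column("record_time", false),
                new Column("note", true));
        row.addObject("id", 1L);
        row.addObject("note", null);            // accepted: the column is nullable
        try {
            row.addObject("record_time", null); // rejected: the column is not nullable
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this model matches what happens in the connector, the open question is why the field read from the input is null for some rows in the first place.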

       

      Adding the code below to the AbstractSingleOperationMapper class works around it, but I don't know why. Please take a look at this problem, thanks.

      The code looks like this:

      @Override
      public List<Operation> createOperations(T input, KuduTable table) {
          Optional<Operation> operationOpt = this.createBaseOperation(input, table);
          if (!operationOpt.isPresent()) {
              return Collections.emptyList();
          }

          Operation operation = operationOpt.get();
          PartialRow partialRow = operation.getRow();

          for (int i = 0; i < this.columnNames.length; ++i) {
              Object field = this.getField(input, i);
              if (field instanceof LazyBinaryFormat) {
                  field = ((LazyBinaryFormat) field).getJavaObject();
              }
              if (field instanceof TimestampData) {
                  field = ((TimestampData) field).toTimestamp();
              }

              // Added workaround: if a non-nullable column reads as null,
              // call input.toString() and read the field a second time.
              if (field == null && !partialRow.getSchema().getColumn(this.columnNames[i]).isNullable()) {
                  String tmp = input.toString();    // after this call the field is no longer null

                  field = this.getField(input, i);
                  if (field instanceof LazyBinaryFormat) {
                      field = ((LazyBinaryFormat) field).getJavaObject();
                  }
                  if (field instanceof TimestampData) {
                      field = ((TimestampData) field).toTimestamp();
                  }
              }

              partialRow.addObject(this.columnNames[i], field);
          }

          return Collections.singletonList(operation);
      }
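
To narrow down which rows and columns are affected, a check like the following could run before the values are added to the row. This is only a sketch under assumptions: `RequiredFieldCheck`, `findMissingRequired`, and `describe` are hypothetical helpers, and the parallel arrays stand in for the mapper's column names, the schema's nullability flags, and the field values read from the input. The message deliberately identifies the input by class name instead of calling `toString()` on it, since `toString()` is exactly the call that changes the outcome in the workaround above.

```java
import java.util.ArrayList;
import java.util.List;

public class RequiredFieldCheck {
    /**
     * Returns the names of columns whose value is null although the column is
     * non-nullable. The three arrays are parallel (hypothetical shape for this sketch).
     */
    static List<String> findMissingRequired(String[] columnNames, boolean[] nullable, Object[] fields) {
        List<String> missing = new ArrayList<>();
        for (int i = 0; i < columnNames.length; i++) {
            if (fields[i] == null && !nullable[i]) {
                missing.add(columnNames[i]);
            }
        }
        return missing;
    }

    /** Builds a diagnostic message without calling toString() on the input itself. */
    static String describe(List<String> missing, Object input) {
        return "null value for non-nullable column(s) " + missing
                + " in input of type " + input.getClass().getName();
    }

    public static void main(String[] args) {
        String[] names = {"id", "record_time", "note"};
        boolean[] nullable = {false, false, true};
        Object[] fields = {42L, null, null};

        List<String> missing = findMissingRequired(names, nullable, fields);
        if (!missing.isEmpty()) {
            // In a real job this would go to the logger or into an exception message.
            System.out.println(describe(missing, fields));
        }
    }
}
```

Logging the affected column names this way would show whether the problem is limited to `record_time` or affects other non-nullable columns as well.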

      The attachment includes the full error message.

      Attachments

        1. error.log
          5 kB
          chen

        People

          Assignee: Unassigned
          Reporter: haodan chen
          Votes: 0
          Watchers: 2