Details
Type: Bug
Status: Open
Priority: Critical
Resolution: Unresolved
Affects Version: Flink-1.0
Fix Version: None
Environment: centos7, flink1.13
Description
When I use Flink to write data (from Kafka and Hive) to Kudu, some rows (not every row) fail with an error like:
java.lang.IllegalArgumentException: record_time cannot be set to null
at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:986)
at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:968)
at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1077)
at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1042)
at org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper.createOperations(AbstractSingleOperationMapper.java:103)
at org.apache.flink.connectors.kudu.connector.writer.KuduWriter.write(KuduWriter.java:97)
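The exception comes from the Kudu client refusing a null value for a column whose schema is declared NOT NULL. A minimal stand-alone sketch of that check (the class and method here are illustrative stand-ins, not the real `org.apache.kudu.client.PartialRow` API):

```java
import java.util.Map;

public class NullCheckSketch {
    // Minimal stand-in for a column schema: column name -> isNullable.
    // The real Kudu PartialRow performs an equivalent nullability check
    // before writing a value into the row buffer.
    static void addObject(Map<String, Boolean> schema, String column, Object value) {
        if (value == null && !schema.get(column)) {
            throw new IllegalArgumentException(column + " cannot be set to null");
        }
        // ...otherwise the value would be written into the row.
    }

    public static void main(String[] args) {
        Map<String, Boolean> schema = Map.of("record_time", false); // NOT NULL column
        try {
            addObject(schema, "record_time", null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

So the real question is why `getField(input, i)` intermittently returns null for a column that does hold data.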
Adding the code below to the AbstractSingleOperationMapper class fixes it, but I don't know why. Please look into this problem, thanks.
The code looks like:
@Override
public List<Operation> createOperations(T input, KuduTable table) {
    Optional<Operation> operationOpt = createBaseOperation(input, table);
    if (!operationOpt.isPresent()) {
        return Collections.emptyList();
    }
    Operation operation = operationOpt.get();
    PartialRow partialRow = operation.getRow();
    for (int i = 0; i < columnNames.length; i++) {
        Object field = getField(input, i);
        if (field instanceof LazyBinaryFormat) {
            field = ((LazyBinaryFormat) field).getJavaObject();
        }
        if (field instanceof TimestampData) {
            field = ((TimestampData) field).toTimestamp();
        }
        if (field == null && !partialRow.getSchema().getColumn(columnNames[i]).isNullable()) {
            String tmp = input.toString(); // this call makes the field non-null
            field = getField(input, i);
            if (field instanceof LazyBinaryFormat) {
                field = ((LazyBinaryFormat) field).getJavaObject();
            }
            if (field instanceof TimestampData) {
                field = ((TimestampData) field).toTimestamp();
            }
        }
        partialRow.addObject(columnNames[i], field);
    }
    return Collections.singletonList(operation);
}
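A possible explanation for why `input.toString()` helps (a guess on my part, not confirmed): Flink's lazy binary values keep data in a binary buffer and only materialize a Java object on demand, and `toString()` on the row forces that materialization as a side effect, after which `getField` / `getJavaObject` no longer returns null. A minimal sketch of that pattern, with hypothetical names that are not Flink's real `LazyBinaryFormat` class:

```java
public class LazyFieldSketch {
    // Stand-in for a lazily materialized value: the Java object is null
    // until something forces materialization from the binary form.
    static class LazyValue {
        private final String binary;   // stand-in for the binary section
        private String javaObject;     // null until materialized

        LazyValue(String binary) { this.binary = binary; }

        String getJavaObject() { return javaObject; }  // may still be null

        void materialize() { javaObject = binary; }

        @Override
        public String toString() {     // toString() forces materialization
            materialize();
            return javaObject;
        }
    }

    public static void main(String[] args) {
        LazyValue v = new LazyValue("2021-09-01 00:00:00");
        System.out.println(v.getJavaObject() == null); // not yet materialized
        v.toString();                                  // side effect: materialize
        System.out.println(v.getJavaObject() == null); // now non-null
    }
}
```

If this is what happens, the `String tmp = input.toString();` line works only by side effect, and a cleaner fix would be to materialize the field explicitly instead.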
The attachment includes the full error message.