Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
Flink-1.0
-
None
Description
{{ /**
- This method calls the InfluxDB write API whenever the element list reaches the
{@link
* #bufferSize}
. It keeps track of the latest timestamp of each element. It compares the latest
- timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
* - @param in incoming data
- @param context current Flink context
- @see org.apache.flink.api.connector.sink.SinkWriter.Context
*/
@Override
public void write(final IN in, final Context context) throws IOException {
if (this.elements.size() == this.bufferSize) { LOG.debug("Buffer size reached preparing to write the elements."); this.writeCurrentElements(); this.elements.clear(); }else {
{ this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp()); }
LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
this.elements.add(this.schemaSerializer.serialize(in, context));
if (context.timestamp() != null)}
}}}
The bug is in this write method. If the number of elements in the buffer is less than the configured buffer size, the current element is added to the buffer. If the number of elements in the buffer is equal to the buffer size, the buffer is flushed and the current element is not added to the next buffer. This results in the current element being dropped.
Attachments
Issue Links
- links to