Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
10.0.0
Description
Dictionaries do not seem to be updated correctly when sending a record on an IPC stream.
The following example creates a first record with a single field named "field", initialized with the value "value_0". This record is then serialized with an IPC writer and deserialized with an IPC reader.
A second record is then created with the value "value_1". After serialization and deserialization, the field is expected to contain "value_1", but it contains "value_0".
Based on a quick analysis in the debugger, I suspect an error in combining the dictionary from step 1 with the dictionary from step 2. The resulting dictionary contains the concatenation of the two dictionaries (i.e., value_0value_1), but the offset values used to read the field (of the second record) refer to "value_0". It may be that the offset arrays, or something similar, are not combined correctly when the second record is received.
Below is a code snippet that reproduces the issue.
// NOTE: Release methods are not managed in this test for simplicity. func TestDictionary(t *testing.T) { pool := memory.NewGoAllocator() // A schema with a single dictionary field schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{ IndexType: arrow.PrimitiveTypes.Uint16, ValueType: arrow.BinaryTypes.String, Ordered: false, }}}, nil) // IPC writer and reader var bufWriter bytes.Buffer ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema)) bufReader := bytes.NewReader([]byte{}) var ipcReader *ipc.Reader // Create a first record with field = "value_0" record := CreateRecord(t, pool, schema, 0) expectedJson, err := record.MarshalJSON() require.NoError(t, err) // Serialize and deserialize the record via an IPC stream json, ipcReader, err := EncodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) require.NoError(t, err) // Compare the expected JSON with the actual JSON require.JSONEq(t, string(expectedJson), string(json)) // Create a second record with field = "value_1" record = CreateRecord(t, pool, schema, 1) expectedJson, err = record.MarshalJSON() require.NoError(t, err) // Serialize and deserialize the record via an IPC stream json, ipcReader, err = EncodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) require.NoError(t, err) // Compare the expected JSON with the actual JSON // field = "value_0" but should be "value_1" require.JSONEq(t, string(expectedJson), string(json)) } // Create a record with a single field. // The value of field `field` depends on the value passed in parameter. func CreateRecord(t *testing.T, pool memory.Allocator, schema *arrow.Schema, value int) arrow.Record { rb := array.NewRecordBuilder(pool, schema) fieldB := rb.Field(0).(*array.BinaryDictionaryBuilder) err := fieldB.AppendString(fmt.Sprintf("value_%d", value)) if err != nil { t.Fatal(err) } return rb.NewRecord() } // Encode and decode a record over a tuple of IPC writer and reader. 
// IPC writer and reader are the same from one call to another. func EncodeDecodeIpcStream(t *testing.T, record arrow.Record, bufWriter *bytes.Buffer, ipcWriter *ipc.Writer, bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) { // Serialize the record via an ipc writer if err := ipcWriter.Write(record); err != nil { return nil, ipcReader, err } serializedRecord := bufWriter.Bytes() bufWriter.Reset() // Deserialize the record via an ipc reader bufReader.Reset(serializedRecord) if ipcReader == nil { newIpcReader, err := ipc.NewReader(bufReader) if err != nil { return nil, newIpcReader, err } ipcReader = newIpcReader } ipcReader.Next() record = ipcReader.Record() // Return the decoded record as a json string json, err := record.MarshalJSON() if err != nil { return nil, ipcReader, err } return json, ipcReader, nil }
Attachments
Issue Links
- links to