Skip to content

Commit b1dfb69

Browse files
Fix row slice bug in Union column decoding with many columns (apache#9000)
# Which issue does this PR close? - Closes apache#8999 # Rationale for this change This PR fixes a bug in the row-to-column conversion for Union types when multiple union columns are present in the same row converter. Previously, decoding the first union column consumed the entire remaining row slice, which prevented subsequent union columns from reading their data correctly. The fix tracks the bytes consumed per row across all union fields, so each row slice is advanced by exactly the amount that was actually consumed.
1 parent a9d6e92 commit b1dfb69

File tree

1 file changed

+154
-3
lines changed

1 file changed

+154
-3
lines changed

arrow-row/src/lib.rs

Lines changed: 154 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1901,12 +1901,9 @@ unsafe fn decode_column(
19011901

19021902
let child_row = &row[1..];
19031903
rows_by_field[field_idx].push((idx, child_row));
1904-
1905-
*row = &row[row.len()..];
19061904
}
19071905

19081906
let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(converters.len());
1909-
19101907
let mut offsets = (*mode == UnionMode::Dense).then(|| Vec::with_capacity(len));
19111908

19121909
for (field_idx, converter) in converters.iter().enumerate() {
@@ -1928,6 +1925,14 @@ unsafe fn decode_column(
19281925
let child_array =
19291926
unsafe { converter.convert_raw(&mut child_data, validate_utf8) }?;
19301927

1928+
// advance row slices by the bytes consumed
1929+
for ((row_idx, original_bytes), remaining_bytes) in
1930+
field_rows.iter().zip(child_data)
1931+
{
1932+
let consumed_length = 1 + original_bytes.len() - remaining_bytes.len();
1933+
rows[*row_idx] = &rows[*row_idx][consumed_length..];
1934+
}
1935+
19311936
child_arrays.push(child_array.into_iter().next().unwrap());
19321937
}
19331938
UnionMode::Sparse => {
@@ -1949,6 +1954,14 @@ unsafe fn decode_column(
19491954

19501955
let child_array =
19511956
unsafe { converter.convert_raw(&mut sparse_data, validate_utf8) }?;
1957+
1958+
// advance row slices by the bytes consumed for rows that belong to this field
1959+
for (row_idx, child_row) in field_rows.iter() {
1960+
let remaining_len = sparse_data[*row_idx].len();
1961+
let consumed_length = 1 + child_row.len() - remaining_len;
1962+
rows[*row_idx] = &rows[*row_idx][consumed_length..];
1963+
}
1964+
19521965
child_arrays.push(child_array.into_iter().next().unwrap());
19531966
}
19541967
}
@@ -4049,6 +4062,144 @@ mod tests {
40494062
assert!(rows.row(3) < rows.row(1));
40504063
}
40514064

4065+
#[test]
4066+
fn test_row_converter_roundtrip_with_many_union_columns() {
4067+
// col 1: Union(Int32, Utf8) [67, "hello"]
4068+
let fields1 = UnionFields::try_new(
4069+
vec![0, 1],
4070+
vec![
4071+
Field::new("int", DataType::Int32, true),
4072+
Field::new("string", DataType::Utf8, true),
4073+
],
4074+
)
4075+
.unwrap();
4076+
4077+
let int_array1 = Int32Array::from(vec![Some(67), None]);
4078+
let string_array1 = StringArray::from(vec![None::<&str>, Some("hello")]);
4079+
let type_ids1 = vec![0i8, 1].into();
4080+
4081+
let union_array1 = UnionArray::try_new(
4082+
fields1.clone(),
4083+
type_ids1,
4084+
None,
4085+
vec![
4086+
Arc::new(int_array1) as ArrayRef,
4087+
Arc::new(string_array1) as ArrayRef,
4088+
],
4089+
)
4090+
.unwrap();
4091+
4092+
// col 2: Union(Int32, Utf8) [100, "world"]
4093+
let fields2 = UnionFields::try_new(
4094+
vec![0, 1],
4095+
vec![
4096+
Field::new("int", DataType::Int32, true),
4097+
Field::new("string", DataType::Utf8, true),
4098+
],
4099+
)
4100+
.unwrap();
4101+
4102+
let int_array2 = Int32Array::from(vec![Some(100), None]);
4103+
let string_array2 = StringArray::from(vec![None::<&str>, Some("world")]);
4104+
let type_ids2 = vec![0i8, 1].into();
4105+
4106+
let union_array2 = UnionArray::try_new(
4107+
fields2.clone(),
4108+
type_ids2,
4109+
None,
4110+
vec![
4111+
Arc::new(int_array2) as ArrayRef,
4112+
Arc::new(string_array2) as ArrayRef,
4113+
],
4114+
)
4115+
.unwrap();
4116+
4117+
// create a row converter with 2 union columns
4118+
let field1 = Field::new("col1", DataType::Union(fields1, UnionMode::Sparse), true);
4119+
let field2 = Field::new("col2", DataType::Union(fields2, UnionMode::Sparse), true);
4120+
4121+
let sort_field1 = SortField::new(field1.data_type().clone());
4122+
let sort_field2 = SortField::new(field2.data_type().clone());
4123+
4124+
let converter = RowConverter::new(vec![sort_field1, sort_field2]).unwrap();
4125+
4126+
let rows = converter
4127+
.convert_columns(&[
4128+
Arc::new(union_array1.clone()) as ArrayRef,
4129+
Arc::new(union_array2.clone()) as ArrayRef,
4130+
])
4131+
.unwrap();
4132+
4133+
// roundtrip
4134+
let out = converter.convert_rows(&rows).unwrap();
4135+
4136+
let [col1, col2] = out.as_slice() else {
4137+
panic!("expected 2 columns")
4138+
};
4139+
4140+
let col1 = col1.as_any().downcast_ref::<UnionArray>().unwrap();
4141+
let col2 = col2.as_any().downcast_ref::<UnionArray>().unwrap();
4142+
4143+
for (expected, got) in [union_array1, union_array2].iter().zip([col1, col2]) {
4144+
assert_eq!(expected.len(), got.len());
4145+
assert_eq!(expected.type_ids(), got.type_ids());
4146+
4147+
for i in 0..expected.len() {
4148+
assert_eq!(expected.value(i).as_ref(), got.value(i).as_ref());
4149+
}
4150+
}
4151+
}
4152+
4153+
#[test]
4154+
fn test_row_converter_roundtrip_with_one_union_column() {
4155+
let fields = UnionFields::try_new(
4156+
vec![0, 1],
4157+
vec![
4158+
Field::new("int", DataType::Int32, true),
4159+
Field::new("string", DataType::Utf8, true),
4160+
],
4161+
)
4162+
.unwrap();
4163+
4164+
let int_array = Int32Array::from(vec![Some(67), None]);
4165+
let string_array = StringArray::from(vec![None::<&str>, Some("hello")]);
4166+
let type_ids = vec![0i8, 1].into();
4167+
4168+
let union_array = UnionArray::try_new(
4169+
fields.clone(),
4170+
type_ids,
4171+
None,
4172+
vec![
4173+
Arc::new(int_array) as ArrayRef,
4174+
Arc::new(string_array) as ArrayRef,
4175+
],
4176+
)
4177+
.unwrap();
4178+
4179+
let field = Field::new("col", DataType::Union(fields, UnionMode::Sparse), true);
4180+
let sort_field = SortField::new(field.data_type().clone());
4181+
let converter = RowConverter::new(vec![sort_field]).unwrap();
4182+
4183+
let rows = converter
4184+
.convert_columns(&[Arc::new(union_array.clone()) as ArrayRef])
4185+
.unwrap();
4186+
4187+
// roundtrip
4188+
let out = converter.convert_rows(&rows).unwrap();
4189+
4190+
let [col1] = out.as_slice() else {
4191+
panic!("expected 1 column")
4192+
};
4193+
4194+
let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
4195+
assert_eq!(col.len(), union_array.len());
4196+
assert_eq!(col.type_ids(), union_array.type_ids());
4197+
4198+
for i in 0..col.len() {
4199+
assert_eq!(col.value(i).as_ref(), union_array.value(i).as_ref());
4200+
}
4201+
}
4202+
40524203
#[test]
40534204
fn rows_size_should_count_for_capacity() {
40544205
let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();

0 commit comments

Comments
 (0)