Commit 3fa3f20

zhuqi-lucas and claude committed
remove redistribute_files_across_groups_by_statistics
Redistributing files consecutively across groups makes SPM read all of partition 0 before starting partition 1 (since all values in group 0 < group 1), degrading multi-partition execution to single-threaded sequential I/O. The interleaved assignment from planning-phase bin-packing is actually beneficial: SPM alternates pulling from both partitions, keeping parallel I/O active on both simultaneously.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
1 parent 5f78936 commit 3fa3f20
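
To make the merge behavior concrete, here is a small, self-contained Rust sketch of the pull pattern the commit message describes. It is an illustration only, not DataFusion's SortPreservingMergeExec; the file names, partitions, and value ranges are invented. The merge always pulls from the partition whose next file starts at the smallest value, so interleaved groups keep both partitions busy while consecutive groups drain one partition at a time.

```rust
use std::collections::VecDeque;

/// A file summarized as (name, min value of its sort key). Files within a
/// group are sorted and non-overlapping, so the merge drains whole files.
type File = (&'static str, i64);

/// Order in which a streaming merge consumes files: always pull from the
/// partition whose next file starts at the smallest value.
fn merge_pull_order(groups: &[Vec<File>]) -> Vec<(usize, &'static str)> {
    let mut queues: Vec<VecDeque<File>> =
        groups.iter().map(|g| g.iter().copied().collect()).collect();
    let mut order = Vec::new();
    loop {
        // Find the partition whose next file has the smallest min value.
        let next = queues
            .iter()
            .enumerate()
            .filter_map(|(p, q)| q.front().map(|&(_, min)| (p, min)))
            .min_by_key(|&(_, min)| min);
        let Some((p, _)) = next else { break };
        let (name, _) = queues[p].pop_front().unwrap();
        order.push((p, name));
    }
    order
}

fn main() {
    // Interleaved (bin-packed) groups: the merge alternates P0, P1, P0, P1,
    // so both partitions keep I/O in flight.
    let interleaved = vec![
        vec![("f1(1-10)", 1), ("f3(21-30)", 21)],
        vec![("f2(11-20)", 11), ("f4(31-40)", 31)],
    ];
    // Consecutive groups: every value in group 0 is below group 1, so the
    // merge drains all of P0 before P1 does any work.
    let consecutive = vec![
        vec![("f1(1-10)", 1), ("f2(11-20)", 11)],
        vec![("f3(21-30)", 21), ("f4(31-40)", 31)],
    ];
    println!("interleaved: {:?}", merge_pull_order(&interleaved));
    println!("consecutive: {:?}", merge_pull_order(&consecutive));
}
```

Running it prints an alternating `(0, f1), (1, f2), (0, f3), (1, f4)` pull order for the interleaved layout versus `(0, f1), (0, f2), (1, f3), (1, f4)` for the consecutive one, where partition 1 is idle until partition 0 is fully drained.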

2 files changed: 36 additions & 186 deletions

datafusion/datasource/src/file_scan_config.rs
Lines changed: 32 additions & 182 deletions

@@ -916,7 +916,6 @@ impl DataSource for FileScanConfig {
     /// │ │ rebuild_with_source(exact=true)
     /// │ │ ├─ sort files by stats within groups
     /// │ │ ├─ verify non-overlapping
-    /// │ │ ├─ redistribute across groups (consecutive)
     /// │ │ └─► keep output_ordering → SortExec removed
     /// │ │
     /// │ ├─ reversed ordering matches? ──► Inexact
@@ -1325,8 +1324,7 @@ impl FileScanConfig {
     /// │ 1. Reverse file groups (if DESC matches reversed ordering) │
     /// │ 2. Sort files within groups by min/max statistics          │
     /// │ 3. If Exact + non-overlapping:                             │
-    /// │      a. Redistribute files consecutively across groups     │
-    /// │      b. Keep output_ordering → SortExec eliminated         │
+    /// │      Keep output_ordering → SortExec eliminated            │
     /// │    Otherwise: clear output_ordering → SortExec stays       │
     /// └────────────────────────────────────────────────────────────┘
     /// ```
@@ -1340,14 +1338,6 @@ impl FileScanConfig {
     /// Even when files overlap (Inexact), statistics-based ordering helps
     /// TopK/LIMIT queries: reading low-value files first lets dynamic filters
     /// prune high-value files earlier.
-    ///
-    /// # Why redistribute across groups?
-    ///
-    /// `split_groups_by_statistics` uses bin-packing to balance group sizes,
-    /// which can interleave file ranges across groups. We fix this by
-    /// assigning consecutive files to consecutive groups, so groups are
-    /// ordered relative to each other. This preserves parallel I/O while
-    /// ensuring SPM's merge is a cheap sequential read.
     fn rebuild_with_source(
         &self,
         new_file_source: Arc<dyn FileSource>,
@@ -1414,40 +1404,28 @@ impl FileScanConfig {
 
         if is_exact && all_non_overlapping {
             // Truly exact: within-file ordering guaranteed and files are non-overlapping.
+            // Keep output_ordering so SortExec can be eliminated for each partition.
             //
-            // When there are multiple groups, redistribute files using consecutive
-            // assignment so that each group remains non-overlapping AND groups are
-            // ordered relative to each other. This enables:
-            // - No SortExec per partition (files in each group are sorted & non-overlapping)
-            // - SPM cheaply merges ordered streams (O(n) merge)
-            // - Parallel I/O across partitions
+            // We intentionally do NOT redistribute files across groups here.
+            // The planning-phase bin-packing may interleave file ranges across groups:
             //
-            // Before (bin-packing may interleave):
-            //   Group 0: [file_01(1-10), file_03(21-30)]  ← gap, interleaved with group 1
-            //   Group 1: [file_02(11-20), file_04(31-40)]
+            //   Group 0: [f1(1-10), f3(21-30)]  ← interleaved with group 1
+            //   Group 1: [f2(11-20), f4(31-40)]
             //
-            // After (consecutive assignment):
-            //   Group 0: [file_01(1-10), file_02(11-20)]  ← consecutive, ordered
-            //   Group 1: [file_03(21-30), file_04(31-40)] ← consecutive, ordered
-            if new_config.file_groups.len() > 1
-                && let Some(sort_order) = LexOrdering::new(order.iter().cloned())
-            {
-                let projected_schema = new_config.projected_schema()?;
-                let projection_indices = new_config
-                    .file_source
-                    .projection()
-                    .as_ref()
-                    .and_then(|p| ordered_column_indices_from_projection(p));
-                let num_groups = new_config.file_groups.len();
-                new_config.file_groups =
-                    Self::redistribute_files_across_groups_by_statistics(
-                        &new_config.file_groups,
-                        &sort_order,
-                        &projected_schema,
-                        projection_indices.as_deref(),
-                        num_groups,
-                    );
-            }
+            // This interleaving is actually beneficial because SPM pulls from both
+            // partitions concurrently, keeping parallel I/O active:
+            //
+            //   SPM: pull P0 [1-10] → pull P1 [11-20] → pull P0 [21-30] → pull P1 [31-40]
+            //        ^^^^^^^^^^^^^^   ^^^^^^^^^^^^^^^
+            //        both partitions scanning files simultaneously
+            //
+            // If we were to redistribute files consecutively:
+            //   Group 0: [f1(1-10), f2(11-20)]  ← all values < group 1
+            //   Group 1: [f3(21-30), f4(31-40)]
+            //
+            // SPM would read ALL of group 0 first (values always smaller), then group 1.
+            // This degrades to single-threaded sequential I/O — the other partition
+            // sits idle the entire time, losing the parallelism benefit.
         } else {
             new_config.output_ordering = vec![];
         }
@@ -1543,82 +1521,6 @@ impl FileScanConfig {
         }
     }
 
-    /// Redistribute files across groups using consecutive assignment.
-    ///
-    /// `split_groups_by_statistics` uses bin-packing which balances group sizes
-    /// but can interleave file ranges. This method fixes that by assigning
-    /// consecutive sorted files to consecutive groups:
-    ///
-    /// ```text
-    /// Input (bin-packed, interleaved):
-    ///   Group 0: [f1(0-9), f3(20-29)]    max(f1)=9 but f3=20 > Group1.f2=10
-    ///   Group 1: [f2(10-19), f4(30-39)]  groups overlap!
-    ///
-    /// After global sort + consecutive assignment:
-    ///   Group 0: [f1(0-9), f2(10-19)]    max=19
-    ///   Group 1: [f3(20-29), f4(30-39)]  min=20 > 19 ✓ groups are ordered!
-    ///
-    /// Resulting plan:
-    ///   SPM [col ASC]               ← O(n) merge, reads group 0 then group 1
-    ///     DataSourceExec [f1, f2]   ← parallel I/O, no SortExec
-    ///     DataSourceExec [f3, f4]   ← parallel I/O, no SortExec
-    /// ```
-    ///
-    /// Falls back to the original groups if statistics are unavailable.
-    fn redistribute_files_across_groups_by_statistics(
-        file_groups: &[FileGroup],
-        sort_order: &LexOrdering,
-        projected_schema: &SchemaRef,
-        projection_indices: Option<&[usize]>,
-        num_groups: usize,
-    ) -> Vec<FileGroup> {
-        if num_groups <= 1 {
-            return file_groups.to_vec();
-        }
-
-        // Flatten all files
-        let all_files: Vec<_> = file_groups.iter().flat_map(|g| g.iter()).collect();
-        if all_files.is_empty() {
-            return file_groups.to_vec();
-        }
-
-        // Sort globally by statistics
-        let statistics = match MinMaxStatistics::new_from_files(
-            sort_order,
-            projected_schema,
-            projection_indices,
-            all_files.iter().copied(),
-        ) {
-            Ok(stats) => stats,
-            Err(_) => return file_groups.to_vec(),
-        };
-
-        let sorted_indices = statistics.min_values_sorted();
-
-        // Assign consecutive files to groups
-        let total = sorted_indices.len();
-        let base_size = total / num_groups;
-        let remainder = total % num_groups;
-
-        let mut new_groups = Vec::with_capacity(num_groups);
-        let mut offset = 0;
-        for i in 0..num_groups {
-            // First `remainder` groups get one extra file
-            let group_size = base_size + if i < remainder { 1 } else { 0 };
-            if group_size == 0 {
-                continue;
-            }
-            let group: FileGroup = sorted_indices[offset..offset + group_size]
-                .iter()
-                .map(|(idx, _)| all_files[*idx].clone())
-                .collect();
-            new_groups.push(group);
-            offset += group_size;
-        }
-
-        new_groups
-    }
-
     /// Last-resort optimization when FileSource returns `Unsupported`.
     ///
     /// Even without within-file ordering guarantees, reordering files by
@@ -3415,9 +3317,12 @@ mod tests {
     }
 
     #[test]
-    fn sort_pushdown_exact_multi_group_redistributes_consecutively() -> Result<()> {
-        // ExactSortPushdownSource + 4 non-overlapping files in 2 interleaved groups
-        // → files should be redistributed so groups are consecutive and ordered
+    fn sort_pushdown_exact_multi_group_preserves_parallelism() -> Result<()> {
+        // ExactSortPushdownSource + 4 non-overlapping files in 2 interleaved groups.
+        // Groups should NOT be redistributed — interleaved groups allow SPM to
+        // pull from both partitions concurrently, keeping parallel I/O active.
+        // Redistributing consecutively would make SPM read one partition at a
+        // time (all values in group 0 < group 1), degrading to single-threaded I/O.
         let file_schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
         let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
@@ -3456,78 +3361,23 @@
             .downcast_ref::<FileScanConfig>()
             .expect("Expected FileScanConfig");
 
-        // Should still have 2 groups (preserving parallelism)
+        // 2 groups preserved (parallelism maintained)
         assert_eq!(pushed_config.file_groups.len(), 2);
 
-        // Group 0 should have consecutive files [file_01, file_02]
+        // Files within each group are sorted by stats, but groups are NOT
+        // redistributed — interleaved assignment from bin-packing is kept
         let files0 = pushed_config.file_groups[0].files();
         assert_eq!(files0[0].object_meta.location.as_ref(), "file_01");
-        assert_eq!(files0[1].object_meta.location.as_ref(), "file_02");
-
-        // Group 1 should have consecutive files [file_03, file_04]
+        assert_eq!(files0[1].object_meta.location.as_ref(), "file_03");
         let files1 = pushed_config.file_groups[1].files();
-        assert_eq!(files1[0].object_meta.location.as_ref(), "file_03");
+        assert_eq!(files1[0].object_meta.location.as_ref(), "file_02");
         assert_eq!(files1[1].object_meta.location.as_ref(), "file_04");
 
-        // output_ordering preserved
+        // output_ordering preserved (Exact, each group internally non-overlapping)
        assert!(!pushed_config.output_ordering.is_empty());
         Ok(())
     }
 
-    #[test]
-    fn sort_pushdown_exact_multi_group_uneven_distribution() -> Result<()> {
-        // 5 files across 2 groups → group 0 gets 3 files, group 1 gets 2
-        let file_schema =
-            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
-        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
-        let file_source = Arc::new(ExactSortPushdownSource::new(table_schema));
-
-        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
-
-        let file_groups = vec![
-            FileGroup::new(vec![
-                make_file_with_stats("f1", 0.0, 9.0),
-                make_file_with_stats("f3", 20.0, 29.0),
-                make_file_with_stats("f5", 40.0, 49.0),
-            ]),
-            FileGroup::new(vec![
-                make_file_with_stats("f2", 10.0, 19.0),
-                make_file_with_stats("f4", 30.0, 39.0),
-            ]),
-        ];
-
-        let config =
-            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
-                .with_file_groups(file_groups)
-                .with_output_ordering(vec![
-                    LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
-                ])
-                .build();
-
-        let result = config.try_pushdown_sort(&[sort_expr])?;
-        let SortOrderPushdownResult::Exact { inner } = result else {
-            panic!("Expected Exact result, got {result:?}");
-        };
-        let pushed_config = inner
-            .as_any()
-            .downcast_ref::<FileScanConfig>()
-            .expect("Expected FileScanConfig");
-
-        assert_eq!(pushed_config.file_groups.len(), 2);
-        // Group 0: 3 files (5/2 = 2 base + 1 remainder)
-        let files0 = pushed_config.file_groups[0].files();
-        assert_eq!(files0.len(), 3);
-        assert_eq!(files0[0].object_meta.location.as_ref(), "f1");
-        assert_eq!(files0[1].object_meta.location.as_ref(), "f2");
-        assert_eq!(files0[2].object_meta.location.as_ref(), "f3");
-        // Group 1: 2 files
-        let files1 = pushed_config.file_groups[1].files();
-        assert_eq!(files1.len(), 2);
-        assert_eq!(files1[0].object_meta.location.as_ref(), "f4");
-        assert_eq!(files1[1].object_meta.location.as_ref(), "f5");
-        Ok(())
-    }
-
     #[test]
     fn sort_pushdown_reverse_preserves_file_order_with_stats() -> Result<()> {
         // Reverse scan should reverse file order but NOT apply statistics-based
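
The removed helper existed to undo interleaving introduced at planning time. As a simplified illustration of where that interleaving comes from, the sketch below assigns files (already sorted by min statistic) to whichever group currently has the smallest total size, the size-balancing idea the comments attribute to `split_groups_by_statistics`. It is a hypothetical stand-in, not the real DataFusion function, and the file names and sizes are invented.

```rust
/// Simplified stand-in for size-balanced bin-packing: each file goes to
/// the group with the smallest total size so far.
fn bin_pack(
    files: &[(&'static str, u64)],
    num_groups: usize,
) -> Vec<Vec<&'static str>> {
    let mut groups: Vec<(u64, Vec<&'static str>)> = vec![(0, Vec::new()); num_groups];
    for &(name, size) in files {
        // Ties go to the lowest-index group, so equal-sized files alternate.
        let group = groups
            .iter_mut()
            .min_by_key(|(total, _)| *total)
            .expect("num_groups must be > 0");
        group.0 += size;
        group.1.push(name);
    }
    groups.into_iter().map(|(_, names)| names).collect()
}

fn main() {
    // Four equal-sized files, sorted by min value.
    let files = [("f1", 100), ("f2", 100), ("f3", 100), ("f4", 100)];
    // Prints [["f1", "f3"], ["f2", "f4"]]: file ranges interleave across groups.
    println!("{:?}", bin_pack(&files, 2));
}
```

With equal-sized files this yields exactly the `Group 0: [f1, f3] / Group 1: [f2, f4]` layout that the new comments call out as beneficial for SPM.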

datafusion/sqllogictest/test_files/sort_pushdown.slt
Lines changed: 4 additions & 4 deletions

@@ -1109,7 +1109,7 @@ logical_plan
 02)--TableScan: reversed_parquet projection=[id, value]
 physical_plan
 01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet]]}, projection=[id, value], file_type=parquet
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], file_type=parquet
 
 # Test 4.2: Results must be correct
 query II
@@ -1193,7 +1193,7 @@ logical_plan
 02)--TableScan: reversed_with_order_parquet projection=[id, value]
 physical_plan
 01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet]]}, projection=[id, value], file_type=parquet
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], file_type=parquet
 
 # Test 6.2: Results must be correct
 query II
@@ -1332,7 +1332,7 @@ logical_plan
 02)--TableScan: desc_reversed_parquet projection=[id, value]
 physical_plan
 01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet]]}, projection=[id, value], file_type=parquet
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet]]}, projection=[id, value], file_type=parquet
 
 # Test 8.2: Results must be correct
 query II
@@ -1775,7 +1775,7 @@ logical_plan
 02)--TableScan: tb_overlap projection=[id, value]
 physical_plan
 01)SortExec: TopK(fetch=5), expr=[id@0 DESC, value@1 DESC], preserve_partitioning=[false]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_x.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_y.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_z.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_z.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_y.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tb_overlap/file_x.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ]
 
 query II
 SELECT * FROM tb_overlap ORDER BY id DESC, value DESC LIMIT 5;
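
All four .slt plan changes show the same single-group effect: files are reordered by their min/max statistics so the first file scanned matches the requested direction (c_low first for ASC, b_high and file_z first for DESC), which is what lets the TopK DynamicFilter in the last plan prune later files. Below is a minimal sketch of that reordering; it is not DataFusion's implementation, and the statistics values are invented for illustration.

```rust
/// A file summarized by invented min/max statistics on its sort column.
struct FileStats {
    name: &'static str,
    min_id: i64,
    max_id: i64,
}

/// Sort files so the first one read best matches the scan direction.
fn order_for_scan(mut files: Vec<FileStats>, descending: bool) -> Vec<FileStats> {
    if descending {
        // DESC: largest values first; a TopK dynamic filter built from the
        // first file's values can then prune the remaining files early.
        files.sort_by_key(|f| std::cmp::Reverse(f.max_id));
    } else {
        // ASC: smallest values first.
        files.sort_by_key(|f| f.min_id);
    }
    files
}

fn main() {
    let files = vec![
        FileStats { name: "a_high.parquet", min_id: 200, max_id: 299 },
        FileStats { name: "b_mid.parquet", min_id: 100, max_id: 199 },
        FileStats { name: "c_low.parquet", min_id: 0, max_id: 99 },
    ];
    // ASC scan prints c_low, b_mid, a_high, matching the updated plans above.
    for f in order_for_scan(files, false) {
        println!("{}", f.name);
    }
}
```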
