@@ -916,7 +916,6 @@ impl DataSource for FileScanConfig {
916916 /// │ │ rebuild_with_source(exact=true)
917917 /// │ │ ├─ sort files by stats within groups
918918 /// │ │ ├─ verify non-overlapping
919- /// │ │ ├─ redistribute across groups (consecutive)
920919 /// │ │ └─► keep output_ordering → SortExec removed
921920 /// │ │
922921 /// │ ├─ reversed ordering matches? ──► Inexact
@@ -1325,8 +1324,7 @@ impl FileScanConfig {
13251324 /// │ 1. Reverse file groups (if DESC matches reversed ordering) │
13261325 /// │ 2. Sort files within groups by min/max statistics │
13271326 /// │ 3. If Exact + non-overlapping: │
1328- /// │ a. Redistribute files consecutively across groups │
1329- /// │ b. Keep output_ordering → SortExec eliminated │
1327+ /// │ Keep output_ordering → SortExec eliminated │
13301328 /// │ Otherwise: clear output_ordering → SortExec stays │
13311329 /// └─────────────────────────────────────────────────────────────┘
13321330 /// ```
@@ -1340,14 +1338,6 @@ impl FileScanConfig {
13401338 /// Even when files overlap (Inexact), statistics-based ordering helps
13411339 /// TopK/LIMIT queries: reading low-value files first lets dynamic filters
13421340 /// prune high-value files earlier.
1343- ///
1344- /// # Why redistribute across groups?
1345- ///
1346- /// `split_groups_by_statistics` uses bin-packing to balance group sizes,
1347- /// which can interleave file ranges across groups. We fix this by
1348- /// assigning consecutive files to consecutive groups, so groups are
1349- /// ordered relative to each other. This preserves parallel I/O while
1350- /// ensuring SPM's merge is a cheap sequential read.
13511341 fn rebuild_with_source (
13521342 & self ,
13531343 new_file_source : Arc < dyn FileSource > ,
@@ -1414,40 +1404,28 @@ impl FileScanConfig {
14141404
14151405 if is_exact && all_non_overlapping {
14161406 // Truly exact: within-file ordering guaranteed and files are non-overlapping.
1407+ // Keep output_ordering so SortExec can be eliminated for each partition.
14171408 //
1418- // When there are multiple groups, redistribute files using consecutive
1419- // assignment so that each group remains non-overlapping AND groups are
1420- // ordered relative to each other. This enables:
1421- // - No SortExec per partition (files in each group are sorted & non-overlapping)
1422- // - SPM cheaply merges ordered streams (O(n) merge)
1423- // - Parallel I/O across partitions
1409+ // We intentionally do NOT redistribute files across groups here.
1410+ // The planning-phase bin-packing may interleave file ranges across groups:
14241411 //
1425- // Before (bin-packing may interleave):
1426- // Group 0: [file_01(1-10), file_03(21-30)] ← gap, interleaved with group 1
1427- // Group 1: [file_02(11-20), file_04(31-40)]
1412+ // Group 0: [f1(1-10), f3(21-30)] ← interleaved with group 1
1413+ // Group 1: [f2(11-20), f4(31-40)]
14281414 //
1429- // After (consecutive assignment):
1430- // Group 0: [file_01(1-10), file_02(11-20)] ← consecutive, ordered
1431- // Group 1: [file_03(21-30), file_04(31-40)] ← consecutive, ordered
1432- if new_config. file_groups . len ( ) > 1
1433- && let Some ( sort_order) = LexOrdering :: new ( order. iter ( ) . cloned ( ) )
1434- {
1435- let projected_schema = new_config. projected_schema ( ) ?;
1436- let projection_indices = new_config
1437- . file_source
1438- . projection ( )
1439- . as_ref ( )
1440- . and_then ( |p| ordered_column_indices_from_projection ( p) ) ;
1441- let num_groups = new_config. file_groups . len ( ) ;
1442- new_config. file_groups =
1443- Self :: redistribute_files_across_groups_by_statistics (
1444- & new_config. file_groups ,
1445- & sort_order,
1446- & projected_schema,
1447- projection_indices. as_deref ( ) ,
1448- num_groups,
1449- ) ;
1450- }
1415+ // This interleaving is actually beneficial because SPM pulls from both
1416+ // partitions concurrently, keeping parallel I/O active:
1417+ //
1418+ // SPM: pull P0 [1-10] → pull P1 [11-20] → pull P0 [21-30] → pull P1 [31-40]
1419+ // ^^^^^^^^^^^^ ^^^^^^^^^^^^
1420+ // both partitions scanning files simultaneously
1421+ //
1422+ // If we were to redistribute files consecutively:
1423+ // Group 0: [f1(1-10), f2(11-20)] ← all values < group 1
1424+ // Group 1: [f3(21-30), f4(31-40)]
1425+ //
1426+ // SPM would read ALL of group 0 first (values always smaller), then group 1.
1427+ // This degrades to single-threaded sequential I/O — the other partition
1428+ // sits idle the entire time, losing the parallelism benefit.
14511429 } else {
14521430 new_config. output_ordering = vec ! [ ] ;
14531431 }
@@ -1543,82 +1521,6 @@ impl FileScanConfig {
15431521 }
15441522 }
15451523
1546- /// Redistribute files across groups using consecutive assignment.
1547- ///
1548- /// `split_groups_by_statistics` uses bin-packing which balances group sizes
1549- /// but can interleave file ranges. This method fixes that by assigning
1550- /// consecutive sorted files to consecutive groups:
1551- ///
1552- /// ```text
1553- /// Input (bin-packed, interleaved):
1554- /// Group 0: [f1(0-9), f3(20-29)] max(f1)=9 but f3=20 > Group1.f2=10
1555- /// Group 1: [f2(10-19), f4(30-39)] groups overlap!
1556- ///
1557- /// After global sort + consecutive assignment:
1558- /// Group 0: [f1(0-9), f2(10-19)] max=19
1559- /// Group 1: [f3(20-29), f4(30-39)] min=20 > 19 ✓ groups are ordered!
1560- ///
1561- /// Resulting plan:
1562- /// SPM [col ASC] ← O(n) merge, reads group 0 then group 1
1563- /// DataSourceExec [f1, f2] ← parallel I/O, no SortExec
1564- /// DataSourceExec [f3, f4] ← parallel I/O, no SortExec
1565- /// ```
1566- ///
1567- /// Falls back to the original groups if statistics are unavailable.
1568- fn redistribute_files_across_groups_by_statistics (
1569- file_groups : & [ FileGroup ] ,
1570- sort_order : & LexOrdering ,
1571- projected_schema : & SchemaRef ,
1572- projection_indices : Option < & [ usize ] > ,
1573- num_groups : usize ,
1574- ) -> Vec < FileGroup > {
1575- if num_groups <= 1 {
1576- return file_groups. to_vec ( ) ;
1577- }
1578-
1579- // Flatten all files
1580- let all_files: Vec < _ > = file_groups. iter ( ) . flat_map ( |g| g. iter ( ) ) . collect ( ) ;
1581- if all_files. is_empty ( ) {
1582- return file_groups. to_vec ( ) ;
1583- }
1584-
1585- // Sort globally by statistics
1586- let statistics = match MinMaxStatistics :: new_from_files (
1587- sort_order,
1588- projected_schema,
1589- projection_indices,
1590- all_files. iter ( ) . copied ( ) ,
1591- ) {
1592- Ok ( stats) => stats,
1593- Err ( _) => return file_groups. to_vec ( ) ,
1594- } ;
1595-
1596- let sorted_indices = statistics. min_values_sorted ( ) ;
1597-
1598- // Assign consecutive files to groups
1599- let total = sorted_indices. len ( ) ;
1600- let base_size = total / num_groups;
1601- let remainder = total % num_groups;
1602-
1603- let mut new_groups = Vec :: with_capacity ( num_groups) ;
1604- let mut offset = 0 ;
1605- for i in 0 ..num_groups {
1606- // First `remainder` groups get one extra file
1607- let group_size = base_size + if i < remainder { 1 } else { 0 } ;
1608- if group_size == 0 {
1609- continue ;
1610- }
1611- let group: FileGroup = sorted_indices[ offset..offset + group_size]
1612- . iter ( )
1613- . map ( |( idx, _) | all_files[ * idx] . clone ( ) )
1614- . collect ( ) ;
1615- new_groups. push ( group) ;
1616- offset += group_size;
1617- }
1618-
1619- new_groups
1620- }
1621-
16221524 /// Last-resort optimization when FileSource returns `Unsupported`.
16231525 ///
16241526 /// Even without within-file ordering guarantees, reordering files by
@@ -3415,9 +3317,12 @@ mod tests {
34153317 }
34163318
34173319 #[ test]
3418- fn sort_pushdown_exact_multi_group_redistributes_consecutively ( ) -> Result < ( ) > {
3419- // ExactSortPushdownSource + 4 non-overlapping files in 2 interleaved groups
3420- // → files should be redistributed so groups are consecutive and ordered
3320+ fn sort_pushdown_exact_multi_group_preserves_parallelism ( ) -> Result < ( ) > {
3321+ // ExactSortPushdownSource + 4 non-overlapping files in 2 interleaved groups.
3322+ // Groups should NOT be redistributed — interleaved groups allow SPM to
3323+ // pull from both partitions concurrently, keeping parallel I/O active.
3324+ // Redistributing consecutively would make SPM read one partition at a
3325+ // time (all values in group 0 < group 1), degrading to single-threaded I/O.
34213326 let file_schema =
34223327 Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Float64 , false ) ] ) ) ;
34233328 let table_schema = TableSchema :: new ( Arc :: clone ( & file_schema) , vec ! [ ] ) ;
@@ -3456,78 +3361,23 @@ mod tests {
34563361 . downcast_ref :: < FileScanConfig > ( )
34573362 . expect ( "Expected FileScanConfig" ) ;
34583363
3459- // Should still have 2 groups (preserving parallelism)
3364+ // 2 groups preserved ( parallelism maintained )
34603365 assert_eq ! ( pushed_config. file_groups. len( ) , 2 ) ;
34613366
3462- // Group 0 should have consecutive files [file_01, file_02]
3367+ // Files within each group are sorted by stats, but groups are NOT
3368+ // redistributed — interleaved assignment from bin-packing is kept
34633369 let files0 = pushed_config. file_groups [ 0 ] . files ( ) ;
34643370 assert_eq ! ( files0[ 0 ] . object_meta. location. as_ref( ) , "file_01" ) ;
3465- assert_eq ! ( files0[ 1 ] . object_meta. location. as_ref( ) , "file_02" ) ;
3466-
3467- // Group 1 should have consecutive files [file_03, file_04]
3371+ assert_eq ! ( files0[ 1 ] . object_meta. location. as_ref( ) , "file_03" ) ;
34683372 let files1 = pushed_config. file_groups [ 1 ] . files ( ) ;
3469- assert_eq ! ( files1[ 0 ] . object_meta. location. as_ref( ) , "file_03" ) ;
3373+ assert_eq ! ( files1[ 0 ] . object_meta. location. as_ref( ) , "file_02" ) ;
34703374 assert_eq ! ( files1[ 1 ] . object_meta. location. as_ref( ) , "file_04" ) ;
34713375
3472- // output_ordering preserved
3376+ // output_ordering preserved (Exact, each group internally non-overlapping)
34733377 assert ! ( !pushed_config. output_ordering. is_empty( ) ) ;
34743378 Ok ( ( ) )
34753379 }
34763380
3477- #[ test]
3478- fn sort_pushdown_exact_multi_group_uneven_distribution ( ) -> Result < ( ) > {
3479- // 5 files across 2 groups → group 0 gets 3 files, group 1 gets 2
3480- let file_schema =
3481- Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Float64 , false ) ] ) ) ;
3482- let table_schema = TableSchema :: new ( Arc :: clone ( & file_schema) , vec ! [ ] ) ;
3483- let file_source = Arc :: new ( ExactSortPushdownSource :: new ( table_schema) ) ;
3484-
3485- let sort_expr = PhysicalSortExpr :: new_default ( Arc :: new ( Column :: new ( "a" , 0 ) ) ) ;
3486-
3487- let file_groups = vec ! [
3488- FileGroup :: new( vec![
3489- make_file_with_stats( "f1" , 0.0 , 9.0 ) ,
3490- make_file_with_stats( "f3" , 20.0 , 29.0 ) ,
3491- make_file_with_stats( "f5" , 40.0 , 49.0 ) ,
3492- ] ) ,
3493- FileGroup :: new( vec![
3494- make_file_with_stats( "f2" , 10.0 , 19.0 ) ,
3495- make_file_with_stats( "f4" , 30.0 , 39.0 ) ,
3496- ] ) ,
3497- ] ;
3498-
3499- let config =
3500- FileScanConfigBuilder :: new ( ObjectStoreUrl :: local_filesystem ( ) , file_source)
3501- . with_file_groups ( file_groups)
3502- . with_output_ordering ( vec ! [
3503- LexOrdering :: new( vec![ sort_expr. clone( ) ] ) . unwrap( ) ,
3504- ] )
3505- . build ( ) ;
3506-
3507- let result = config. try_pushdown_sort ( & [ sort_expr] ) ?;
3508- let SortOrderPushdownResult :: Exact { inner } = result else {
3509- panic ! ( "Expected Exact result, got {result:?}" ) ;
3510- } ;
3511- let pushed_config = inner
3512- . as_any ( )
3513- . downcast_ref :: < FileScanConfig > ( )
3514- . expect ( "Expected FileScanConfig" ) ;
3515-
3516- assert_eq ! ( pushed_config. file_groups. len( ) , 2 ) ;
3517- // Group 0: 3 files (5/2 = 2 base + 1 remainder)
3518- let files0 = pushed_config. file_groups [ 0 ] . files ( ) ;
3519- assert_eq ! ( files0. len( ) , 3 ) ;
3520- assert_eq ! ( files0[ 0 ] . object_meta. location. as_ref( ) , "f1" ) ;
3521- assert_eq ! ( files0[ 1 ] . object_meta. location. as_ref( ) , "f2" ) ;
3522- assert_eq ! ( files0[ 2 ] . object_meta. location. as_ref( ) , "f3" ) ;
3523- // Group 1: 2 files
3524- let files1 = pushed_config. file_groups [ 1 ] . files ( ) ;
3525- assert_eq ! ( files1. len( ) , 2 ) ;
3526- assert_eq ! ( files1[ 0 ] . object_meta. location. as_ref( ) , "f4" ) ;
3527- assert_eq ! ( files1[ 1 ] . object_meta. location. as_ref( ) , "f5" ) ;
3528- Ok ( ( ) )
3529- }
3530-
35313381 #[ test]
35323382 fn sort_pushdown_reverse_preserves_file_order_with_stats ( ) -> Result < ( ) > {
35333383 // Reverse scan should reverse file order but NOT apply statistics-based
0 commit comments