Skip to content

Commit 4edd904

Browse files
authored
refactor: reorganize shuffle crate module structure (#3772)
1 parent c3dd3a4 commit 4edd904

20 files changed

Lines changed: 422 additions & 330 deletions

File tree

native/core/src/execution/operators/shuffle_scan.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
use crate::{
1919
errors::CometError,
2020
execution::{
21-
operators::ExecutionError, planner::TEST_EXEC_CONTEXT_ID,
22-
shuffle::codec::read_ipc_compressed,
21+
operators::ExecutionError, planner::TEST_EXEC_CONTEXT_ID, shuffle::ipc::read_ipc_compressed,
2322
},
2423
jvm_bridge::{jni_call, JVMClasses},
2524
};
@@ -352,15 +351,15 @@ impl RecordBatchStream for ShuffleScanStream {
352351

353352
#[cfg(test)]
354353
mod tests {
355-
use crate::execution::shuffle::codec::{CompressionCodec, ShuffleBlockWriter};
354+
use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter};
356355
use arrow::array::{Int32Array, StringArray};
357356
use arrow::datatypes::{DataType, Field, Schema};
358357
use arrow::record_batch::RecordBatch;
359358
use datafusion::physical_plan::metrics::Time;
360359
use std::io::Cursor;
361360
use std::sync::Arc;
362361

363-
use crate::execution::shuffle::codec::read_ipc_compressed;
362+
use crate::execution::shuffle::ipc::read_ipc_compressed;
364363

365364
#[test]
366365
#[cfg_attr(miri, ignore)] // Miri cannot call FFI functions (zstd)

native/shuffle/benches/row_columnar.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@
2323
2424
use arrow::datatypes::{DataType as ArrowDataType, Field, Fields};
2525
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
26-
use datafusion_comet_shuffle::spark_unsafe::row::{
27-
process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
28-
};
26+
use datafusion_comet_shuffle::spark_unsafe::row::{process_sorted_row_partition, SparkUnsafeRow};
27+
use datafusion_comet_shuffle::spark_unsafe::unsafe_object::SparkUnsafeObject;
2928
use datafusion_comet_shuffle::CompressionCodec;
3029
use std::sync::Arc;
3130
use tempfile::Builder;

native/shuffle/src/comet_partitioning.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use arrow::row::{OwnedRow, RowConverter};
1919
use datafusion::physical_expr::{LexOrdering, PhysicalExpr};
2020
use std::sync::Arc;
2121

22+
/// Partitioning scheme for distributing rows across shuffle output partitions.
2223
#[derive(Debug, Clone)]
2324
pub enum CometPartitioning {
2425
SinglePartition,

native/shuffle/src/ipc.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::RecordBatch;
19+
use arrow::ipc::reader::StreamReader;
20+
use datafusion::common::DataFusionError;
21+
use datafusion::error::Result;
22+
23+
pub fn read_ipc_compressed(bytes: &[u8]) -> Result<RecordBatch> {
24+
match &bytes[0..4] {
25+
b"SNAP" => {
26+
let decoder = snap::read::FrameDecoder::new(&bytes[4..]);
27+
let mut reader =
28+
unsafe { StreamReader::try_new(decoder, None)?.with_skip_validation(true) };
29+
reader.next().unwrap().map_err(|e| e.into())
30+
}
31+
b"LZ4_" => {
32+
let decoder = lz4_flex::frame::FrameDecoder::new(&bytes[4..]);
33+
let mut reader =
34+
unsafe { StreamReader::try_new(decoder, None)?.with_skip_validation(true) };
35+
reader.next().unwrap().map_err(|e| e.into())
36+
}
37+
b"ZSTD" => {
38+
let decoder = zstd::Decoder::new(&bytes[4..])?;
39+
let mut reader =
40+
unsafe { StreamReader::try_new(decoder, None)?.with_skip_validation(true) };
41+
reader.next().unwrap().map_err(|e| e.into())
42+
}
43+
b"NONE" => {
44+
let mut reader =
45+
unsafe { StreamReader::try_new(&bytes[4..], None)?.with_skip_validation(true) };
46+
reader.next().unwrap().map_err(|e| e.into())
47+
}
48+
other => Err(DataFusionError::Execution(format!(
49+
"Failed to decode batch: invalid compression codec: {other:?}"
50+
))),
51+
}
52+
}

native/shuffle/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
pub mod codec;
1918
pub(crate) mod comet_partitioning;
19+
pub mod ipc;
2020
pub(crate) mod metrics;
2121
pub(crate) mod partitioners;
2222
mod shuffle_writer;
2323
pub mod spark_unsafe;
2424
pub(crate) mod writers;
2525

26-
pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
2726
pub use comet_partitioning::CometPartitioning;
27+
pub use ipc::read_ipc_compressed;
2828
pub use shuffle_writer::ShuffleWriterExec;
29+
pub use writers::{CompressionCodec, ShuffleBlockWriter};

native/shuffle/src/metrics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use datafusion::physical_plan::metrics::{
1919
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
2020
};
2121

22+
/// Execution metrics for a shuffle partition operation.
2223
pub(crate) struct ShufflePartitionerMetrics {
2324
/// metrics
2425
pub(crate) baseline: BaselineMetrics,

native/shuffle/src/partitioners/mod.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,9 @@
1818
mod multi_partition;
1919
mod partitioned_batch_iterator;
2020
mod single_partition;
21-
22-
use arrow::record_batch::RecordBatch;
23-
use datafusion::common::Result;
21+
mod traits;
2422

2523
pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner;
2624
pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator;
2725
pub(crate) use single_partition::SinglePartitionShufflePartitioner;
28-
29-
#[async_trait::async_trait]
30-
pub(crate) trait ShufflePartitioner: Send + Sync {
31-
/// Insert a batch into the partitioner
32-
async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
33-
/// Write shuffle data and shuffle index file to disk
34-
fn shuffle_write(&mut self) -> Result<()>;
35-
}
26+
pub(crate) use traits::ShufflePartitioner;

native/shuffle/src/partitioners/multi_partition.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use std::io::{BufReader, BufWriter, Seek, Write};
3939
use std::sync::Arc;
4040
use tokio::time::Instant;
4141

42+
/// Reusable scratch buffers for computing row-to-partition assignments.
4243
#[derive(Default)]
4344
struct ScratchSpace {
4445
/// Hashes for each row in the current batch.

native/shuffle/src/partitioners/partitioned_batch_iterator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ impl PartitionedBatchesProducer {
5050
}
5151
}
5252

53+
/// Iterates over the shuffled record batches belonging to a single output partition.
5354
pub(crate) struct PartitionedBatchIterator<'a> {
5455
record_batches: Vec<&'a RecordBatch>,
5556
batch_size: usize,
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::record_batch::RecordBatch;
19+
use datafusion::common::Result;
20+
21+
#[async_trait::async_trait]
22+
pub(crate) trait ShufflePartitioner: Send + Sync {
23+
/// Insert a batch into the partitioner
24+
async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
25+
/// Write shuffle data and shuffle index file to disk
26+
fn shuffle_write(&mut self) -> Result<()>;
27+
}

0 commit comments

Comments (0)