diff --git a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs index 35f36ea8bc0ce..51c0e2167053e 100644 --- a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs +++ b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs @@ -27,7 +27,9 @@ use datafusion::{ file_format::file_compression_type::FileCompressionType, listing::PartitionedFile, object_store::ObjectStoreUrl, - physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource}, + physical_plan::{ + CsvSource, FileSource, FileStreamBuilder, JsonOpener, JsonSource, + }, }, error::Result, physical_plan::metrics::ExecutionPlanMetricsSet, @@ -80,8 +82,12 @@ async fn csv_opener() -> Result<()> { .create_file_opener(object_store, &scan_config, 0)?; let mut result = vec![]; - let mut stream = - FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?; + let metrics = ExecutionPlanMetricsSet::new(); + let mut stream = FileStreamBuilder::new(&scan_config) + .with_partition(0) + .with_file_opener(opener) + .with_metrics(&metrics) + .build()?; while let Some(batch) = stream.next().await.transpose()? { result.push(batch); } @@ -137,12 +143,12 @@ async fn json_opener() -> Result<()> { .with_file(PartitionedFile::new(path.to_string(), 10)) .build(); - let mut stream = FileStream::new( - &scan_config, - 0, - Arc::new(opener), - &ExecutionPlanMetricsSet::new(), - )?; + let metrics = ExecutionPlanMetricsSet::new(); + let mut stream = FileStreamBuilder::new(&scan_config) + .with_partition(0) + .with_file_opener(Arc::new(opener)) + .with_metrics(&metrics) + .build()?; let mut result = vec![]; while let Some(batch) = stream.next().await.transpose()? 
{ result.push(batch); diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 04c8ea129d05c..8e4855afa66bb 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -49,5 +49,5 @@ pub use datafusion_datasource::file_scan_config::{ pub use datafusion_datasource::file_sink_config::*; pub use datafusion_datasource::file_stream::{ - FileOpenFuture, FileOpener, FileStream, OnError, + FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError, }; diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 773e6b2055fa0..78da70402f93d 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -21,7 +21,7 @@ use crate::file_groups::FileGroup; use crate::{ PartitionedFile, display::FileGroupsDisplay, file::FileSource, - file_compression_type::FileCompressionType, file_stream::FileStream, + file_compression_type::FileCompressionType, file_stream::FileStreamBuilder, source::DataSource, statistics::MinMaxStatistics, }; use arrow::datatypes::FieldRef; @@ -588,7 +588,11 @@ impl DataSource for FileScanConfig { let opener = source.create_file_opener(object_store, self, partition)?; - let stream = FileStream::new(self, partition, opener, source.metrics())?; + let stream = FileStreamBuilder::new(self) + .with_partition(partition) + .with_file_opener(opener) + .with_metrics(source.metrics()) + .build()?; Ok(Box::pin(cooperative(stream))) } diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs new file mode 100644 index 0000000000000..7f21ace92c46b --- /dev/null +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::file_scan_config::FileScanConfig; +use datafusion_common::{Result, internal_err}; +use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; + +use super::{FileOpener, FileStream, FileStreamMetrics, FileStreamState, OnError}; + +/// Builder for constructing a [`FileStream`]. +pub struct FileStreamBuilder<'a> { + config: &'a FileScanConfig, + partition: Option<usize>, + file_opener: Option<Arc<dyn FileOpener>>, + metrics: Option<&'a ExecutionPlanMetricsSet>, + on_error: OnError, +} + +impl<'a> FileStreamBuilder<'a> { + /// Create a new builder. + pub fn new(config: &'a FileScanConfig) -> Self { + Self { + config, + partition: None, + file_opener: None, + metrics: None, + on_error: OnError::Fail, + } + } + + /// Configure the partition to scan. + pub fn with_partition(mut self, partition: usize) -> Self { + self.partition = Some(partition); + self + } + + /// Configure the [`FileOpener`] used to open files. + pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self { + self.file_opener = Some(file_opener); + self + } + + /// Configure the metrics set used by the stream.
+ pub fn with_metrics(mut self, metrics: &'a ExecutionPlanMetricsSet) -> Self { + self.metrics = Some(metrics); + self + } + + /// Configure the behavior when opening or scanning a file fails. + pub fn with_on_error(mut self, on_error: OnError) -> Self { + self.on_error = on_error; + self + } + + /// Build the configured [`FileStream`]. + pub fn build(self) -> Result<FileStream> { + let Self { + config, + partition, + file_opener, + metrics, + on_error, + } = self; + + let Some(partition) = partition else { + return internal_err!("FileStreamBuilder missing required partition"); + }; + let Some(file_opener) = file_opener else { + return internal_err!("FileStreamBuilder missing required file_opener"); + }; + let Some(metrics) = metrics else { + return internal_err!("FileStreamBuilder missing required metrics"); + }; + let projected_schema = config.projected_schema()?; + let Some(file_group) = config.file_groups.get(partition).cloned() else { + return internal_err!( + "FileStreamBuilder invalid partition index: {partition}" + ); + }; + + Ok(FileStream { + file_iter: file_group.into_inner().into_iter().collect(), + projected_schema, + remain: config.limit, + file_opener, + state: FileStreamState::Idle, + file_stream_metrics: FileStreamMetrics::new(metrics, partition), + baseline_metrics: BaselineMetrics::new(metrics, partition), + on_error, + }) + } +} diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream/mod.rs similarity index 91% rename from datafusion/datasource/src/file_stream.rs rename to datafusion/datasource/src/file_stream/mod.rs index 8a4ec4a7f1d1a..a423552917408 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -21,6 +21,8 @@ //! Note: Most traits here need to be marked `Sync + Send` to be //! compliant with the `SendableRecordBatchStream` trait.
+mod builder; + use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; @@ -29,7 +31,7 @@ use std::task::{Context, Poll}; use crate::PartitionedFile; use crate::file_scan_config::FileScanConfig; use arrow::datatypes::SchemaRef; -use datafusion_common::error::Result; +use datafusion_common::Result; use datafusion_execution::RecordBatchStream; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, Time, @@ -42,6 +44,8 @@ use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{FutureExt as _, Stream, StreamExt as _, ready}; +pub use builder::FileStreamBuilder; + /// A stream that iterates record batch by record batch, file over file. pub struct FileStream { /// An iterator over input files. @@ -66,26 +70,18 @@ pub struct FileStream { impl FileStream { /// Create a new `FileStream` using the give `FileOpener` to scan underlying files + #[deprecated(since = "54.0.0", note = "Use FileStreamBuilder instead")] pub fn new( config: &FileScanConfig, partition: usize, file_opener: Arc<dyn FileOpener>, metrics: &ExecutionPlanMetricsSet, ) -> Result<Self> { - let projected_schema = config.projected_schema()?; - - let file_group = config.file_groups[partition].clone(); - - Ok(Self { - file_iter: file_group.into_inner().into_iter().collect(), - projected_schema, - remain: config.limit, - file_opener, - state: FileStreamState::Idle, - file_stream_metrics: FileStreamMetrics::new(metrics, partition), - baseline_metrics: BaselineMetrics::new(metrics, partition), - on_error: OnError::Fail, - }) + FileStreamBuilder::new(config) + .with_partition(partition) + .with_file_opener(file_opener) + .with_metrics(metrics) + .build() } /// Specify the behavior when an error occurs opening or scanning a file @@ -400,9 +396,9 @@ impl FileStreamMetrics { #[cfg(test)] mod tests { - use crate::PartitionedFile; - use crate::file_scan_config::FileScanConfigBuilder; + use crate::file_scan_config::{FileScanConfig,
FileScanConfigBuilder}; use crate::tests::make_partition; + use crate::{PartitionedFile, TableSchema}; use datafusion_common::error::Result; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -410,7 +406,7 @@ mod tests { use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError}; + use crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError}; use crate::test_util::MockSource; use arrow::array::RecordBatch; use arrow::datatypes::Schema; @@ -530,7 +526,7 @@ mod tests { let on_error = self.on_error; - let table_schema = crate::table_schema::TableSchema::new(file_schema, vec![]); + let table_schema = TableSchema::new(file_schema, vec![]); let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), @@ -539,10 +535,12 @@ mod tests { .with_limit(self.limit) .build(); let metrics_set = ExecutionPlanMetricsSet::new(); - let file_stream = - FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set) - .unwrap() - .with_on_error(on_error); + let file_stream = FileStreamBuilder::new(&config) + .with_partition(0) + .with_file_opener(Arc::new(self.opener)) + .with_metrics(&metrics_set) + .with_on_error(on_error) + .build()?; file_stream .collect::<Vec<_>>() @@ -563,6 +561,23 @@ mod tests { .expect("error executing stream") } + /// Create the smallest valid file scan config for builder validation tests. + fn builder_test_config() -> FileScanConfig { + let table_schema = TableSchema::new(Arc::new(Schema::empty()), vec![]); + FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::new(MockSource::new(table_schema)), + ) + .with_file(PartitionedFile::new("mock_file", 10)) + .build() + } + + /// Convenience helper to keep builder error assertions focused on the + /// specific missing or invalid input under test.
+ fn builder_error(builder: FileStreamBuilder<'_>) -> String { + builder.build().err().unwrap().to_string() + } + #[tokio::test] async fn on_error_opening() -> Result<()> { let batches = FileStreamTest::new() @@ -857,4 +872,36 @@ mod tests { Ok(()) } + + #[test] + fn builder_requires_partition_file_opener_and_metrics() { + let config = builder_test_config(); + + let err = builder_error(FileStreamBuilder::new(&config)); + assert!(err.contains("FileStreamBuilder missing required partition")); + + let err = builder_error(FileStreamBuilder::new(&config).with_partition(0)); + assert!(err.contains("FileStreamBuilder missing required file_opener")); + + let err = builder_error( + FileStreamBuilder::new(&config) + .with_partition(0) + .with_file_opener(Arc::new(TestOpener::default())), + ); + assert!(err.contains("FileStreamBuilder missing required metrics")); + } + + #[test] + fn builder_errors_on_invalid_partition() { + let config = builder_test_config(); + let metrics = ExecutionPlanMetricsSet::new(); + + let err = builder_error( + FileStreamBuilder::new(&config) + .with_partition(1) + .with_file_opener(Arc::new(TestOpener::default())) + .with_metrics(&metrics), + ); + assert!(err.contains("FileStreamBuilder invalid partition index: 1")); + } }