From 8a30d87a8f8caab4f54d48f11f07bdf5974fc651 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sun, 24 May 2026 19:51:37 -0500 Subject: [PATCH 1/7] feat(table): add opt-in pyiceberg-core arrow reader --- pyiceberg/io/pyiceberg_core.py | 37 +++++++++++++++++++++++++++++++++ pyiceberg/table/__init__.py | 32 ++++++++++++++++++++++++++-- tests/io/test_pyiceberg_core.py | 8 +++++++ 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index f49bfb48da..f060056196 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -146,6 +146,16 @@ def _read_field_ids( return ids +def can_read_projected_schema_with_pyiceberg_core( + schema: Schema, + projected_schema: Schema, + row_filter: BooleanExpression, + case_sensitive: bool, +) -> bool: + """Return whether pyiceberg-core can read exactly the requested projection for this filter.""" + return _expression_field_ids(row_filter, schema, case_sensitive).issubset(projected_schema.field_ids) + + _UNARY_METHODS: dict[type[BooleanExpression], str] = { IsNull: "is_null", NotNull: "is_not_null", @@ -291,3 +301,30 @@ def file_scan_task_to_pyiceberg_core( name_mapping=_model_json(name_mapping) if name_mapping is not None else None, case_sensitive=case_sensitive, ) + + +def arrow_batch_reader_from_pyiceberg_core( + file_io: FileIO, + tasks: Iterable[FileScanTask], + schema: Schema, + projected_schema: Schema, + partition_specs: dict[int, PartitionSpec], + name_mapping: NameMapping | None, + case_sensitive: bool = True, +) -> Any: + """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader.""" + core_tasks = [ + file_scan_task_to_pyiceberg_core( + task, + schema, + projected_schema, + partition_spec=partition_specs.get(task.file.spec_id), + name_mapping=name_mapping, + case_sensitive=case_sensitive, + project_field_ids=list(projected_schema.field_ids), + ) + for task in tasks + ] + + reader = _core_module("scan").ArrowReader(file_io_to_pyiceberg_core(file_io)) + return reader.read(schema_to_pyiceberg_core(projected_schema), core_tasks) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 64ad10050d..ad45eac2c7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -110,6 +110,7 @@ ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" +PYICEBERG_RUST_ARROW_SCAN = "PYICEBERG_RUST_ARROW_SCAN" @dataclass() @@ -2242,9 +2243,36 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow - target_schema = schema_to_pyarrow(self.projection()) + projected_schema = self.projection() + if self.limit is None and os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}: + from pyiceberg.io.pyiceberg_core import ( + arrow_batch_reader_from_pyiceberg_core, + can_read_projected_schema_with_pyiceberg_core, + ) + + if can_read_projected_schema_with_pyiceberg_core( + self.table_metadata.schema(), projected_schema, self.row_filter, self.case_sensitive + ): + try: + return arrow_batch_reader_from_pyiceberg_core( + self.io, + self.plan_files(), + self.table_metadata.schema(), + projected_schema, + self.table_metadata.specs(), + self.table_metadata.name_mapping(), + self.case_sensitive, + ) + except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: + warnings.warn( + f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + RuntimeWarning, + stacklevel=2, + ) + + target_schema = schema_to_pyarrow(projected_schema) batches = ArrowScan( - self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + self.table_metadata, self.io, projected_schema, self.row_filter, self.case_sensitive, self.limit ).to_record_batches(self.plan_files()) return pa.RecordBatchReader.from_batches( diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 0e1fd0f95b..8d516d96cf 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -26,6 +26,7 @@ from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.io.pyiceberg_core import ( + can_read_projected_schema_with_pyiceberg_core, delete_file_to_pyiceberg_core, expression_to_pyiceberg_core, file_io_to_pyiceberg_core, @@ -295,6 +296,13 @@ def test_file_scan_task_to_pyiceberg_core_adds_filter_only_field_to_read_project assert converted.kwargs["project_field_ids"] == [1, 2] +def test_can_read_projected_schema_with_pyiceberg_core_requires_filter_fields(simple_schema: Schema) -> None: + projected_schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=3) + + assert not can_read_projected_schema_with_pyiceberg_core(simple_schema, projected_schema, EqualTo("data", "abc"), True) + assert can_read_projected_schema_with_pyiceberg_core(simple_schema, projected_schema, EqualTo("id", 1), True) + + def test_file_scan_task_to_pyiceberg_core_requires_partition_spec_for_partitioned_task(simple_schema: Schema) -> None: data_file = DataFile.from_args( content=DataFileContent.DATA, From 7a287d69c4e9ae19ee0808883f2343f1ba5897f3 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 04:34:34 -0500 Subject: [PATCH 2/7] feat(io): shard pyiceberg-core arrow reads across cores Fan scan tasks out over a thread pool of native ArrowReaders so decode uses multiple cores instead of one, streaming batches as they complete with at most one decoded batch per shard in flight. A default batch size amortizes the per-batch GIL handoff that otherwise dominates the fan-in. Co-Authored-By: Claude Opus 4.8 --- pyiceberg/io/pyiceberg_core.py | 164 ++++++++++++++++++++- tests/io/test_pyiceberg_core.py | 250 ++++++++++++++++++++++++++++++++ 2 files changed, 411 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index f060056196..4ef33dcfca 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -19,6 +19,10 @@ from __future__ import annotations import importlib +import os +import threading +import weakref +from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from typing import TYPE_CHECKING, Any from pyiceberg.expressions import ( @@ -303,6 +307,132 @@ def file_scan_task_to_pyiceberg_core( ) +# Rows per Arrow batch handed back from the native reader. The native default emits very small +# batches, and the sharded fan-in marshals every batch back through the GIL-holding consumer, so a +# tiny batch makes per-batch Python orchestration dominate the decode (measured ~3x slower than a +# whole-shard drain). A larger batch amortizes that handoff while keeping in-flight memory bounded +# to shards x batch (so the streaming contract still holds). Override with +# PYICEBERG_RUST_ARROW_BATCH_SIZE. +_DEFAULT_ARROW_BATCH_SIZE = 262144 + + +def _reader_kwargs() -> dict[str, int]: + batch_size = os.environ.get("PYICEBERG_RUST_ARROW_BATCH_SIZE") + kwargs: dict[str, int] = {"batch_size": int(batch_size) if batch_size else _DEFAULT_ARROW_BATCH_SIZE} + concurrency = os.environ.get("PYICEBERG_RUST_ARROW_FILE_CONCURRENCY") + if concurrency: + kwargs["data_file_concurrency_limit"] = int(concurrency) + return kwargs + + +def _shard_count(n_tasks: int) -> int: + """How many decode threads to fan out across. + + The native ArrowReader decodes a single stream on one core (it parallelizes I/O, not CPU + decode), so a single-stream read of many files leaves the box idle. Sharding the file tasks + across threads — each driving its own reader — recovers multi-core decode (the GIL is released + during the C-stream drain). Default scales with cores, capped so tiny scans don't pay thread + overhead; override with PYICEBERG_RUST_ARROW_SHARDS (1 disables sharding). + """ + override = os.environ.get("PYICEBERG_RUST_ARROW_SHARDS") + if override: + return max(1, int(override)) + if n_tasks <= 1: + return 1 + return max(1, min(n_tasks, (os.cpu_count() or 1))) + + +class _ShardedBatchStream: + """Generator-backed, backpressured fan-in over several native shard readers. + + Each shard owns one ``pyiceberg_core`` ``RecordBatchReader`` and is drained sequentially (a + stateful reader must not be polled concurrently), so at most one read per shard is ever in + flight. A ``ThreadPoolExecutor`` pulls the *next* batch from every idle shard at once and the + consumer is handed batches as they complete, so decode runs on up to ``n_shards`` cores (the + GIL is released during the C-stream drain). Peak memory is bounded to at most one decoded + batch per shard plus what the consumer holds — never the whole result — because a shard is not + asked for its next batch until its current one has been handed out (backpressure: a slow + consumer stalls the shards rather than buffering ahead). + + Batches are yielded as they complete (``FIRST_COMPLETED``); ordering across shards is not + preserved, which is sound because the scan result is an unordered union of file tasks. Worker + exceptions are re-raised to the consumer, and the pool is shut down on exhaustion, on an + exception during iteration, on an explicit :meth:`close`, or — for a consumer that simply + stops iterating and drops the reader — via the ``weakref`` finalizer at garbage collection. + """ + + def __init__(self, readers: list[Any]) -> None: + self._readers = readers + self._pool = ThreadPoolExecutor(max_workers=len(readers)) + # Shard indices whose reader is idle and not yet known to be exhausted. + self._idle: list[int] = list(range(len(readers))) + self._in_flight: dict[Future[Any], int] = {} + self._closed = False + self._lock = threading.Lock() + # Shut the pool down if the consumer abandons the iterator without closing it. + self._finalizer = weakref.finalize(self, self._shutdown, self._pool, self._readers) + + @staticmethod + def _next_batch(reader: Any) -> Any | None: + """Pull one batch from a shard reader, returning ``None`` when the shard is exhausted.""" + try: + return reader.read_next_batch() + except StopIteration: + return None + + def _submit_idle(self) -> None: + """Submit the next read for every idle, non-exhausted shard (one read per shard).""" + while self._idle: + shard = self._idle.pop() + future = self._pool.submit(self._next_batch, self._readers[shard]) + self._in_flight[future] = shard + + def __iter__(self) -> _ShardedBatchStream: + return self + + def __next__(self) -> Any: + if self._closed: + raise StopIteration + try: + while True: + self._submit_idle() + if not self._in_flight: + # Every shard is exhausted: the union is complete. + self.close() + raise StopIteration + done, _ = wait(self._in_flight, return_when=FIRST_COMPLETED) + for future in done: + shard = self._in_flight.pop(future) + batch = future.result() # re-raises any worker exception here + if batch is None: + continue # shard exhausted; do not return it to the idle set + # Shard produced a batch: it may have more, so mark it idle again. + self._idle.append(shard) + return batch + except StopIteration: + raise + except BaseException: + # Worker exception, GeneratorExit, or KeyboardInterrupt: tear the workers down so an + # aborted scan never leaves decode threads running. + self.close() + raise + + def close(self) -> None: + """Cancel pending work and release shard readers; idempotent and consumer-safe.""" + with self._lock: + if self._closed: + return + self._closed = True + self._finalizer.detach() + self._shutdown(self._pool, self._readers) + + @staticmethod + def _shutdown(pool: ThreadPoolExecutor, readers: list[Any]) -> None: + # cancel_futures drops queued reads; in-flight reads are joined so no thread outlives us. + pool.shutdown(wait=True, cancel_futures=True) + readers.clear() + + def arrow_batch_reader_from_pyiceberg_core( file_io: FileIO, tasks: Iterable[FileScanTask], @@ -312,7 +442,18 @@ def arrow_batch_reader_from_pyiceberg_core( name_mapping: NameMapping | None, case_sensitive: bool = True, ) -> Any: - """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader.""" + """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader as a streaming reader. + + Multi-file scans are sharded across a thread pool (see ``_shard_count``) so decode uses + multiple cores; a single native reader over all files would decode on one core. Each shard + drives its own native ``RecordBatchReader``; the returned ``pyarrow.RecordBatchReader`` pulls + batches from the shards lazily (at most one decoded batch per shard in flight), so the whole + result is never materialized and peak memory stays bounded. The single-file or single-shard + case skips the fan-out entirely and returns the native reader directly. + + Worker-thread exceptions propagate to the consumer, and the shard threads are shut down when + the reader is exhausted, closed early, or garbage collected. + """ core_tasks = [ file_scan_task_to_pyiceberg_core( task, @@ -326,5 +467,22 @@ def arrow_batch_reader_from_pyiceberg_core( for task in tasks ] - reader = _core_module("scan").ArrowReader(file_io_to_pyiceberg_core(file_io)) - return reader.read(schema_to_pyiceberg_core(projected_schema), core_tasks) + core_projection = schema_to_pyiceberg_core(projected_schema) + reader_kwargs = _reader_kwargs() + + def _read(shard_tasks: list[Any]) -> Any: + reader = _core_module("scan").ArrowReader(file_io_to_pyiceberg_core(file_io), **reader_kwargs) + return reader.read(core_projection, shard_tasks) + + shards = _shard_count(len(core_tasks)) + if shards <= 1 or len(core_tasks) <= 1: + return _read(core_tasks) + + import pyarrow as pa + + groups = [g for g in (core_tasks[i::shards] for i in range(shards)) if g] + readers = [_read(group) for group in groups] + # Every native reader carries the same projected Arrow schema; use it to type the stream. + arrow_schema = readers[0].schema + stream = _ShardedBatchStream(readers) + return pa.RecordBatchReader.from_batches(arrow_schema, stream) diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 8d516d96cf..c1797441db 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -18,14 +18,18 @@ from __future__ import annotations import sys +import threading from types import ModuleType from typing import Any +import pyarrow as pa import pytest from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.io.pyiceberg_core import ( + _ShardedBatchStream, + arrow_batch_reader_from_pyiceberg_core, can_read_projected_schema_with_pyiceberg_core, delete_file_to_pyiceberg_core, expression_to_pyiceberg_core, @@ -122,6 +126,7 @@ def fake_pyiceberg_core(monkeypatch: pytest.MonkeyPatch) -> None: scan: Any = ModuleType("pyiceberg_core.scan") scan.DeleteFile = CoreObject scan.FileScanTask = CoreObject + scan.ArrowReader = CoreObject # streaming tests override this with a batch-producing fake monkeypatch.setitem(sys.modules, "pyiceberg_core", root) monkeypatch.setitem(sys.modules, "pyiceberg_core.schema", schema) @@ -322,3 +327,248 @@ def test_file_scan_task_to_pyiceberg_core_requires_partition_spec_for_partitione with pytest.raises(ValueError, match="partition_spec is required"): file_scan_task_to_pyiceberg_core(FileScanTask(data_file), simple_schema) + + +# --- streaming sharded reader ------------------------------------------------- + +_STREAM_SCHEMA = pa.schema([("id", pa.int64())]) + + +def _batch(values: list[int]) -> pa.RecordBatch: + return pa.record_batch({"id": pa.array(values, type=pa.int64())}) + + +class FakeShardReader: + """A stand-in for a native ``pyiceberg_core`` RecordBatchReader over one shard's tasks.""" + + def __init__(self, batches: list[pa.RecordBatch]) -> None: + self._batches = list(batches) + self._pos = 0 + self.schema = _STREAM_SCHEMA + + def read_next_batch(self) -> pa.RecordBatch: + if self._pos >= len(self._batches): + raise StopIteration + batch = self._batches[self._pos] + self._pos += 1 + return batch + + +def _drain(reader: pa.RecordBatchReader) -> tuple[int, int]: + """Return (row_count, sum-of-id checksum) — the same parity signal used in the perf gate.""" + table = reader.read_all() + rows = table.num_rows + checksum = pa.compute.sum(table["id"]).as_py() or 0 + return rows, checksum + + +def test_sharded_batch_stream_preserves_all_rows_and_checksum() -> None: + # Uneven shards exercise the "one shard finishes early" path of the fan-in. + readers = [ + FakeShardReader([_batch([1, 2]), _batch([3])]), + FakeShardReader([_batch([4, 5, 6])]), + FakeShardReader([]), # an empty shard must not stall the union + ] + expected_rows = 6 + expected_sum = 21 + + stream = _ShardedBatchStream(readers) + reader = pa.RecordBatchReader.from_batches(_STREAM_SCHEMA, stream) + + assert _drain(reader) == (expected_rows, expected_sum) + + +def test_arrow_batch_reader_streams_lazily_without_materializing(monkeypatch: pytest.MonkeyPatch) -> None: + # The legacy implementation called read_all() per shard up front; assert the new path does not + # pull any batch until the consumer asks for one (the streaming contract). + pulled: list[int] = [] + + class ObservingReader(FakeShardReader): + def read_next_batch(self) -> pa.RecordBatch: + batch = super().read_next_batch() + pulled.append(batch.num_rows) + return batch + + shard_readers = [ObservingReader([_batch([1]), _batch([2])]), ObservingReader([_batch([3]), _batch([4])])] + + class FakeArrowReader: + def __init__(self, _file_io: Any, **_kwargs: Any) -> None: + pass + + def read(self, _projection: Any, _tasks: list[Any]) -> ObservingReader: + return shard_readers.pop(0) + + monkeypatch.setattr(sys.modules["pyiceberg_core.scan"], "ArrowReader", FakeArrowReader) + monkeypatch.setenv("PYICEBERG_RUST_ARROW_SHARDS", "2") + + reader = _build_reader(monkeypatch, n_tasks=2) + assert pulled == [] # construction must not drain anything + + first = reader.read_next_batch() + assert first.num_rows == 1 + # Backpressure: at most one read per shard is outstanding, so the first pull never drains all 4. + assert len(pulled) < 4 + + rows, checksum = _drain(reader) + assert (rows + first.num_rows, checksum + first["id"][0].as_py()) == (4, 1 + 2 + 3 + 4) + + +def test_sharded_batch_stream_bounds_in_flight_reads() -> None: + # Each shard read parks on a gate; assert no more than one read per shard is ever outstanding, + # so peak memory is bounded to one decoded batch per shard rather than the whole result. A + # shard must not be asked for its next batch until its current one is consumed (backpressure). + n_shards = 5 + release = threading.Event() + started = threading.Semaphore(0) + concurrent = 0 + peak = 0 + lock = threading.Lock() + + class GatedReader: + def __init__(self) -> None: + self.schema = _STREAM_SCHEMA + self._remaining = 4 + + def read_next_batch(self) -> pa.RecordBatch: + nonlocal concurrent, peak + if self._remaining <= 0: + raise StopIteration + self._remaining -= 1 + with lock: + concurrent += 1 + peak = max(peak, concurrent) + started.release() + release.wait(timeout=5) + with lock: + concurrent -= 1 + return _batch([1]) + + stream = _ShardedBatchStream([GatedReader() for _ in range(n_shards)]) + + consumer = threading.Thread(target=lambda: list(stream)) + consumer.start() + # All shards start their first read and park. An (n_shards + 1)th concurrent start would mean a + # shard was double-polled; assert it does not happen while the gate is closed. + for _ in range(n_shards): + assert started.acquire(timeout=5) + assert not started.acquire(timeout=0.2), "a shard was polled twice before its batch was consumed" + with lock: + assert peak <= n_shards + + release.set() + consumer.join(timeout=10) + assert not consumer.is_alive() + assert peak <= n_shards + + +def test_sharded_batch_stream_propagates_worker_exceptions() -> None: + class BoomReader: + def __init__(self) -> None: + self.schema = _STREAM_SCHEMA + + def read_next_batch(self) -> pa.RecordBatch: + raise RuntimeError("native decode failed") + + stream = _ShardedBatchStream([FakeShardReader([_batch([1])]), BoomReader()]) + reader = pa.RecordBatchReader.from_batches(_STREAM_SCHEMA, stream) + + with pytest.raises(RuntimeError, match="native decode failed"): + reader.read_all() + + # The pool must be torn down (no leaked worker threads) once the error surfaced. + assert stream._closed + assert stream._pool._shutdown + + +def test_sharded_batch_stream_shuts_down_on_early_close() -> None: + readers = [FakeShardReader([_batch([i]) for i in range(10)]) for _ in range(3)] + stream = _ShardedBatchStream(readers) + + first = next(stream) + assert first.num_rows == 1 + + stream.close() + assert stream._closed + assert stream._pool._shutdown + # close() is idempotent and a closed stream is exhausted. + stream.close() + with pytest.raises(StopIteration): + next(stream) + + +def test_sharded_batch_stream_shuts_down_on_garbage_collection() -> None: + import gc + import weakref + + readers = [FakeShardReader([_batch([i]) for i in range(50)]) for _ in range(3)] + stream = _ShardedBatchStream(readers) + pool = stream._pool + + next(stream) # leave the pool and workers live, then abandon the stream without close() + ref = weakref.ref(stream) + del stream + gc.collect() + + assert ref() is None # no lingering references kept the stream (and its threads) alive + assert pool._shutdown # the finalizer tore the pool down + + +def test_arrow_batch_reader_single_shard_returns_native_reader_directly(monkeypatch: pytest.MonkeyPatch) -> None: + native = FakeShardReader([_batch([7])]) + + class FakeArrowReader: + def __init__(self, _file_io: Any, **_kwargs: Any) -> None: + pass + + def read(self, _projection: Any, _tasks: list[Any]) -> FakeShardReader: + return native + + monkeypatch.setattr(sys.modules["pyiceberg_core.scan"], "ArrowReader", FakeArrowReader) + monkeypatch.setenv("PYICEBERG_RUST_ARROW_SHARDS", "1") + + # The fast path must hand back the native reader untouched, not wrap it in a fan-in. + reader = _build_reader(monkeypatch, n_tasks=4) + assert reader is native + + +def _build_reader(monkeypatch: pytest.MonkeyPatch, n_tasks: int) -> Any: + """Drive ``arrow_batch_reader_from_pyiceberg_core`` with ``n_tasks`` trivial data-file tasks.""" + + def _identity_task_conversion(task: Any, *_args: Any, **_kwargs: Any) -> Any: + return task + + # The conversion + projection helpers are covered by their own tests; stub them so this test + # focuses on the streaming fan-in rather than re-exercising payload conversion. + monkeypatch.setattr("pyiceberg.io.pyiceberg_core.file_scan_task_to_pyiceberg_core", _identity_task_conversion) + monkeypatch.setattr("pyiceberg.io.pyiceberg_core.schema_to_pyiceberg_core", lambda schema: schema) + + schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=0) + + tasks = [] + for _ in range(n_tasks): + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path="s3://warehouse/table/data.parquet", + file_format="PARQUET", + partition=Record(), + record_count=1, + file_size_in_bytes=1, + column_sizes={}, + value_counts={}, + null_value_counts={}, + nan_value_counts={}, + lower_bounds={}, + upper_bounds={}, + ) + data_file.spec_id = 0 + tasks.append(FileScanTask(data_file)) + + return arrow_batch_reader_from_pyiceberg_core( + FakeFileIO(properties={}), + tasks, + schema, + schema, + {0: PartitionSpec(spec_id=0)}, + None, + True, + ) From ea84c6eeaa229345c6c26b2038a1e7c0905d4348 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 04:45:22 -0500 Subject: [PATCH 3/7] feat(io): apply scan limit on the pyiceberg-core path A limited scan previously always fell back to PyArrow. Push the limit through the native reader instead: truncate the streamed result at the limit (slicing the crossing batch and closing the shards so they stop decoding early) and cap the batch size to the limit so a small limit does not decode a full batch per shard. Co-Authored-By: Claude Opus 4.8 --- pyiceberg/io/pyiceberg_core.py | 42 +++++++++++++++++++++++---- pyiceberg/table/__init__.py | 3 +- tests/io/test_pyiceberg_core.py | 51 ++++++++++++++++++++++++++++++++- 3 files changed, 89 insertions(+), 7 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index 4ef33dcfca..b887d4a378 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -62,7 +62,7 @@ from pyiceberg.typedef import Record if TYPE_CHECKING: - from collections.abc import Iterable + from collections.abc import Iterable, Iterator def _core_module(name: str) -> Any: @@ -433,6 +433,29 @@ def _shutdown(pool: ThreadPoolExecutor, readers: list[Any]) -> None: readers.clear() +def _limited_batches(source: Any, limit: int) -> Iterator[Any]: + """Yield batches from ``source`` until ``limit`` rows have been emitted, then stop. + + The batch that crosses the limit is sliced, and the underlying source is closed so a sharded + scan stops decoding early instead of draining every file. An Iceberg scan limit has no ordering + guarantee, so returning the first ``limit`` rows the readers produce is correct. + """ + remaining = limit + try: + for batch in source: + if remaining <= 0: + break + if batch.num_rows > remaining: + yield batch.slice(0, remaining) + break + remaining -= batch.num_rows + yield batch + finally: + close = getattr(source, "close", None) + if close is not None: + close() + + def arrow_batch_reader_from_pyiceberg_core( file_io: FileIO, tasks: Iterable[FileScanTask], @@ -441,6 +464,7 @@ def arrow_batch_reader_from_pyiceberg_core( partition_specs: dict[int, PartitionSpec], name_mapping: NameMapping | None, case_sensitive: bool = True, + limit: int | None = None, ) -> Any: """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader as a streaming reader. @@ -469,20 +493,28 @@ def arrow_batch_reader_from_pyiceberg_core( core_projection = schema_to_pyiceberg_core(projected_schema) reader_kwargs = _reader_kwargs() + if limit is not None: + # No point decoding a full default-sized batch per shard just to truncate to a small limit. + reader_kwargs["batch_size"] = max(1, min(reader_kwargs["batch_size"], limit)) def _read(shard_tasks: list[Any]) -> Any: reader = _core_module("scan").ArrowReader(file_io_to_pyiceberg_core(file_io), **reader_kwargs) return reader.read(core_projection, shard_tasks) + import pyarrow as pa + shards = _shard_count(len(core_tasks)) if shards <= 1 or len(core_tasks) <= 1: - return _read(core_tasks) - - import pyarrow as pa + reader = _read(core_tasks) + if limit is None: + return reader + return pa.RecordBatchReader.from_batches(reader.schema, _limited_batches(reader, limit)) groups = [g for g in (core_tasks[i::shards] for i in range(shards)) if g] readers = [_read(group) for group in groups] # Every native reader carries the same projected Arrow schema; use it to type the stream. arrow_schema = readers[0].schema - stream = _ShardedBatchStream(readers) + stream: Any = _ShardedBatchStream(readers) + if limit is not None: + stream = _limited_batches(stream, limit) return pa.RecordBatchReader.from_batches(arrow_schema, stream) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index ad45eac2c7..29b637ea21 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2244,7 +2244,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow projected_schema = self.projection() - if self.limit is None and os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}: + if os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}: from pyiceberg.io.pyiceberg_core import ( arrow_batch_reader_from_pyiceberg_core, can_read_projected_schema_with_pyiceberg_core, @@ -2262,6 +2262,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: self.table_metadata.specs(), self.table_metadata.name_mapping(), self.case_sensitive, + limit=self.limit, ) except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: warnings.warn( diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index c1797441db..3f3cabb3ed 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -28,6 +28,7 @@ from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.io.pyiceberg_core import ( + _limited_batches, _ShardedBatchStream, arrow_batch_reader_from_pyiceberg_core, can_read_projected_schema_with_pyiceberg_core, @@ -531,7 +532,54 @@ def read(self, _projection: Any, _tasks: list[Any]) -> FakeShardReader: assert reader is native -def _build_reader(monkeypatch: pytest.MonkeyPatch, n_tasks: int) -> Any: +def test_limited_batches_truncates_to_limit_and_closes_source() -> None: + class ClosableSource: + def __init__(self, batches: list[pa.RecordBatch]) -> None: + self._it = iter(batches) + self.closed = False + + def __iter__(self) -> ClosableSource: + return self + + def __next__(self) -> pa.RecordBatch: + return next(self._it) + + def close(self) -> None: + self.closed = True + + source = ClosableSource([_batch([1, 2, 3]), _batch([4, 5, 6]), _batch([7, 8, 9])]) + out = list(_limited_batches(source, 4)) + + rows = [v for batch in out for v in batch["id"].to_pylist()] + assert rows == [1, 2, 3, 4] # the batch crossing the limit is sliced + assert source.closed # the underlying source is closed so a sharded scan stops decoding early + + +def test_arrow_batch_reader_applies_limit_across_shards(monkeypatch: pytest.MonkeyPatch) -> None: + captured_batch_size: list[Any] = [] + shard_readers = [ + FakeShardReader([_batch([1, 2]), _batch([3, 4])]), + FakeShardReader([_batch([5, 6]), _batch([7, 8])]), + ] + + class FakeArrowReader: + def __init__(self, _file_io: Any, **kwargs: Any) -> None: + captured_batch_size.append(kwargs.get("batch_size")) + + def read(self, _projection: Any, _tasks: list[Any]) -> FakeShardReader: + return shard_readers.pop(0) + + monkeypatch.setattr(sys.modules["pyiceberg_core.scan"], "ArrowReader", FakeArrowReader) + monkeypatch.setenv("PYICEBERG_RUST_ARROW_SHARDS", "2") + + reader = _build_reader(monkeypatch, n_tasks=4, limit=3) + table = reader.read_all() + + assert table.num_rows == 3 # the global limit is enforced across shards, not per shard + assert captured_batch_size == [3, 3] # batch size is capped to the limit so small limits don't over-decode + + +def _build_reader(monkeypatch: pytest.MonkeyPatch, n_tasks: int, limit: int | None = None) -> Any: """Drive ``arrow_batch_reader_from_pyiceberg_core`` with ``n_tasks`` trivial data-file tasks.""" def _identity_task_conversion(task: Any, *_args: Any, **_kwargs: Any) -> Any: @@ -571,4 +619,5 @@ def _identity_task_conversion(task: Any, *_args: Any, **_kwargs: Any) -> Any: {0: PartitionSpec(spec_id=0)}, None, True, + limit=limit, ) From 51a3640e93e6b99f63ce03ff2c9ed469680a9581 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 04:58:39 -0500 Subject: [PATCH 4/7] fix(io): return the PyArrow target schema from the native scan The native reader emits arrow-rs's own types (string rather than large_string, and run-end-encoded identity-partition columns), so its output diverged from the PyArrow scan path. Cast every batch to schema_to_pyarrow(projected_schema), decoding run-end-encoded columns first since there is no direct cast kernel for them, so the native path is a faithful drop-in. Co-Authored-By: Claude Opus 4.8 --- pyiceberg/io/pyiceberg_core.py | 53 +++++++++++++++++++++++++-------- tests/io/test_pyiceberg_core.py | 49 ++++++++++++++++++++++++++++-- 2 files changed, 87 insertions(+), 15 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index b887d4a378..777057de7a 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -433,6 +433,34 @@ def _shutdown(pool: ThreadPoolExecutor, readers: list[Any]) -> None: readers.clear() +def _cast_batches(source: Any, target_schema: Any) -> Iterator[Any]: + """Cast every batch to the PyArrow path's output schema so the native path is a faithful drop-in. + + The native reader returns arrow-rs's own types (e.g. ``string`` rather than PyIceberg's + ``large_string``) and run-end-encodes constant identity-partition columns. There is no direct + cast kernel from run-end-encoded to the target type, so those columns are decoded first, then a + single ``RecordBatch.cast`` aligns the whole batch with ``schema_to_pyarrow(projected_schema)``. + """ + import pyarrow as pa + import pyarrow.compute as pc + + try: + for batch in source: + columns = None + for i, column in enumerate(batch.columns): + if pa.types.is_run_end_encoded(column.type): + if columns is None: + columns = list(batch.columns) + columns[i] = pc.run_end_decode(column) + if columns is not None: + batch = pa.RecordBatch.from_arrays(columns, names=batch.schema.names) + yield batch.cast(target_schema) + finally: + close = getattr(source, "close", None) + if close is not None: + close() + + def _limited_batches(source: Any, limit: int) -> Iterator[Any]: """Yield batches from ``source`` until ``limit`` rows have been emitted, then stop. @@ -503,18 +531,19 @@ def _read(shard_tasks: list[Any]) -> Any: import pyarrow as pa + from pyiceberg.io.pyarrow import schema_to_pyarrow + + # The PyArrow scan path returns this exact schema; casting the native batches to it makes the + # native path a faithful drop-in (matching large_string, decoded partition columns, etc.). + target_schema = schema_to_pyarrow(projected_schema) + shards = _shard_count(len(core_tasks)) if shards <= 1 or len(core_tasks) <= 1: - reader = _read(core_tasks) - if limit is None: - return reader - return pa.RecordBatchReader.from_batches(reader.schema, _limited_batches(reader, limit)) - - groups = [g for g in (core_tasks[i::shards] for i in range(shards)) if g] - readers = [_read(group) for group in groups] - # Every native reader carries the same projected Arrow schema; use it to type the stream. - arrow_schema = readers[0].schema - stream: Any = _ShardedBatchStream(readers) + source: Any = _read(core_tasks) + else: + groups = [g for g in (core_tasks[i::shards] for i in range(shards)) if g] + source = _ShardedBatchStream([_read(group) for group in groups]) + if limit is not None: - stream = _limited_batches(stream, limit) - return pa.RecordBatchReader.from_batches(arrow_schema, stream) + source = _limited_batches(source, limit) + return pa.RecordBatchReader.from_batches(target_schema, _cast_batches(source, target_schema)) diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 3f3cabb3ed..41ec5af365 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -28,6 +28,7 @@ from pyiceberg.expressions import And, EqualTo, IsNull, StartsWith from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.io.pyiceberg_core import ( + _cast_batches, _limited_batches, _ShardedBatchStream, arrow_batch_reader_from_pyiceberg_core, @@ -354,6 +355,13 @@ def read_next_batch(self) -> pa.RecordBatch: self._pos += 1 return batch + # A real pyarrow.RecordBatchReader (what the single-shard fast path drains) is also iterable. + def __iter__(self) -> FakeShardReader: + return self + + def __next__(self) -> pa.RecordBatch: + return self.read_next_batch() + def _drain(reader: pa.RecordBatchReader) -> tuple[int, int]: """Return (row_count, sum-of-id checksum) — the same parity signal used in the perf gate.""" @@ -514,7 +522,9 @@ def test_sharded_batch_stream_shuts_down_on_garbage_collection() -> None: assert pool._shutdown # the finalizer tore the pool down -def test_arrow_batch_reader_single_shard_returns_native_reader_directly(monkeypatch: pytest.MonkeyPatch) -> None: +def test_arrow_batch_reader_single_shard_casts_to_target_schema(monkeypatch: pytest.MonkeyPatch) -> None: + # The native reader hands back int64; the projected schema is IntegerType, so even the + # single-shard fast path must cast its output to the PyArrow target schema (here int32). native = FakeShardReader([_batch([7])]) class FakeArrowReader: @@ -527,9 +537,42 @@ def read(self, _projection: Any, _tasks: list[Any]) -> FakeShardReader: monkeypatch.setattr(sys.modules["pyiceberg_core.scan"], "ArrowReader", FakeArrowReader) monkeypatch.setenv("PYICEBERG_RUST_ARROW_SHARDS", "1") - # The fast path must hand back the native reader untouched, not wrap it in a fan-in. reader = _build_reader(monkeypatch, n_tasks=4) - assert reader is native + table = reader.read_all() + + assert table.column("id").to_pylist() == [7] + assert table.schema.field("id").type == pa.int32() # cast to the projected schema's type, not the native int64 + + +def test_cast_batches_decodes_ree_and_matches_target_schema() -> None: + import pyarrow.compute as pc + + ree = pc.run_end_encode(pa.array(["a", "a", "b"], pa.string())) + batch = pa.record_batch({"id": pa.array([1, 2, 3], pa.int64()), "cat": ree}) + target = pa.schema([("id", pa.int32()), ("cat", pa.large_string())]) + + class Source: + def __init__(self, batches: list[pa.RecordBatch]) -> None: + self._it = iter(batches) + self.closed = False + + def __iter__(self) -> Source: + return self + + def __next__(self) -> pa.RecordBatch: + return next(self._it) + + def close(self) -> None: + self.closed = True + + source = Source([batch]) + out = list(_cast_batches(source, target)) + + assert len(out) == 1 + assert out[0].schema.field("id").type == pa.int32() # native int64 widened/narrowed to the target + assert out[0].schema.field("cat").type == pa.large_string() # run-end-encoded column decoded then cast + assert out[0].column("cat").to_pylist() == ["a", "a", "b"] + assert source.closed def test_limited_batches_truncates_to_limit_and_closes_source() -> None: From e7cd71997fb57e80f9d63ad3a37b1ebc3e424c4c Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 05:28:05 -0500 Subject: [PATCH 5/7] feat(io): add opt-in rust scan planning PYICEBERG_RUST_SCAN_PLANNING plans the scan in pyiceberg-core (Table.plan_files) instead of PyIceberg's Python manifest planning, then streams the planned tasks through the same sharded, casted reader as the read path. Falls back to PyArrow on any scan pyiceberg-core cannot handle. Co-Authored-By: Claude Opus 4.8 --- pyiceberg/io/pyiceberg_core.py | 100 +++++++++++++++----- pyiceberg/table/__init__.py | 22 +++++ tests/io/test_pyiceberg_core.py | 158 ++++++++++++++++++++++++++++++++ 3 files changed, 259 insertions(+), 21 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index 777057de7a..5792b10220 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -484,17 +484,13 @@ def _limited_batches(source: Any, limit: int) -> Iterator[Any]: close() -def arrow_batch_reader_from_pyiceberg_core( +def _read_core_tasks( file_io: FileIO, - tasks: Iterable[FileScanTask], - schema: Schema, + core_tasks: list[Any], projected_schema: Schema, - partition_specs: dict[int, PartitionSpec], - name_mapping: NameMapping | None, - case_sensitive: bool = True, - limit: int | None = None, + limit: int | None, ) -> Any: - """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader as a streaming reader. + """Stream pyiceberg-core file scan tasks through ArrowReader as a casted, limited reader. Multi-file scans are sharded across a thread pool (see ``_shard_count``) so decode uses multiple cores; a single native reader over all files would decode on one core. Each shard @@ -506,19 +502,6 @@ def arrow_batch_reader_from_pyiceberg_core( Worker-thread exceptions propagate to the consumer, and the shard threads are shut down when the reader is exhausted, closed early, or garbage collected. """ - core_tasks = [ - file_scan_task_to_pyiceberg_core( - task, - schema, - projected_schema, - partition_spec=partition_specs.get(task.file.spec_id), - name_mapping=name_mapping, - case_sensitive=case_sensitive, - project_field_ids=list(projected_schema.field_ids), - ) - for task in tasks - ] - core_projection = schema_to_pyiceberg_core(projected_schema) reader_kwargs = _reader_kwargs() if limit is not None: @@ -547,3 +530,78 @@ def _read(shard_tasks: list[Any]) -> Any: if limit is not None: source = _limited_batches(source, limit) return pa.RecordBatchReader.from_batches(target_schema, _cast_batches(source, target_schema)) + + +def arrow_batch_reader_from_pyiceberg_core( + file_io: FileIO, + tasks: Iterable[FileScanTask], + schema: Schema, + projected_schema: Schema, + partition_specs: dict[int, PartitionSpec], + name_mapping: NameMapping | None, + case_sensitive: bool = True, + limit: int | None = None, +) -> Any: + """Read PyIceberg scan tasks through pyiceberg-core's ArrowReader as a streaming reader. + + PyIceberg plans the files (Python-side manifest planning); each task is converted to a + pyiceberg-core file scan task and streamed through ``_read_core_tasks``. + """ + core_tasks = [ + file_scan_task_to_pyiceberg_core( + task, + schema, + projected_schema, + partition_spec=partition_specs.get(task.file.spec_id), + name_mapping=name_mapping, + case_sensitive=case_sensitive, + project_field_ids=list(projected_schema.field_ids), + ) + for task in tasks + ] + return _read_core_tasks(file_io, core_tasks, projected_schema, limit) + + +def plan_and_read_with_pyiceberg_core( + table_metadata: Any, + io: FileIO, + projected_schema: Schema, + row_filter: BooleanExpression, + table_identifier: tuple[str, ...] | None, + *, + case_sensitive: bool = True, + snapshot_id: int | None = None, + limit: int | None = None, +) -> Any: + """Plan and read a scan entirely in pyiceberg-core, skipping PyIceberg's manifest planning. + + pyiceberg-core's ``Table.plan_files`` produces the file scan tasks (data files, deletes, + partition data, residual predicate) directly from the table metadata, and those tasks are fed + straight into ``_read_core_tasks``. The native planner applies the residual during the read, so + a filter on a column that is not part of ``projected_schema`` is still honoured. + + ``selected_fields`` must be the top-level projected field *names* (the native planner sets each + task's ``project_field_ids`` from them); only flat projections are expressible this way. + """ + scan = _core_module("scan") + file_io = file_io_to_pyiceberg_core(io) + + # The identifier only names the table rust-side (data-file paths are absolute, so it does not + # affect planning); rust rejects a <2-part identifier, so fall back to a safe 2-part name. + identifier = list(table_identifier) if table_identifier and len(table_identifier) >= 2 else ["_", "_"] + + native_table = scan.Table.from_metadata_json(file_io, identifier, table_metadata.model_dump_json()) + + predicate = None + if not isinstance(row_filter, AlwaysTrue): + predicate = expression_to_pyiceberg_core(row_filter, table_metadata.schema(), case_sensitive) + + tasks = list( + native_table.plan_files( + selected_fields=[field.name for field in projected_schema.fields], + predicate=predicate, + snapshot_id=snapshot_id, + case_sensitive=case_sensitive, + ) + ) + return _read_core_tasks(io, tasks, projected_schema, limit) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 29b637ea21..8d6aece6e7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -111,6 +111,7 @@ ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" PYICEBERG_RUST_ARROW_SCAN = "PYICEBERG_RUST_ARROW_SCAN" +PYICEBERG_RUST_SCAN_PLANNING = "PYICEBERG_RUST_SCAN_PLANNING" @dataclass() @@ -2244,6 +2245,27 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow projected_schema = self.projection() + if os.environ.get(PYICEBERG_RUST_SCAN_PLANNING, "").lower() in {"1", "true", "yes"}: + from pyiceberg.io.pyiceberg_core import plan_and_read_with_pyiceberg_core + + try: + return plan_and_read_with_pyiceberg_core( + self.table_metadata, + self.io, + projected_schema, + self.row_filter, + self.table_identifier, + case_sensitive=self.case_sensitive, + snapshot_id=self.snapshot_id, + limit=self.limit, + ) + except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: + warnings.warn( + f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + RuntimeWarning, + stacklevel=2, + ) + if os.environ.get(PYICEBERG_RUST_ARROW_SCAN, "").lower() in {"1", "true", "yes"}: from pyiceberg.io.pyiceberg_core import ( arrow_batch_reader_from_pyiceberg_core, diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 41ec5af365..3149105886 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -37,6 +37,7 @@ expression_to_pyiceberg_core, file_io_to_pyiceberg_core, file_scan_task_to_pyiceberg_core, + plan_and_read_with_pyiceberg_core, schema_to_pyiceberg_core, ) from pyiceberg.manifest import DataFile, DataFileContent @@ -86,6 +87,29 @@ def negate(self) -> CorePredicate: return CorePredicate(op="not", child=self) +class CoreScanTable(CoreObject): + """Fake ``pyiceberg_core.scan.Table`` that records planning args and emits planned tasks.""" + + last_from_metadata_json: dict[str, Any] = {} + last_plan_files: dict[str, Any] = {} + + @classmethod + def from_metadata_json(cls, file_io: Any, identifier: Any, metadata_json: str, **kwargs: Any) -> CoreScanTable: + CoreScanTable.last_from_metadata_json = { + "file_io": file_io, + "identifier": identifier, + "metadata_json": metadata_json, + **kwargs, + } + return cls() + + def plan_files(self, **kwargs: Any) -> list[CoreObject]: + CoreScanTable.last_plan_files = kwargs + # One planned task per selected field keeps the count deterministic and lets the read fake + # echo back exactly what planning produced. + return [CoreObject(planned=name) for name in kwargs["selected_fields"]] + + class CoreReference(CoreObject): def _predicate(self, op: str, *args: Any) -> CorePredicate: return CorePredicate(op=op, name=self.args[0], args=args) @@ -128,6 +152,7 @@ def fake_pyiceberg_core(monkeypatch: pytest.MonkeyPatch) -> None: scan: Any = ModuleType("pyiceberg_core.scan") scan.DeleteFile = CoreObject scan.FileScanTask = CoreObject + scan.Table = CoreScanTable # planning tests drive from_metadata_json + plan_files through this scan.ArrowReader = CoreObject # streaming tests override this with a batch-producing fake monkeypatch.setitem(sys.modules, "pyiceberg_core", root) @@ -664,3 +689,136 @@ def _identity_task_conversion(task: Any, *_args: Any, **_kwargs: Any) -> Any: True, limit=limit, ) + + +# --- native scan planning ----------------------------------------------------- + + +class FakeTableMetadata: + """Minimal stand-in: native planning only needs the metadata JSON and the table schema.""" + + def __init__(self, schema: Schema) -> None: + self._schema = schema + + def model_dump_json(self) -> str: + return '{"format-version": 2}' + + def schema(self) -> Schema: + return self._schema + + +def _planning_arrow_reader(monkeypatch: pytest.MonkeyPatch, captured: dict[str, Any]) -> None: + """Wire a fake ArrowReader that records the projection + planned tasks and emits one batch.""" + + class FakeArrowReader: + def __init__(self, _file_io: Any, **kwargs: Any) -> None: + captured["reader_kwargs"] = kwargs + + def read(self, projection: Any, tasks: list[Any]) -> FakeShardReader: + captured["projection"] = projection + captured["tasks"] = tasks + return FakeShardReader([_batch([1, 2, 3])]) + + monkeypatch.setattr(sys.modules["pyiceberg_core.scan"], "ArrowReader", FakeArrowReader) + monkeypatch.setenv("PYICEBERG_RUST_ARROW_SHARDS", "1") + + +def test_plan_and_read_passes_projection_and_filter_to_native_planner(monkeypatch: pytest.MonkeyPatch) -> None: + captured: dict[str, Any] = {} + _planning_arrow_reader(monkeypatch, captured) + + schema = Schema( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "data", StringType()), + schema_id=0, + ) + projected_schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=0) + + reader = plan_and_read_with_pyiceberg_core( + FakeTableMetadata(schema), + FakeFileIO(properties={"s3.region": "us-east-1"}), + projected_schema, + EqualTo("id", 5), + ("ns", "t"), + case_sensitive=False, + snapshot_id=42, + ) + table = reader.read_all() + + # The metadata JSON and a >=2-part identifier are handed to from_metadata_json verbatim. + assert CoreScanTable.last_from_metadata_json["identifier"] == ["ns", "t"] + assert CoreScanTable.last_from_metadata_json["metadata_json"] == '{"format-version": 2}' + # plan_files receives the projection as field NAMES, the converted predicate, snapshot, sensitivity. + plan = CoreScanTable.last_plan_files + assert plan["selected_fields"] == ["id"] + assert plan["snapshot_id"] == 42 + assert plan["case_sensitive"] is False + assert plan["predicate"].kwargs == {"op": "eq", "name": "id", "args": (5,)} + # The planned tasks (not python-built tasks) are what the reader consumes. + assert [task.kwargs["planned"] for task in captured["tasks"]] == ["id"] + # Output is cast to the projected schema's PyArrow type (IntegerType -> int32, not native int64). + assert table.schema.field("id").type == pa.int32() + assert table.column("id").to_pylist() == [1, 2, 3] + + +def test_plan_and_read_skips_predicate_for_always_true_filter(monkeypatch: pytest.MonkeyPatch) -> None: + from pyiceberg.expressions import AlwaysTrue + + captured: dict[str, Any] = {} + _planning_arrow_reader(monkeypatch, captured) + + schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=0) + + plan_and_read_with_pyiceberg_core( + FakeTableMetadata(schema), + FakeFileIO(properties={}), + schema, + AlwaysTrue(), + ("ns", "t"), + ).read_all() + + # An unfiltered scan must not pass a predicate at all (rather than always_true()). + assert CoreScanTable.last_plan_files["predicate"] is None + + +def test_plan_and_read_falls_back_to_safe_identifier_for_short_identifier(monkeypatch: pytest.MonkeyPatch) -> None: + from pyiceberg.expressions import AlwaysTrue + + captured: dict[str, Any] = {} + _planning_arrow_reader(monkeypatch, captured) + + schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=0) + + # A single-part (or None) identifier cannot name a table rust-side; a 2-part fallback is used. + plan_and_read_with_pyiceberg_core( + FakeTableMetadata(schema), + FakeFileIO(properties={}), + schema, + AlwaysTrue(), + ("only_one",), + ).read_all() + + identifier = CoreScanTable.last_from_metadata_json["identifier"] + assert len(identifier) >= 2 + + +def test_plan_and_read_applies_limit(monkeypatch: pytest.MonkeyPatch) -> None: + from pyiceberg.expressions import AlwaysTrue + + captured: dict[str, Any] = {} + _planning_arrow_reader(monkeypatch, captured) + + schema = Schema(NestedField(1, "id", IntegerType(), required=True), schema_id=0) + + reader = plan_and_read_with_pyiceberg_core( + FakeTableMetadata(schema), + FakeFileIO(properties={}), + schema, + AlwaysTrue(), + ("ns", "t"), + limit=2, + ) + table = reader.read_all() + + assert table.num_rows == 2 # the batch of 3 is truncated to the limit + assert captured["reader_kwargs"]["batch_size"] == 2 # batch size capped to the limit From 7230d6007a8cdfc05b86d92c56e379a13d023989 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 07:28:39 -0500 Subject: [PATCH 6/7] fix(io): make opt-in native arrow scan usable on real S3 catalogs The native path crashed on a normal REST+S3 catalog instead of degrading: non-str FileIO props (the REST auth manager) reached the Rust binding, S3 path-style/region were never translated to the opendal keys, and the fallback guard missed decode-time errors and schemeless local paths. Filter props to strings, mirror PyArrow's path-style default and pass region, prime the first batch so a decode mismatch falls back too, and skip native for bare paths. Scoped to static-credential FileIO -- a refreshing auth manager is still not carried to the native path. Co-Authored-By: Claude Opus 4.8 (1M context) --- pyiceberg/io/pyiceberg_core.py | 49 ++++++++++++++- pyiceberg/table/__init__.py | 99 ++++++++++++++++++++++------- tests/io/test_pyiceberg_core.py | 106 +++++++++++++++++++++++++++++++- 3 files changed, 227 insertions(+), 27 deletions(-) diff --git a/pyiceberg/io/pyiceberg_core.py b/pyiceberg/io/pyiceberg_core.py index 5792b10220..7aa2c9aef8 100644 --- a/pyiceberg/io/pyiceberg_core.py +++ b/pyiceberg/io/pyiceberg_core.py @@ -53,13 +53,19 @@ UnaryPredicate, UnboundTerm, ) -from pyiceberg.io import FileIO +from pyiceberg.io import ( + AWS_REGION, + S3_FORCE_VIRTUAL_ADDRESSING, + S3_REGION, + FileIO, +) from pyiceberg.manifest import DataFile from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import FileScanTask from pyiceberg.table.name_mapping import NameMapping from pyiceberg.typedef import Record +from pyiceberg.utils.properties import property_as_bool if TYPE_CHECKING: from collections.abc import Iterable, Iterator @@ -89,9 +95,46 @@ def schema_to_pyiceberg_core(schema: Schema) -> Any: return _core_module("schema").Schema.from_json(_model_json(schema)) +def _string_props(properties: dict[str, Any]) -> dict[str, str]: + """Keep only the string-valued FileIO properties the Rust binding accepts. + + The Rust ``FileIO.from_props`` requires every value to be a ``str``. The REST catalog injects a + live ``LegacyOAuth2AuthManager`` object under ``auth.manager`` (for token refresh), which would + raise ``TypeError`` and crash every native scan. Dropping non-str values is a scoped fix for + static-credential FileIO (endpoint plus access key/secret/token, all strings) -- it deliberately + does NOT hand off a non-str auth manager, so signed/refreshing auth is not carried to the native + path. That is a known limitation, not a complete auth handoff. + """ + return {key: value for key, value in properties.items() if isinstance(value, str)} + + def file_io_to_pyiceberg_core(file_io: FileIO) -> Any: - """Convert a PyIceberg FileIO to a pyiceberg-core FileIO-like object.""" - return _core_module("file_io").FileIO.from_props(dict(file_io.properties)) + """Convert a PyIceberg FileIO to a pyiceberg-core FileIO-like object. + + Only string-valued properties are forwarded (see ``_string_props``), and two PyIceberg S3 + properties are translated into the opendal-backed binding's own keys so MinIO/path-style and + region-required deployments work on the native path the same way they do on PyArrow. + """ + props = _string_props(dict(file_io.properties)) + + # PyIceberg documents ``s3.force-virtual-addressing`` (default False on the s3 scan path), but + # the opendal S3 client reads ``s3.path-style-access``. Mirror the PyArrow default: path-style + # access is on unless virtual addressing was explicitly requested. Only set the binding key when + # it is not already provided, so an explicit native-side override still wins. + if "s3.path-style-access" not in props: + force_virtual_addressing = property_as_bool(props, S3_FORCE_VIRTUAL_ADDRESSING, False) + if not force_virtual_addressing: + props["s3.path-style-access"] = "true" + + # The opendal S3 builder requires an explicit region (PyArrow resolves it implicitly). Fall back + # to PyIceberg's ``client.region`` property and finally the AWS_REGION OS env so the native path + # does not fail with "region is missing". + if S3_REGION not in props: + region = props.get(AWS_REGION) or os.environ.get("AWS_REGION") + if region: + props[S3_REGION] = region + + return _core_module("file_io").FileIO.from_props(props) def _literal_value(value: Any) -> Any: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8d6aece6e7..5af9beb818 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -114,6 +114,35 @@ PYICEBERG_RUST_SCAN_PLANNING = "PYICEBERG_RUST_SCAN_PLANNING" +def _native_scan_supports_paths(tasks: Iterable[FileScanTask]) -> bool: + """Return whether every planned data file has a URL scheme the native opendal reader can open. + + The opendal-backed reader crashes on a bare local path (no ``scheme://``); the PyArrow path + reads those fine. Checking the planned task paths up front lets an unschemed path fall back to + PyArrow cleanly instead of crashing inside the native reader. + """ + return all("://" in task.file.file_path for task in tasks) + + +def _prime_native_reader(reader: pa.RecordBatchReader) -> pa.RecordBatchReader: + """Pull the first batch so a native decode failure surfaces before any batch is handed out. + + The native reader builds eagerly but decodes lazily, so a binding mismatch can raise either at + construction or while the FIRST batch is decoded (e.g. the run-end-decode/cast in + pyiceberg_core). Forcing the first batch here lets the caller's guard fall back to PyArrow. The + primed batch is re-chained ahead of the rest, so iteration is unchanged. Once a batch has been + yielded a later error MUST propagate -- restarting on PyArrow would re-emit already returned rows + (silent duplication), so this only ever fronts the very first batch. + """ + import pyarrow as pa + + try: + first = reader.read_next_batch() + except StopIteration: + return reader + return pa.RecordBatchReader.from_batches(reader.schema, itertools.chain((first,), reader)) + + @dataclass() class UpsertResult: """Summary the upsert operation.""" @@ -2245,21 +2274,34 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow projected_schema = self.projection() + + # An unsupported native scan should DEGRADE to PyArrow, not crash. The native reader builds + # eagerly but decodes lazily, so a binding mismatch can surface either at construction or + # while the FIRST batch is decoded (e.g. the run-end-decode/cast in pyiceberg_core). We prime + # that first batch under the guard and re-chain it; once any batch has been handed to the + # caller we MUST let later errors propagate -- restarting on PyArrow would re-emit already + # yielded rows (silent duplication). The catch is scoped to the native attempt only: the + # binding raises ValueError/NotImplementedError for unsupported scans and TypeError/ + # ArrowInvalid for prop/decode mismatches; broader user errors still surface. + native_fallback_errors = (ModuleNotFoundError, NotImplementedError, ValueError, TypeError, pa.lib.ArrowInvalid) + if os.environ.get(PYICEBERG_RUST_SCAN_PLANNING, "").lower() in {"1", "true", "yes"}: from pyiceberg.io.pyiceberg_core import plan_and_read_with_pyiceberg_core try: - return plan_and_read_with_pyiceberg_core( - self.table_metadata, - self.io, - projected_schema, - self.row_filter, - self.table_identifier, - case_sensitive=self.case_sensitive, - snapshot_id=self.snapshot_id, - limit=self.limit, + return _prime_native_reader( + plan_and_read_with_pyiceberg_core( + self.table_metadata, + self.io, + projected_schema, + self.row_filter, + self.table_identifier, + case_sensitive=self.case_sensitive, + snapshot_id=self.snapshot_id, + limit=self.limit, + ) ) - except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: + except native_fallback_errors as exc: warnings.warn( f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", RuntimeWarning, @@ -2275,23 +2317,34 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: if can_read_projected_schema_with_pyiceberg_core( self.table_metadata.schema(), projected_schema, self.row_filter, self.case_sensitive ): - try: - return arrow_batch_reader_from_pyiceberg_core( - self.io, - self.plan_files(), - self.table_metadata.schema(), - projected_schema, - self.table_metadata.specs(), - self.table_metadata.name_mapping(), - self.case_sensitive, - limit=self.limit, - ) - except (ModuleNotFoundError, NotImplementedError, ValueError) as exc: + # Plan once and reuse: the bare-path guard inspects the same tasks the reader gets. + native_tasks = list(self.plan_files()) + if not _native_scan_supports_paths(native_tasks): warnings.warn( - f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + "Falling back to PyArrow scan because pyiceberg-core cannot read a data file path without a URL scheme.", RuntimeWarning, stacklevel=2, ) + else: + try: + return _prime_native_reader( + arrow_batch_reader_from_pyiceberg_core( + self.io, + native_tasks, + self.table_metadata.schema(), + projected_schema, + self.table_metadata.specs(), + self.table_metadata.name_mapping(), + self.case_sensitive, + limit=self.limit, + ) + ) + except native_fallback_errors as exc: + warnings.warn( + f"Falling back to PyArrow scan because pyiceberg-core cannot handle this scan: {exc}", + RuntimeWarning, + stacklevel=2, + ) target_schema = schema_to_pyarrow(projected_schema) batches = ArrowScan( diff --git a/tests/io/test_pyiceberg_core.py b/tests/io/test_pyiceberg_core.py index 3149105886..fc8cf6d2f4 100644 --- a/tests/io/test_pyiceberg_core.py +++ b/tests/io/test_pyiceberg_core.py @@ -188,7 +188,39 @@ def test_schema_to_pyiceberg_core_uses_lazy_core_schema_json(simple_schema: Sche def test_file_io_to_pyiceberg_core_uses_file_io_properties() -> None: converted = file_io_to_pyiceberg_core(FakeFileIO(properties={"s3.region": "us-east-1"})) - assert converted.kwargs == {"properties": {"s3.region": "us-east-1"}} + # Region present, no force-virtual-addressing -> path-style is turned on (PyArrow default). + assert converted.kwargs == {"properties": {"s3.region": "us-east-1", "s3.path-style-access": "true"}} + + +def test_file_io_to_pyiceberg_core_drops_non_str_props() -> None: + # The REST catalog injects a live auth-manager object under "auth.manager"; the Rust binding + # only accepts str values, so non-str props must be dropped rather than crashing the native scan. + file_io = FakeFileIO(properties={"s3.region": "us-east-1", "auth.manager": object()}) + converted = file_io_to_pyiceberg_core(file_io) + + assert "auth.manager" not in converted.kwargs["properties"] + assert converted.kwargs["properties"]["s3.region"] == "us-east-1" + + +def test_file_io_to_pyiceberg_core_translates_path_style_for_minio() -> None: + file_io = FakeFileIO(properties={"s3.region": "us-east-1", "s3.force-virtual-addressing": "false"}) + converted = file_io_to_pyiceberg_core(file_io) + + assert converted.kwargs["properties"]["s3.path-style-access"] == "true" + + +def test_file_io_to_pyiceberg_core_keeps_virtual_addressing() -> None: + file_io = FakeFileIO(properties={"s3.region": "us-east-1", "s3.force-virtual-addressing": "true"}) + converted = file_io_to_pyiceberg_core(file_io) + + assert "s3.path-style-access" not in converted.kwargs["properties"] + + +def test_file_io_to_pyiceberg_core_falls_back_to_aws_region_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("AWS_REGION", "eu-west-1") + converted = file_io_to_pyiceberg_core(FakeFileIO(properties={})) + + assert converted.kwargs["properties"]["s3.region"] == "eu-west-1" def test_expression_to_pyiceberg_core_converts_expression_tree(simple_schema: Schema) -> None: @@ -822,3 +854,75 @@ def test_plan_and_read_applies_limit(monkeypatch: pytest.MonkeyPatch) -> None: assert table.num_rows == 2 # the batch of 3 is truncated to the limit assert captured["reader_kwargs"]["batch_size"] == 2 # batch size capped to the limit + + +def _bare_path_task(file_path: str) -> FileScanTask: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format="PARQUET", + partition=Record(), + record_count=1, + file_size_in_bytes=1, + ) + data_file.spec_id = 0 + return FileScanTask(data_file) + + +def test_native_scan_supports_paths_rejects_bare_paths() -> None: + from pyiceberg.table import _native_scan_supports_paths + + assert _native_scan_supports_paths([_bare_path_task("s3://warehouse/t/data.parquet")]) + # A bare local path (no scheme) crashes opendal, so the dispatch must fall back to PyArrow. + assert not _native_scan_supports_paths([_bare_path_task("/tmp/warehouse/t/data.parquet")]) + assert not _native_scan_supports_paths([_bare_path_task("s3://warehouse/t/a.parquet"), _bare_path_task("/tmp/t/b.parquet")]) + + +def _prime_batch(value: int) -> pa.RecordBatch: + return pa.record_batch({"id": pa.array([value], pa.int32())}) + + +def test_prime_native_reader_preserves_batch_order() -> None: + from pyiceberg.table import _prime_native_reader + + schema = pa.schema([pa.field("id", pa.int32())]) + reader = pa.RecordBatchReader.from_batches(schema, iter([_prime_batch(1), _prime_batch(2), _prime_batch(3)])) + + primed = _prime_native_reader(reader) + + assert primed.read_all().column("id").to_pylist() == [1, 2, 3] + + +def test_prime_native_reader_surfaces_error_before_first_batch() -> None: + from pyiceberg.table import _prime_native_reader + + schema = pa.schema([pa.field("id", pa.int32())]) + + def _explode() -> Any: + raise pa.lib.ArrowInvalid("decode failed on first batch") + yield # type: ignore[unreachable] # pragma: no cover - generator marker + + reader = pa.RecordBatchReader.from_batches(schema, _explode()) + + # The error must surface from _prime_native_reader so the caller's guard can fall back to PyArrow. + with pytest.raises(pa.lib.ArrowInvalid): + _prime_native_reader(reader) + + +def test_prime_native_reader_propagates_mid_stream_error_without_restart() -> None: + from pyiceberg.table import _prime_native_reader + + schema = pa.schema([pa.field("id", pa.int32())]) + + def _batches() -> Any: + yield _prime_batch(1) + raise pa.lib.ArrowInvalid("decode failed mid-stream") + + reader = pa.RecordBatchReader.from_batches(schema, _batches()) + primed = _prime_native_reader(reader) + + # First batch was already handed out; a later failure must propagate (no silent fallback/restart + # that would re-emit the first batch). + assert primed.read_next_batch().column("id").to_pylist() == [1] + with pytest.raises(pa.lib.ArrowInvalid): + primed.read_next_batch() From 60a6ee378ba3c44db3271a97d5631c32867a6ff1 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sat, 6 Jun 2026 07:32:17 -0500 Subject: [PATCH 7/7] feat(io): make scan planning pluggable via a ScanPlanner strategy plan_files() now resolves a ScanPlanner -- local manifest planning or REST server-side, exactly as before -- or uses one injected on the scan. That injection point is the seam a Rust or engine-specific planner plugs into without touching any call site. The bundled RustScanPlanner is an opt-in, deliberately-unimplemented stub: the native plan output exposes only booleans/counts, not the residual, partition data, or deletes needed to rebuild a faithful FileScanTask, so the fused native read path stays the supported native route. Env flags and default resolution are unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) --- pyiceberg/table/__init__.py | 17 ++-- pyiceberg/table/scan_planning.py | 84 ++++++++++++++++++++ tests/table/test_scan_planning.py | 126 ++++++++++++++++++++++++++++++ 3 files changed, 221 insertions(+), 6 deletions(-) create mode 100644 pyiceberg/table/scan_planning.py create mode 100644 tests/table/test_scan_planning.py diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5af9beb818..5d90215f1e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -107,6 +107,7 @@ from pyiceberg.catalog import Catalog from pyiceberg.catalog.rest.scan_planning import RESTContentFile, RESTDeleteFile, RESTFileScanTask + from pyiceberg.table.scan_planning import ScanPlanner ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" @@ -1840,6 +1841,7 @@ class TableScan(ABC): limit: int | None catalog: Catalog | None table_identifier: Identifier | None + scan_planner: ScanPlanner | None def __init__( self, @@ -1853,6 +1855,7 @@ def __init__( limit: int | None = None, catalog: Catalog | None = None, table_identifier: Identifier | None = None, + scan_planner: ScanPlanner | None = None, ): self.table_metadata = table_metadata self.io = io @@ -1864,6 +1867,7 @@ def __init__( self.limit = limit self.catalog = catalog self.table_identifier = table_identifier + self.scan_planner = scan_planner def snapshot(self) -> Snapshot | None: if self.snapshot_id: @@ -2233,16 +2237,17 @@ def _plan_files_local(self) -> Iterable[FileScanTask]: def plan_files(self) -> Iterable[FileScanTask]: """Plans the relevant files by filtering on the PartitionSpecs. - If the table comes from a REST catalog with scan planning enabled, - this will use server-side scan planning. Otherwise, it falls back - to local planning. + The planning strategy is pluggable: an injected ``scan_planner`` is used if set, otherwise + a planner is resolved per scan. The default resolution uses REST server-side planning when + the catalog supports it and local manifest planning otherwise. Returns: List of FileScanTasks that contain both data and delete files. """ - if self._should_use_server_side_planning(): - return self._plan_files_server_side() - return self._plan_files_local() + from pyiceberg.table.scan_planning import resolve_scan_planner + + planner = self.scan_planner or resolve_scan_planner(self) + return planner.plan_files(self) def to_arrow(self) -> pa.Table: """Read an Arrow table eagerly from this DataScan. diff --git a/pyiceberg/table/scan_planning.py b/pyiceberg/table/scan_planning.py new file mode 100644 index 0000000000..e5332d8fa3 --- /dev/null +++ b/pyiceberg/table/scan_planning.py @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Pluggable scan-planning strategies. + +A ``ScanPlanner`` turns a ``DataScan`` into the file scan tasks that back it. The Python (local) +and REST (server-side) strategies reproduce the existing planner selection exactly; the resolver +picks between them just as ``DataScan.plan_files`` used to. A planner can also be injected on a +``DataScan`` to override resolution, which is how the Rust strategy is opted into. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: + from collections.abc import Iterable + + from pyiceberg.table import DataScan, FileScanTask + + +@runtime_checkable +class ScanPlanner(Protocol): + """Plan the file scan tasks for a ``DataScan``.""" + + def plan_files(self, scan: DataScan) -> Iterable[FileScanTask]: ... + + +class LocalScanPlanner: + """Plan files locally by reading manifests (PyIceberg's default planner).""" + + def plan_files(self, scan: DataScan) -> Iterable[FileScanTask]: + return scan._plan_files_local() + + +class RestScanPlanner: + """Plan files using REST server-side scan planning.""" + + def plan_files(self, scan: DataScan) -> Iterable[FileScanTask]: + return scan._plan_files_server_side() + + +class RustScanPlanner: + """Placeholder for a pyiceberg-core planner that returns ``FileScanTask`` objects. + + Opt-in only: ``resolve_scan_planner`` never selects this, so it has to be injected on a + ``DataScan``. It is intentionally unimplemented: pyiceberg-core's native plan output exposes + only booleans/counts for predicate, partition data, and deletes, not the residual expression, + partition ``Record``, or delete ``DataFile`` objects needed to rebuild a faithful PyIceberg + ``FileScanTask`` — so reconstructing one here would silently drop the residual and break + filter-on-dropped-column scans. Use the fused ``PYICEBERG_RUST_SCAN_PLANNING`` read path, which + plans and reads natively and never round-trips through ``FileScanTask``. + """ + + def plan_files(self, scan: DataScan) -> Iterable[FileScanTask]: + raise NotImplementedError( + "Native pyiceberg-core planning to FileScanTask is not supported; " + "the native plan output does not expose the residual, partition data, or deletes " + "needed to rebuild a faithful FileScanTask. Use the fused PYICEBERG_RUST_SCAN_PLANNING " + "read path instead." + ) + + +def resolve_scan_planner(scan: DataScan) -> ScanPlanner: + """Pick the planner for ``scan``, reproducing the historical local/server-side selection. + + ``RustScanPlanner`` is never auto-selected; it is opt-in via ``DataScan.scan_planner``. + """ + if scan._should_use_server_side_planning(): + return RestScanPlanner() + return LocalScanPlanner() diff --git a/tests/table/test_scan_planning.py b/tests/table/test_scan_planning.py new file mode 100644 index 0000000000..dc04f66c50 --- /dev/null +++ b/tests/table/test_scan_planning.py @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Any + +import pytest + +from pyiceberg.table import Table +from pyiceberg.table.scan_planning import ( + LocalScanPlanner, + RestScanPlanner, + RustScanPlanner, + ScanPlanner, + resolve_scan_planner, +) + + +class _FakeCatalog: + def __init__(self, server_side: bool) -> None: + self._server_side = server_side + + def supports_server_side_planning(self) -> bool: + return self._server_side + + +class _FakeScan: + """Minimal stand-in for DataScan that records how the planner consulted it.""" + + def __init__(self, catalog: _FakeCatalog | None) -> None: + self.catalog = catalog + self.local_called = False + self.server_side_called = False + + def _should_use_server_side_planning(self) -> bool: + return self.catalog is not None and self.catalog.supports_server_side_planning() + + def _plan_files_local(self) -> list[Any]: + self.local_called = True + return ["local-task"] + + def _plan_files_server_side(self) -> list[Any]: + self.server_side_called = True + return ["server-task"] + + +def test_local_scan_planner_satisfies_protocol() -> None: + assert isinstance(LocalScanPlanner(), ScanPlanner) + assert isinstance(RestScanPlanner(), ScanPlanner) + + +def test_local_scan_planner_delegates_to_the_scan() -> None: + scan = _FakeScan(catalog=None) + + tasks = list(LocalScanPlanner().plan_files(scan)) # type: ignore[arg-type] + + assert tasks == ["local-task"] + assert scan.local_called and not scan.server_side_called + + +def test_rest_scan_planner_delegates_to_the_scan() -> None: + scan = _FakeScan(catalog=_FakeCatalog(server_side=True)) + + tasks = list(RestScanPlanner().plan_files(scan)) # type: ignore[arg-type] + + assert tasks == ["server-task"] + assert scan.server_side_called and not scan.local_called + + +def test_resolve_scan_planner_returns_rest_when_server_side_supported() -> None: + scan = _FakeScan(catalog=_FakeCatalog(server_side=True)) + + assert isinstance(resolve_scan_planner(scan), RestScanPlanner) # type: ignore[arg-type] + + +def test_resolve_scan_planner_returns_local_otherwise() -> None: + assert isinstance(resolve_scan_planner(_FakeScan(catalog=None)), LocalScanPlanner) # type: ignore[arg-type] + assert isinstance(resolve_scan_planner(_FakeScan(catalog=_FakeCatalog(server_side=False))), LocalScanPlanner) # type: ignore[arg-type] + + +def test_rust_scan_planner_is_an_honest_stub() -> None: + # Native plan output cannot rebuild a faithful FileScanTask (no residual / partition / deletes), + # so the planner refuses rather than silently dropping the residual. The fused read path is the + # supported native route. + with pytest.raises(NotImplementedError, match="faithful FileScanTask"): + list(RustScanPlanner().plan_files(_FakeScan(catalog=None))) # type: ignore[arg-type] + + +def test_data_scan_plan_files_uses_injected_planner(table_v2: Table) -> None: + """An injected planner overrides resolution on a real DataScan and is handed the scan itself.""" + seen: list[Any] = [] + + class _RecordingPlanner: + def plan_files(self, scan: Any) -> list[Any]: + seen.append(scan) + return ["injected-task"] + + scan = table_v2.scan().update(scan_planner=_RecordingPlanner()) + + tasks = list(scan.plan_files()) + + assert tasks == ["injected-task"] + assert seen == [scan] # the planner receives the DataScan itself, not a copied context + + +def test_data_scan_plan_files_resolves_local_by_default(table_v2: Table) -> None: + # No injected planner and no server-side catalog: resolution must pick the local planner. + scan = table_v2.scan() + + assert scan.scan_planner is None + assert isinstance(resolve_scan_planner(scan), LocalScanPlanner)