Apache Iceberg version
1.10.1 (latest release)
Query engine
Spark
Please describe the bug 🐞
When read.split.adaptive-size.enabled (default: true) is on, Iceberg derives the target parallelism via SparkReadConf.parallelism():
```java
public int parallelism() {
  int defaultParallelism = spark.sparkContext().defaultParallelism();
  int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
  return Math.max(defaultParallelism, numShufflePartitions);
}
```
This is enabled by default and is a table-level property only — there is no session-level config to disable it globally. Users must ALTER TABLE every table individually.
On a small cluster (say, 2 executors × 2 cores each before upscaling), the parallelism becomes max(4, 200) = 200, creating many tiny tasks for modest scans. Spark's dynamic allocation requests new executors to service these tasks, but the tasks finish on the existing executors before the new ones arrive. This is particularly harmful for streaming micro-batches (in the 500 MB–1 GB range), where the inflation repeats every batch.
Example: with spark.sql.shuffle.partitions = 200 (the default):

| Scan Size | Split Size | Tasks (actual) | Tasks (128 MB default) | Inflation |
|-----------|------------|----------------|------------------------|-----------|
| 500 MB    | 16 MB      | 32             | 4                      | 8×        |
| 1 GB      | 16 MB      | 64             | 8                      | 8×        |
| 5 GB      | 25 MB      | 200            | 40                     | 5×        |
| 10 GB     | 50 MB      | 200            | 80                     | 2.5×      |
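The table's split sizes follow from targeting roughly one split per reported slot. The sketch below is an illustration of that behavior, not Iceberg's actual implementation; the 16 MB floor and 128 MB ceiling are assumed constants chosen to match the numbers above:

```java
// Illustrative sketch of adaptive split sizing (assumed constants, NOT
// Iceberg's actual code): aim for ~one split per slot, clamped between
// an assumed 16 MB floor and 128 MB ceiling.
public class AdaptiveSplitSketch {
  static final long MIN_SPLIT = 16L << 20;   // assumed 16 MB floor
  static final long MAX_SPLIT = 128L << 20;  // assumed 128 MB ceiling

  static long adaptiveSplitSize(long scanSizeBytes, int parallelism) {
    long target = scanSizeBytes / parallelism;
    return Math.min(MAX_SPLIT, Math.max(MIN_SPLIT, target));
  }

  public static void main(String[] args) {
    long GB = 1L << 30;
    // With parallelism = 200, a 1 GB scan clamps to 16 MB splits -> 64 tasks.
    System.out.println(adaptiveSplitSize(GB, 200) >> 20);     // 16
    // With parallelism = 4 (defaultParallelism), it clamps to 128 MB -> 8 tasks.
    System.out.println(adaptiveSplitSize(GB, 4) >> 20);       // 128
    // A 5 GB scan at parallelism 200 lands at ~25 MB splits, as in the table.
    System.out.println(adaptiveSplitSize(5 * GB, 200) >> 20); // 25
  }
}
```

Under these assumptions, any scan small enough that scanSize / 200 falls below the floor gets pinned at 16 MB splits, which is exactly the 8× inflation shown for the 500 MB and 1 GB rows.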
Users commonly set shuffle partitions higher (400–2000) for shuffle-heavy workloads, relying on AQE coalescing to reduce actual shuffle parallelism at runtime. But Iceberg reads numShufflePartitions at scan-planning time before AQE runs and targets a parallelism the shuffle stage itself might not use.
Proposed fix
- Session-level config — Add spark.sql.iceberg.adaptive-split-size.enabled so users can disable adaptive sizing globally without per-table ALTER TABLE statements.
- Dedicated read parallelism config — Add spark.sql.iceberg.read.split.parallelism (defaulting to defaultParallelism) to give users explicit control over read-stage parallelism. This would make task-count computation consistent and resilient to shuffle partition count changes, whether across Spark versions (e.g., Spark could change the 200 default) or from user-configured values for shuffle-heavy workloads.
Related discussion thread
cc: @aokolnychyi @rdblue @ConeyLiu Thoughts?
Willingness to contribute