Adaptive split sizing using shuffle partitions for parallelism causes aggressive scaling #15988

@karuppayya

Description

Apache Iceberg version

1.10.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

read.split.adaptive-size.enabled (default true) computes parallelism via SparkReadConf.parallelism():

  public int parallelism() {
      int defaultParallelism = spark.sparkContext().defaultParallelism();
      int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
      return Math.max(defaultParallelism, numShufflePartitions);
  }

This is enabled by default and is a table-level property only — there is no session-level config to disable it globally. Users must ALTER TABLE every table individually.
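As things stand, the only way to opt out is per table. Assuming an active `SparkSession` named `spark`, the workaround looks like this (`db.tbl` is a placeholder table name):

```java
// Per-table workaround: adaptive split sizing can only be disabled
// table by table via a table property -- there is no session-level switch.
spark.sql(
    "ALTER TABLE db.tbl SET TBLPROPERTIES "
        + "('read.split.adaptive-size.enabled' = 'false')");
```

This has to be repeated for every table the job reads, which is the maintenance burden the proposal below aims to remove.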

On a small cluster (say 2 executors with 2 cores each, before upscaling), the parallelism becomes max(4, 200) = 200, creating many small tasks even for modest scans. Spark's dynamic allocator requests new executors to service these tasks, but the tasks finish on the existing executors before the new ones arrive. This is particularly adverse for streaming micro-batches (in the 500 MB–1 GB range), where the inflation repeats every batch.

Example: spark.sql.shuffle.partitions = 200 (default):

| Scan Size | Split Size | Tasks (actual) | Tasks (128 MB default) | Inflation |
|-----------|------------|----------------|------------------------|-----------|
| 500 MB    | 16 MB      | 32             | 4                      | 8×        |
| 1 GB      | 16 MB      | 64             | 8                      | 8×        |
| 5 GB      | 25 MB      | 200            | 40                     | 5×        |
| 10 GB     | 50 MB      | 200            | 80                     | 2.5×      |
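The first row can be reproduced with simple arithmetic. This is a minimal sketch of the effect, not Iceberg's actual planning code; the 16 MB split floor is an assumption inferred from the split sizes in the table above:

```java
// Minimal sketch of the inflation effect -- not Iceberg's planner.
// The 16 MB split floor is inferred from the table above, not from code.
public class SplitInflation {
    // Mirrors SparkReadConf.parallelism(): the larger value wins.
    static int parallelism(int defaultParallelism, int numShufflePartitions) {
        return Math.max(defaultParallelism, numShufflePartitions);
    }

    // Target roughly one split per parallel slot, clamped to an assumed floor.
    static long adaptiveSplitSize(long scanSize, int parallelism, long floor) {
        return Math.max(scanSize / parallelism, floor);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long scan = 500 * mb;                             // 500 MB scan
        int p = parallelism(4, 200);                      // 2 executors x 2 cores, default shuffle partitions
        long split = adaptiveSplitSize(scan, p, 16 * mb); // clamps to the 16 MB floor
        long tasks = (scan + split - 1) / split;          // ceiling division
        long tasksDefault = (scan + 128 * mb - 1) / (128 * mb);
        System.out.println(p + " -> " + tasks + " tasks vs " + tasksDefault);
        // prints: 200 -> 32 tasks vs 4
    }
}
```

The key point the sketch illustrates: once parallelism is inflated to 200, the task count is driven by the shuffle-partition setting, not by the scan size or the 128 MB default split.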

Users commonly set shuffle partitions higher (400–2000) for shuffle-heavy workloads, relying on AQE coalescing to reduce actual shuffle parallelism at runtime. But Iceberg reads numShufflePartitions at scan-planning time before AQE runs and targets a parallelism the shuffle stage itself might not use.
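To illustrate the mismatch, here is a hedged sketch of a session tuned for shuffle-heavy work (assumed setup, not taken from the report). AQE coalesces shuffle partitions at runtime, but the static value is what scan planning sees:

```java
import org.apache.spark.sql.SparkSession;

// Sketch: a shuffle-heavy session. AQE will coalesce the 2000 shuffle
// partitions at runtime based on actual data sizes...
SparkSession spark = SparkSession.builder()
    .master("local[4]")                                     // defaultParallelism = 4
    .config("spark.sql.shuffle.partitions", "2000")         // static; read at scan-planning time
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate();
// ...but Iceberg's SparkReadConf.parallelism() would still return
// max(4, 2000) = 2000 when planning the scan, before AQE ever runs.
```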

Proposed fix

  • Session-level config — Add spark.sql.iceberg.adaptive-split-size.enabled so users can disable adaptive sizing globally without per-table ALTER TABLE.
  • Dedicated read parallelism config — Add spark.sql.iceberg.read.split.parallelism (defaulting to defaultParallelism) to give users explicit control over read-stage parallelism. This would make task-count computation consistent and resilient to changes in the shuffle partition count, whether across Spark versions (e.g. Spark could change the 200 default) or from user-tuned values for shuffle-heavy workloads.
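If adopted, usage might look like the following. Both keys are hypothetical names taken from this proposal and do not exist in Iceberg today:

```java
// Hypothetical: this key does not exist yet; the name comes from the
// proposal above. Disables adaptive split sizing session-wide.
spark.conf().set("spark.sql.iceberg.adaptive-split-size.enabled", "false");

// Hypothetical: pin read-stage parallelism to the cluster's default
// parallelism instead of the shuffle partition count.
spark.conf().set(
    "spark.sql.iceberg.read.split.parallelism",
    String.valueOf(spark.sparkContext().defaultParallelism()));
```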

Related discussion thread
cc: @aokolnychyi @rdblue @ConeyLiu Thoughts?

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
