
[Python] Bound the memory used for fnapi outbound data messages and receiving messages. #38407

Open
scwhittle wants to merge 2 commits into apache:master from scwhittle:queue_limit

Conversation

@scwhittle
Contributor

@scwhittle scwhittle commented May 7, 2026

Previously an unbounded queue was used for pending data outputs to
be sent over the fnapi to the runner. If outputs were generated
faster than the runner consumed them, this led to memory growth and
possible OOMs. This PR introduces a byte-limited queue data structure
that is used instead to limit the number of bytes in the queue. This
was preferred to a queue with a maximum number of elements because
element sizes can vary greatly: for batch pipelines they are likely
large, while for streaming pipelines there may be many small outputs.
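
For context, here is a minimal standalone sketch of the idea (illustrative only, not the PR's ByteLimitedQueue, which also bounds element count): put() blocks on total queued bytes rather than number of elements, and a single oversized item is admitted when the queue is empty to avoid deadlock.

import collections
import threading

class TinyByteQueue:
  """Toy queue bounded by total byte weight (sketch, not the PR's code)."""
  def __init__(self, max_bytes):
    self._items = collections.deque()
    self._bytes = 0
    self._max_bytes = max_bytes
    self._cond = threading.Condition()

  def put(self, item, size):
    with self._cond:
      # Admit one oversized item into an empty queue to avoid deadlock.
      while self._bytes > 0 and self._bytes + size > self._max_bytes:
        self._cond.wait()
      self._items.append((item, size))
      self._bytes += size
      self._cond.notify_all()

  def get(self):
    with self._cond:
      while not self._items:
        self._cond.wait()
      item, size = self._items.popleft()
      self._bytes -= size
      self._cond.notify_all()
      return item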


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

  • Build python source distribution and wheels
  • Python tests
  • Java tests
  • Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a capacity constraint on the data transmission queue within the gRPC data channel. By limiting the queue size, the system can better manage backpressure when the SDK produces data faster than the runner can consume it, preventing unbounded memory growth.

Highlights

  • Queue Capacity Limit: Updated the _to_send queue in _GrpcDataChannel to enforce a maximum size of 10 elements.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a fixed size for the _to_send queue in the _GrpcDataChannel class. Feedback suggests that the hardcoded maxsize=10 should be made configurable to avoid potential performance bottlenecks in high-throughput scenarios and notes a discrepancy between the PR description and the implementation of the receive queue size.
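
For illustration, making the bound configurable could be as simple as reading an override at channel setup (a sketch only; the environment variable name here is invented, not an existing Beam option):

import os
import queue

# Hypothetical knob for the outbound queue bound; defaults to the
# hardcoded value from this revision of the PR.
_DEFAULT_OUTBOUND_QUEUE_SIZE = 10
maxsize = int(
    os.environ.get('BEAM_FNAPI_OUTBOUND_QUEUE_SIZE',
                   _DEFAULT_OUTBOUND_QUEUE_SIZE))
to_send = queue.Queue(maxsize=maxsize)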

Comment thread sdks/python/apache_beam/runners/worker/data_plane.py Outdated
@github-actions
Contributor

github-actions Bot commented May 7, 2026

Assigning reviewers:

R: @tvalentyn for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@tvalentyn tvalentyn marked this pull request as draft May 7, 2026 19:11
@tvalentyn
Contributor

thanks, marking as draft for now

@tvalentyn
Contributor

waiting on author

@scwhittle scwhittle changed the title Minimal change to push back to dofn generating output if the sdk is producing faster than runner is consuming. [Python] Bound the memory used for fnapi outbound data messages and receiving messages. May 8, 2026
@scwhittle scwhittle force-pushed the queue_limit branch 3 times, most recently from 2a8a8d6 to e8f5bf7 Compare May 8, 2026 13:56
@scwhittle scwhittle marked this pull request as ready for review May 8, 2026 14:10
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a ByteLimitedQueue to the Apache Beam Python SDK to limit queue capacity by both element count and total byte weight, which is then utilized in the _GrpcDataChannel to manage memory usage. The review feedback highlights several critical improvements for the new queue implementation: using time.monotonic() instead of time.time() for more robust timeout handling, ensuring compatibility with Python 3.13's queue.Queue.shutdown() to prevent deadlocks, and switching from notify() to notify_all() in the _get method to avoid potential producer starvation caused by heterogeneous item weights.

Comment on lines +63 to +85
  def put(self, item, block=True, timeout=None):
    item_size = max(1, self.weighing_fn(item))
    with self.not_full:
      if not block:
        if self._is_full(item_size):
          raise queue.Full
      elif timeout is None:
        while self._is_full(item_size):
          self.not_full.wait()
      elif timeout < 0:
        raise ValueError("'timeout' must be a non-negative number")
      else:
        endtime = time.time() + timeout
        while self._is_full(item_size):
          remaining = endtime - time.time()
          if remaining <= 0.0:
            raise queue.Full
          self.not_full.wait(remaining)

      self._put((item, item_size))
      self._byte_size += item_size
      self.unfinished_tasks += 1
      self.not_empty.notify()
Contributor


high

The put method has two issues:

  1. Robustness: It uses time.time() for timeouts. It is recommended to use time.monotonic() to avoid issues if the system clock is adjusted during the wait.
  2. Python 3.13 Compatibility: Python 3.13 introduced queue.Queue.shutdown(). Since this method is completely overridden, it should check for the shutdown state to avoid deadlocks during shutdown (as shutdown() notifies all waiters, but they will re-enter the while loop if the queue is still full).
  def put(self, item, block=True, timeout=None):
    item_size = max(1, self.weighing_fn(item))
    with self.not_full:
      # Python 3.13's queue.Queue.shutdown() sets self.is_shutdown; the
      # getattr default keeps this a no-op on older versions.
      if getattr(self, 'is_shutdown', False):
        raise queue.ShutDown
      if not block:
        if self._is_full(item_size):
          raise queue.Full
      elif timeout is None:
        while self._is_full(item_size):
          if getattr(self, 'is_shutdown', False):
            raise queue.ShutDown
          self.not_full.wait()
      elif timeout < 0:
        raise ValueError("'timeout' must be a non-negative number")
      else:
        endtime = time.monotonic() + timeout
        while self._is_full(item_size):
          if getattr(self, 'is_shutdown', False):
            raise queue.ShutDown
          remaining = endtime - time.monotonic()
          if remaining <= 0.0:
            raise queue.Full
          self.not_full.wait(remaining)

      self._put((item, item_size))
      self._byte_size += item_size
      self.unfinished_tasks += 1
      self.not_empty.notify()


Contributor Author


Supporting shutdown is unneeded for the use by data_plane. I just added a comment for now that shutdown does not work if called while there may be more put calls.

Comment thread sdks/python/apache_beam/utils/byte_limited_queue.py
@scwhittle
Contributor Author

The failures I looked through were unrelated. Are the tests flaky?

@tvalentyn
Contributor

tvalentyn commented May 8, 2026

When backpressure happens, would that look to users as a processing lull? If so, that is less than ideal, since we probably should be throttling reading inputs, and we might mislead users that processing is slow. Ideally the Runner would be smart enough to not send them if it cannot consume outputs fast enough...

Have you considered tracking unwritten data at the Data Channel? Something like #38422 perhaps (gemini assisted). I am a bit hesitant with us rewriting queue internals, but I can take a closer look if you think this approach would be better.
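
A rough sketch of that alternative, hedged since #38422 may differ: writers reserve bytes on the channel before queueing, and the gRPC sender releases them once the data has been flushed.

import threading

class OutboundByteTracker:
  """Toy backpressure tracker for a data channel (sketch only)."""
  def __init__(self, soft_limit_bytes):
    self._cond = threading.Condition()
    self._outstanding = 0
    self._limit = soft_limit_bytes

  def acquire(self, nbytes):
    # Called by the writer before enqueueing an outbound message.
    with self._cond:
      while self._outstanding >= self._limit:
        self._cond.wait()
      self._outstanding += nbytes

  def release(self, nbytes):
    # Called by the gRPC sender after the message has been sent.
    with self._cond:
      self._outstanding -= nbytes
      self._cond.notify_all()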

Contributor

@tvalentyn tvalentyn left a comment


If we want to be conservative, we could use a regular queue and limit it to 10k elements if that resolves this failure mode for the customer. But it might be helpful to have this utility class.
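
That conservative fallback would be a one-line change (sketch, using the 10k figure from the comment):

import queue

to_send = queue.Queue(maxsize=10000)  # bound by element count only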

      return True
    return False

  def put(self, item, block=True, timeout=None):
Contributor

@tvalentyn tvalentyn May 8, 2026


It may be worth calling out in the docstring that we don't guarantee that the element will land as soon as enough space opens up, since https://github.com/python/cpython/blob/45c47d26c230086163ac1ef0aa9f955f794fb69c/Lib/queue.py#L214-L215 will wake up one random waiting thread, which might not be the one that can fit. This is fine as long as we are continuously emptying the queue.
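
A toy illustration of that wake-up caveat (not the PR's code): with heterogeneous weights, waking a single arbitrary waiter can leave a small item asleep while a large item repeatedly fails to fit, which is why freeing capacity should wake all waiters.

import threading

class WeightGate:
  """Toy capacity gate showing why release must use notify_all()."""
  def __init__(self, budget):
    self._cond = threading.Condition()
    self._used = 0
    self._budget = budget

  def acquire(self, size):
    with self._cond:
      while self._used + size > self._budget:
        self._cond.wait()
      self._used += size

  def release(self, size):
    with self._cond:
      self._used -= size
      # notify() wakes one arbitrary waiter; if that waiter still does
      # not fit, it re-waits and a smaller waiter that would fit stays
      # asleep. notify_all() lets every waiter re-check.
      self._cond.notify_all()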

Comment thread sdks/python/apache_beam/utils/byte_limited_queue.py
"""A queue.Queue that limits by both element count and total weight.

A single element is allowed to exceed the maxweight to avoid deadlock.
Note that shutdown is only supported after there are no more put calls.
Contributor


Is this because it is simpler, or because we don't want to shut down earlier to avoid data loss?
We could also raise NotImplementedError on shutdown if we don't want to implement this contract.

