[Python] Bound the memory used for fnapi outbound data messages and receiving messages. #38407

scwhittle wants to merge 2 commits into apache:master from
Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a capacity constraint on the data transmission queue within the gRPC data channel. By limiting the queue size, the system can better manage backpressure when the SDK produces data faster than the runner can consume it, preventing unbounded memory growth.
Code Review
This pull request introduces a fixed size for the _to_send queue in the _GrpcDataChannel class. Feedback suggests that the hardcoded maxsize=10 should be made configurable to avoid potential performance bottlenecks in high-throughput scenarios and notes a discrepancy between the PR description and the implementation of the receive queue size.
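For illustration, making that constant configurable could be as simple as threading a parameter through the channel's constructor. This is a hypothetical sketch only: `send_queue_maxsize` and the module-level default are not from the PR.

```python
import queue

_DEFAULT_SEND_QUEUE_MAXSIZE = 10  # hypothetical default matching the PR's constant


class _GrpcDataChannel:
  def __init__(self, send_queue_maxsize=_DEFAULT_SEND_QUEUE_MAXSIZE):
    # Bounded rather than unbounded, but tunable for high-throughput cases.
    self._to_send = queue.Queue(maxsize=send_queue_maxsize)
```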
Assigning reviewers: R: @tvalentyn for label python.

Note: If you would like to opt out of this review, comment with one of the available commands. The PR bot will only process comments in the main thread (not review comments).
thanks, marking as draft for now

waiting on author
Force-pushed 2a8a8d6 to e8f5bf7
Previously an unbounded queue was used for pending data outputs to be sent over the fnapi to the runner. If outputs were being generated faster than the runner was consuming them, this would lead to memory growth and possible OOMs. This PR introduces a byte-limited queue data structure that is used instead to limit the number of bytes in the queue. This was preferred to just using a queue with a maximum number of elements because the size of elements can vary greatly: for batch pipelines they are likely large, while for streaming pipelines there may be more small outputs.
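To make the idea concrete, here is a minimal self-contained sketch of such a queue. It is an illustration of the approach described above, not the PR's exact implementation; the constructor signature, the `weighing_fn=len` default, and the `64 << 20` budget are assumptions.

```python
import queue


class ByteLimitedQueue(queue.Queue):
  """Sketch of a queue bounded by element count and total byte weight."""

  def __init__(self, maxsize=0, maxweight=0, weighing_fn=len):
    super().__init__(maxsize)
    self.maxweight = maxweight
    self.weighing_fn = weighing_fn
    self._byte_size = 0  # total weight of items currently queued

  def _is_full(self, item_size):
    if self.maxsize > 0 and len(self.queue) >= self.maxsize:
      return True
    # Only enforce the weight limit when the queue is non-empty, so a
    # single oversized element can always be admitted (avoids deadlock).
    if self.maxweight > 0 and self.queue and (
        self._byte_size + item_size > self.maxweight):
      return True
    return False

  def put(self, item):
    # Simplified: only the blocking, no-timeout path is sketched here;
    # the real class would also honor block/timeout arguments.
    item_size = max(1, self.weighing_fn(item))
    with self.not_full:
      while self._is_full(item_size):
        self.not_full.wait()
      self.queue.append((item, item_size))
      self._byte_size += item_size
      self.unfinished_tasks += 1
      self.not_empty.notify()

  def _get(self):
    # Called by the inherited get() under the lock; get() then notifies
    # not_full so a blocked producer can re-check the limits.
    item, item_size = self.queue.popleft()
    self._byte_size -= item_size
    return item


# Many small streaming outputs and a few large batch outputs now share
# one byte budget instead of a one-size-fits-all element count.
q = ByteLimitedQueue(maxsize=10000, maxweight=64 << 20, weighing_fn=len)
q.put(b'serialized elements')
assert q.get() == b'serialized elements'
```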
Code Review
This pull request introduces a ByteLimitedQueue to the Apache Beam Python SDK to limit queue capacity by both element count and total byte weight, which is then utilized in the _GrpcDataChannel to manage memory usage. The review feedback highlights several critical improvements for the new queue implementation: using time.monotonic() instead of time.time() for more robust timeout handling, ensuring compatibility with Python 3.13's queue.Queue.shutdown() to prevent deadlocks, and switching from notify() to notify_all() in the _get method to avoid potential producer starvation caused by heterogeneous item weights.
```python
def put(self, item, block=True, timeout=None):
  item_size = max(1, self.weighing_fn(item))
  with self.not_full:
    if not block:
      if self._is_full(item_size):
        raise queue.Full
    elif timeout is None:
      while self._is_full(item_size):
        self.not_full.wait()
    elif timeout < 0:
      raise ValueError("'timeout' must be a non-negative number")
    else:
      endtime = time.time() + timeout
      while self._is_full(item_size):
        remaining = endtime - time.time()
        if remaining <= 0.0:
          raise queue.Full
        self.not_full.wait(remaining)
    self._put((item, item_size))
    self._byte_size += item_size
    self.unfinished_tasks += 1
    self.not_empty.notify()
```
The put method has two issues:

- Robustness: It uses `time.time()` for timeouts. It is recommended to use `time.monotonic()` to avoid issues if the system clock is adjusted during the wait.
- Python 3.13 compatibility: Python 3.13 introduced `queue.Queue.shutdown()`. Since this method is completely overridden, it should check for the shutdown state to avoid deadlocks during shutdown (as `shutdown()` notifies all waiters, but they will re-enter the `while` loop if the queue is still full).
```python
def put(self, item, block=True, timeout=None):
  item_size = max(1, self.weighing_fn(item))
  with self.not_full:
    # Python 3.13+ queue.Queue sets an is_shutdown flag; honor it so
    # waiters raise queue.ShutDown instead of deadlocking.
    if getattr(self, 'is_shutdown', False):
      raise queue.ShutDown
    if not block:
      if self._is_full(item_size):
        raise queue.Full
    elif timeout is None:
      while self._is_full(item_size):
        if getattr(self, 'is_shutdown', False):
          raise queue.ShutDown
        self.not_full.wait()
    elif timeout < 0:
      raise ValueError("'timeout' must be a non-negative number")
    else:
      endtime = time.monotonic() + timeout
      while self._is_full(item_size):
        if getattr(self, 'is_shutdown', False):
          raise queue.ShutDown
        remaining = endtime - time.monotonic()
        if remaining <= 0.0:
          raise queue.Full
        self.not_full.wait(remaining)
    self._put((item, item_size))
    self._byte_size += item_size
    self.unfinished_tasks += 1
    self.not_empty.notify()
```
Supporting shutdown is unneeded for the use by data_plane. I just added a comment for now that shutdown does not work if called while there may be more put calls.
The failures I looked through were unrelated. Are the tests flaky?
When backpressure happens, would that look to users like a processing lull? If so, that is less than ideal, since we probably should be throttling reading inputs, and we might mislead users into thinking processing is slow. Ideally the Runner would be smart enough to not send them if it cannot consume outputs fast enough... Have you considered tracking unwritten data at the Data Channel? Something like #38422 perhaps (gemini assisted). I am a bit hesitant about rewriting queue internals, but I can take a closer look if you think this approach would be better.
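For reference, the general shape of that alternative might be a counter of unwritten bytes guarded by a condition variable. This is a generic sketch of the idea only; `OutstandingByteTracker` is hypothetical and not necessarily what #38422 implements.

```python
import threading


class OutstandingByteTracker:
  """Hypothetical helper: block producers while too many bytes are unwritten."""

  def __init__(self, max_outstanding_bytes):
    self._max = max_outstanding_bytes
    self._outstanding = 0
    self._cond = threading.Condition()

  def acquire(self, nbytes):
    # Called before handing data to the gRPC stream.
    with self._cond:
      while self._outstanding >= self._max:
        self._cond.wait()
      self._outstanding += nbytes

  def release(self, nbytes):
    # Called once the bytes have actually been written to the stream.
    with self._cond:
      self._outstanding -= nbytes
      self._cond.notify_all()
```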
tvalentyn left a comment:
If we want to be conservative, we could use a regular queue and limit by 10k elements if that resolves this failure mode for the customer. But it might be helpful to have this utility class.
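The conservative option is essentially a one-liner, with the trade-off that a tiny element and a huge one count the same (the 10k figure is from the comment above):

```python
import queue

# Bounded by element count only; producers block after 10k pending items.
_to_send = queue.Queue(maxsize=10000)
```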
```python
    return True
  return False

def put(self, item, block=True, timeout=None):
```
It may be worth calling out in the docstring that we don't guarantee that the element will land as soon as enough space opens up, since https://github.com/python/cpython/blob/45c47d26c230086163ac1ef0aa9f955f794fb69c/Lib/queue.py#L214-L215 will wake up one random thread that is waiting, which might not be the one that can fit. This is fine as long as we are continuously emptying the queue.
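Per the review summary earlier in the thread, one mitigation is to wake every waiting producer when space frees up, so each can re-check the limit for its own item size. A sketch of what that might look like in the `_get` hook, assuming items are stored as `(item, item_size)` pairs as in the `put` shown above:

```python
def _get(self):
  item, item_size = self.queue.popleft()
  self._byte_size -= item_size
  # notify_all() rather than notify(): with heterogeneous weights, the one
  # producer woken by notify() might still not fit while a smaller waiting
  # item would; waking everyone lets each waiter re-check _is_full().
  self.not_full.notify_all()
  return item
```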
| """A queue.Queue that limits by both element count and total weight. | ||
|
|
||
| A single element is allowed to exceed the maxweight to avoid deadlock. | ||
| Note that shutdown is only supported after there are no more put calls. |
Is this because it is simpler, or because we don't want to shut down earlier to avoid data loss? We could also raise a NotImplementedError on shutdown if we don't want to implement this contract.
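The stricter contract suggested here could look like the following (a sketch; `shutdown()` only exists on Python 3.13+, and the message text is illustrative):

```python
def shutdown(self, immediate=False):
  # Explicitly refuse the Python 3.13 shutdown contract instead of
  # half-supporting it: put() never observes the shutdown flag.
  raise NotImplementedError(
      'ByteLimitedQueue does not support shutdown() while puts may occur.')
```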