Skip to content

Commit cf2d756

Browse files
Jibolaclaude
andcommitted
PYTHON-5767 Implement DRIVERS-3427 phase 1 rollout changes
- Change MAX_RETRIES default from 5 to 2 - Add configurable maxAdaptiveRetries client option (default: 2) - Add enableOverloadRetargeting client option (default: false) - Add retry metadata ("retry": N) to outgoing command bodies on retry - Remove _retry_overload decorator from collection/database methods Co-Authored-By: Claude Code <noreply@anthropic.com>
1 parent 7acc87a commit cf2d756

12 files changed

Lines changed: 70 additions & 26 deletions

File tree

pymongo/asynchronous/collection.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
AsyncCursor,
5858
AsyncRawBatchCursor,
5959
)
60-
from pymongo.asynchronous.helpers import _retry_overload
6160
from pymongo.collation import validate_collation_or_none
6261
from pymongo.common import _ecoc_coll_name, _esc_coll_name
6362
from pymongo.errors import (
@@ -2228,7 +2227,6 @@ async def create_indexes(
22282227
return await self._create_indexes(indexes, session, **kwargs)
22292228

22302229
@_csot.apply
2231-
@_retry_overload
22322230
async def _create_indexes(
22332231
self, indexes: Sequence[IndexModel], session: Optional[AsyncClientSession], **kwargs: Any
22342232
) -> list[str]:
@@ -2480,7 +2478,6 @@ async def drop_index(
24802478
await self._drop_index(index_or_name, session, comment, **kwargs)
24812479

24822480
@_csot.apply
2483-
@_retry_overload
24842481
async def _drop_index(
24852482
self,
24862483
index_or_name: _IndexKeyHint,
@@ -3104,7 +3101,6 @@ async def aggregate_raw_batches(
31043101
)
31053102

31063103
@_csot.apply
3107-
@_retry_overload
31083104
async def rename(
31093105
self,
31103106
new_name: str,

pymongo/asynchronous/database.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
from pymongo.asynchronous.change_stream import AsyncDatabaseChangeStream
3939
from pymongo.asynchronous.collection import AsyncCollection
4040
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
41-
from pymongo.asynchronous.helpers import _retry_overload
4241
from pymongo.common import _ecoc_coll_name, _esc_coll_name
4342
from pymongo.database_shared import _check_name, _CodecDocumentType
4443
from pymongo.errors import CollectionInvalid, InvalidOperation
@@ -479,7 +478,6 @@ async def watch(
479478
return change_stream
480479

481480
@_csot.apply
482-
@_retry_overload
483481
async def create_collection(
484482
self,
485483
name: str,
@@ -822,7 +820,6 @@ async def command(
822820
...
823821

824822
@_csot.apply
825-
@_retry_overload
826823
async def command(
827824
self,
828825
command: Union[str, MutableMapping[str, Any]],
@@ -959,7 +956,6 @@ async def inner(
959956
)
960957

961958
@_csot.apply
962-
@_retry_overload
963959
async def cursor_command(
964960
self,
965961
command: Union[str, MutableMapping[str, Any]],
@@ -1283,7 +1279,6 @@ async def inner(
12831279
return await self.client._retryable_write(False, inner, session, _Op.DROP)
12841280

12851281
@_csot.apply
1286-
@_retry_overload
12871282
async def drop_collection(
12881283
self,
12891284
name_or_collection: Union[str, AsyncCollection[_DocumentTypeArg]],

pymongo/asynchronous/helpers.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import socket
2323
import sys
2424
import time as time # noqa: PLC0414 # needed in sync version
25+
from contextvars import ContextVar
2526
from typing import (
2627
Any,
2728
Callable,
@@ -76,9 +77,13 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
7677
return cast(F, inner)
7778

7879

79-
_MAX_RETRIES = 5
80+
_MAX_RETRIES = 2
8081
_BACKOFF_INITIAL = 0.1
8182
_BACKOFF_MAX = 10
83+
84+
# Context variable used to pass the current retry attempt number to conn.command()
85+
# so that retry metadata can be injected into outgoing command bodies.
86+
_RETRY_ATTEMPT: ContextVar[int] = ContextVar("_retry_attempt", default=0)
8287
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
8388
DEFAULT_RETRY_TOKEN_RETURN = 0.1
8489

pymongo/asynchronous/mongo_client.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
from pymongo.asynchronous.client_session import _SESSION, _EmptyServerSession
7070
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
7171
from pymongo.asynchronous.helpers import (
72+
_RETRY_ATTEMPT,
7273
_RetryPolicy,
7374
_TokenBucket,
7475
)
@@ -895,7 +896,9 @@ def __init__(
895896
)
896897

897898
self._retry_policy = _RetryPolicy(
898-
_TokenBucket(), adaptive_retry=self._options.adaptive_retries
899+
_TokenBucket(),
900+
attempts=self._options.max_adaptive_retries,
901+
adaptive_retry=self._options.adaptive_retries,
899902
)
900903

901904
self._init_based_on_options(self._seeds, srv_max_hosts, srv_service_name)
@@ -2820,6 +2823,7 @@ async def run(self) -> T:
28202823

28212824
while True:
28222825
self._check_last_error(check_csot=True)
2826+
retry_token = _RETRY_ATTEMPT.set(self._attempt_number)
28232827
try:
28242828
res = await self._read() if self._is_read else await self._write()
28252829
await self._retry_policy.record_success(self._attempt_number > 0)
@@ -2930,7 +2934,7 @@ async def run(self) -> T:
29302934
transaction.set_starting()
29312935
transaction.attempt = 0
29322936

2933-
if (
2937+
if self._client.options.enable_overload_retargeting and (
29342938
self._server is not None
29352939
and self._client.topology_description.topology_type_name == "Sharded"
29362940
or exc.has_error_label("SystemOverloadedError")
@@ -2946,6 +2950,8 @@ async def run(self) -> T:
29462950
else:
29472951
raise
29482952
await asyncio.sleep(delay)
2953+
finally:
2954+
_RETRY_ATTEMPT.reset(retry_token)
29492955

29502956
def _is_not_eligible_for_retry(self) -> bool:
29512957
"""Checks if the exchange is not eligible for retry"""

pymongo/asynchronous/pool.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from bson import DEFAULT_CODEC_OPTIONS
4040
from pymongo import _csot, helpers_shared
4141
from pymongo.asynchronous.client_session import _validate_session_write_concern
42-
from pymongo.asynchronous.helpers import _handle_reauth
42+
from pymongo.asynchronous.helpers import _RETRY_ATTEMPT, _handle_reauth
4343
from pymongo.asynchronous.network import command
4444
from pymongo.common import (
4545
MAX_BSON_SIZE,
@@ -395,6 +395,9 @@ async def command(
395395
if session:
396396
session._apply_to(spec, retryable_write, read_preference, self)
397397
self.send_cluster_time(spec, session, client)
398+
retry_attempt = _RETRY_ATTEMPT.get()
399+
if retry_attempt > 0:
400+
spec["retry"] = retry_attempt
398401
listeners = self.listeners if publish_events else None
399402
unacknowledged = bool(write_concern and not write_concern.acknowledged)
400403
if self.op_msg_enabled:

pymongo/client_options.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,16 @@ def __init__(
240240
if "adaptive_retries" in options
241241
else options.get("adaptiveretries", common.ADAPTIVE_RETRIES)
242242
)
243+
self.__max_adaptive_retries = (
244+
options.get("max_adaptive_retries", common.MAX_ADAPTIVE_RETRIES)
245+
if "max_adaptive_retries" in options
246+
else options.get("maxadaptiveretries", common.MAX_ADAPTIVE_RETRIES)
247+
)
248+
self.__enable_overload_retargeting = (
249+
options.get("enable_overload_retargeting", common.ENABLE_OVERLOAD_RETARGETING)
250+
if "enable_overload_retargeting" in options
251+
else options.get("enableoverloadretargeting", common.ENABLE_OVERLOAD_RETARGETING)
252+
)
243253

244254
@property
245255
def _options(self) -> Mapping[str, Any]:
@@ -359,3 +369,19 @@ def adaptive_retries(self) -> bool:
359369
.. versionadded:: 4.XX
360370
"""
361371
return self.__adaptive_retries
372+
373+
@property
374+
def max_adaptive_retries(self) -> int:
375+
"""The configured maxAdaptiveRetries option.
376+
377+
.. versionadded:: 4.XX
378+
"""
379+
return self.__max_adaptive_retries
380+
381+
@property
382+
def enable_overload_retargeting(self) -> bool:
383+
"""The configured enableOverloadRetargeting option.
384+
385+
.. versionadded:: 4.XX
386+
"""
387+
return self.__enable_overload_retargeting

pymongo/common.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@
143143
# Default value for adaptiveRetries
144144
ADAPTIVE_RETRIES = False
145145

146+
# Default value for maxAdaptiveRetries
147+
MAX_ADAPTIVE_RETRIES = 2
148+
149+
# Default value for enableOverloadRetargeting
150+
ENABLE_OVERLOAD_RETARGETING = False
151+
146152
# Auth mechanism properties that must raise an error instead of warning if they invalidate.
147153
_MECH_PROP_MUST_RAISE = ["CANONICALIZE_HOST_NAME"]
148154

@@ -776,6 +782,8 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
776782
"auto_encryption_opts": validate_auto_encryption_opts_or_none,
777783
"authoidcallowedhosts": validate_list,
778784
"adaptive_retries": validate_boolean_or_string,
785+
"max_adaptive_retries": validate_non_negative_integer,
786+
"enable_overload_retargeting": validate_boolean_or_string,
779787
}
780788

781789
# Dictionary where keys are any URI option name, and values are the

pymongo/synchronous/collection.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@
8888
Cursor,
8989
RawBatchCursor,
9090
)
91-
from pymongo.synchronous.helpers import _retry_overload
9291
from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline
9392
from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern, validate_boolean
9493

@@ -2225,7 +2224,6 @@ def create_indexes(
22252224
return self._create_indexes(indexes, session, **kwargs)
22262225

22272226
@_csot.apply
2228-
@_retry_overload
22292227
def _create_indexes(
22302228
self, indexes: Sequence[IndexModel], session: Optional[ClientSession], **kwargs: Any
22312229
) -> list[str]:
@@ -2475,7 +2473,6 @@ def drop_index(
24752473
self._drop_index(index_or_name, session, comment, **kwargs)
24762474

24772475
@_csot.apply
2478-
@_retry_overload
24792476
def _drop_index(
24802477
self,
24812478
index_or_name: _IndexKeyHint,
@@ -3097,7 +3094,6 @@ def aggregate_raw_batches(
30973094
)
30983095

30993096
@_csot.apply
3100-
@_retry_overload
31013097
def rename(
31023098
self,
31033099
new_name: str,

pymongo/synchronous/database.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
from pymongo.synchronous.change_stream import DatabaseChangeStream
4444
from pymongo.synchronous.collection import Collection
4545
from pymongo.synchronous.command_cursor import CommandCursor
46-
from pymongo.synchronous.helpers import _retry_overload
4746
from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline
4847

4948
if TYPE_CHECKING:
@@ -479,7 +478,6 @@ def watch(
479478
return change_stream
480479

481480
@_csot.apply
482-
@_retry_overload
483481
def create_collection(
484482
self,
485483
name: str,
@@ -822,7 +820,6 @@ def command(
822820
...
823821

824822
@_csot.apply
825-
@_retry_overload
826823
def command(
827824
self,
828825
command: Union[str, MutableMapping[str, Any]],
@@ -959,7 +956,6 @@ def inner(
959956
)
960957

961958
@_csot.apply
962-
@_retry_overload
963959
def cursor_command(
964960
self,
965961
command: Union[str, MutableMapping[str, Any]],
@@ -1280,7 +1276,6 @@ def inner(
12801276
return self.client._retryable_write(False, inner, session, _Op.DROP)
12811277

12821278
@_csot.apply
1283-
@_retry_overload
12841279
def drop_collection(
12851280
self,
12861281
name_or_collection: Union[str, Collection[_DocumentTypeArg]],

pymongo/synchronous/helpers.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import socket
2323
import sys
2424
import time as time # noqa: PLC0414 # needed in sync version
25+
from contextvars import ContextVar
2526
from typing import (
2627
Any,
2728
Callable,
@@ -76,9 +77,13 @@ def inner(*args: Any, **kwargs: Any) -> Any:
7677
return cast(F, inner)
7778

7879

79-
_MAX_RETRIES = 5
80+
_MAX_RETRIES = 2
8081
_BACKOFF_INITIAL = 0.1
8182
_BACKOFF_MAX = 10
83+
84+
# Context variable used to pass the current retry attempt number to conn.command()
85+
# so that retry metadata can be injected into outgoing command bodies.
86+
_RETRY_ATTEMPT: ContextVar[int] = ContextVar("_retry_attempt", default=0)
8287
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
8388
DEFAULT_RETRY_TOKEN_RETURN = 0.1
8489

0 commit comments

Comments
 (0)