Generic coding standards apply across all projects.
Curator Backend = FastAPI service managing CuratorBot jobs for uploading media to Wikimedia Commons. REST API + WebSocket endpoint for frontend; Celery workers handle background upload tasks.
Tech Stack: Python 3.13, FastAPI, SQLModel, Redis, Celery, MediaWikiClient (requests + mwoauth) | Deployment: Wikimedia Toolforge
poetry run web # FastAPI server (port 8000) - DO NOT RUN
poetry run worker # Celery worker - DO NOT RUN
poetry run pytest -q # Run tests
poetry run ty check # Type check
poetry run isort . # Sort imports
poetry run ruff format # Format code
poetry run ruff check # Run linter
poetry run alembic upgrade head # Apply migrationsProject-Specific Development Workflow: After backend tasks, run in order:
poetry run isort . && poetry run ruff format && poetry run ruff check && poetry run pytest -q && poetry run ty checkType-check specific files: poetry run ty check path/to/file.py
Layered: main.py (Routes) → handler*.py (Business Logic) → dal.py (Database) → models.py (SQLModel)
DB access via DAL functions using get_session() dependency.
recovery.py handles uploads stuck in queued state after Redis restart. Uses sentinel key (curator:started) — missing means Redis restarted. Queued uploads grouped by (userid, edit_group_id), OAuth token validated, then re-enqueued via enqueue_uploads(). Invalid tokens mark group uploads failed with session-expired message in single DB call. Called from main.py lifespan after Alembic migrations.
- Sentinel set atomically via
redis_client.set(SENTINEL_KEY, "1", nx=True)—Nonereturn means another instance claimed recovery, return immediately. Prevents double-enqueuing on concurrent startups.
Retry lets users + admins retry failed uploads. Creates new UploadRequest copies in new batch rather than modifying originals. Preserves original failed uploads for history/audit.
dal.reset_failed_uploads_to_new_batch()— user retrydal.retry_selected_uploads_to_new_batch()— admin retry- After enqueueing,
update_celery_task_id()called to enable cancellation admin_retry_uploadsinadmin.pycallsprocess_upload.apply_async()directly, bypassingenqueue_uploads()+ rate limiting — admin retries not rate limited
Redis = Celery broker + result backend. Redis restart destroys in-flight tasks — DB retains status="queued" records. Startup recovery (recovery.py) reconciles this.
tasks.py uses _requeue_or_fail(self, upload_id, worker_id, DELAYS, exc). Delay lists: STORAGE_ERROR_DELAYS = [300, 600, 900], HASH_LOCK_DELAYS = [60, 60, 60], SOURCE_CDN_DELAYS = [600]. To add new requeue type: add constant, add except NewError handler in process_one (reset to QUEUED, re-raise), add matching handler in process_upload calling _requeue_or_fail.
- Rate limits fetched from
action=query&meta=userinfo&uiprop=ratelimits|rightsviaMediaWikiClient.get_user_rate_limits()returning(ratelimits, rights) - Each upload costs 2 edit API calls (SDC apply + null edit), so edit limit halved before comparing against upload limit — more restrictive used
- Users with
noratelimitright exempt, receive_NO_RATE_LIMIT; MediaWiki returns"ratelimits": {}for exempt users (sysops, bots) — expected, not error - Redis tracks next available slot per user:
ratelimit:{userid}:next_available - Rate limit keys have no TTL — stale past-timestamp values handled by
max(0.0, next_available - current_time), TTL would incorrectly reset slot for large batches get_user_groups()inrecovery.pyused only to validate OAuth tokens on startup recovery — returned groups not used
upload_file_chunked = complete workflow (download, hash, duplicate check, upload, SDC). download_file fetches from external CDNs. Both use config.HTTP_RETRY_DELAYS but have separate retry loops — differ in error handling + response processing.
download_fileraisesSourceCdnError(notHTTPError) when all retries exhausted on 5xx — triggers task-level requeue. 4xx errors raiseHTTPErrordirectly, permanent failures.
_api_request()retries with exponential backoff (3 attempts: 0s, 1s, 3s delays)requests.exceptions.RequestException,badtokenCSRF errors, + OAuth "Nonce already used" errors trigger retries; other exceptions propagate immediately- API-level error responses (e.g.
{"error": {"code": "..."}}) returned as-is —_api_requestdoes not raise on them; callers must check for expected keys (e.g."query") before indexing. Helpers called from inside_api_request(e.g.get_csrf_token) must raiserequests.exceptions.RequestExceptionon missing keys —KeyErrorpropagates past the retry loop. _client= underlyingrequests.Session— close explicitly withclient._client.close()infinallyblock for short-lived clients- Category name normalization: MediaWiki treats
_+as equivalent. Category names from API/frontend often have underscores, wikitext always uses spaces. Normalize withsource.replace("_", " ")before regex-matching wikitext. - Fetching page wikitext:
action=query&prop=revisions&rvprop=content&rvslots=main&titles=<title>— content atpages[id].revisions[0].slots.main["*"] upload_file()acceptsfile_path: str(notfile_content: bytes) for memory efficiency- Chunked upload flow: chunks uploaded with
stash=1, then final commit publishes file - Per-chunk retryable errors: substring match on
error.codeforUploadStashFileException,uploadstash-exception,UploadChunkFileException,JobQueueError,internal_api_error_(prefix); alsobackend-fail-internalinerror.info— retry up to 4 attempts with 3/5/10s delays - Final commit retryable errors:
backend-fail-internal,JobQueueError,internal_api_error_(prefix) — same 4-attempt / 3/5/10s logic. Final commit has own separate retry loop inupload_file(). uploadstash-file-not-found,uploadstash-bad-path,stashfailed(with "No chunked upload session with this key") NOT retried inside_upload_chunk— stash gone, retrying useless. Surface asUploadResult(success=False);_upload_with_retryiningest.pyrestarts full upload from scratch.MAX_UPLOADSTASH_TRIEScontrols restart count. All three detected by_is_uploadstash_gone_error.stashfailed: Chunked upload is already completedhandled inupload_file()— when chunk retry gets this + validfile_keyexists, break chunk loop and proceed to final commit- Chunk
UploadResult.errorformat: alwaysf"{error_code}: {error_info}"— omitting error code means_is_uploadstash_gone_errorchecks silently fail to match - Duplicate detection:
warnings.duplicate+warnings.existshandled during stash phase.warnings.existsfetches remote SHA1, raisesDuplicateUploadErroronly if hashes match; SHA1 differs →UploadResult(success=False)with"File already exists with different content".warnings.nochangeon final commit raisesDuplicateUploadError. - SDC by title: use
sites=commonswiki&titles=File:Example.jpgnotids=M12345 - With
sites/titlesin wbgetentities, entity keyed by entity ID — extract withnext(iter(entities)) - Entity ID "-1" = non-existent file (raise); positive ID with "missing" key = file exists but no SDC (return None)
upload_file_chunkedacquires Redis hash lock after duplicate check; released intry/finally- All result processing in
upload_file()lives insideif file_key:block
- Auto-generated AsyncAPI models use kebab-case aliases (e.g.,
entity-type,numeric-id) - DAL's
_fix_sdc_keys()recursively maps snake_case to kebab-case for DB storage
db/commons_engine.py = read-only connection to Wikimedia Commons replica DB. Uses TOOL_TOOLSDB_USER/TOOL_TOOLSDB_PASSWORD credentials, connects to commonswiki.analytics.db.svc.wikimedia.cloud / commonswiki_p. Falls back to COMMONS_DB_URL env var for local dev.
Local dev: When TOOL_TOOLSDB_USER absent, auto-starts SSH tunnel through login.toolforge.org. Credentials read from bastion via ssh login.toolforge.org cat ~/replica.my.cnf. Tunnel uses start_new_session=True so it survives poetry file-watch restarts.
Replica schema: cl_to, pl_title, tl_title removed in MW 1.43–1.45, replaced with cl_target_id/pl_target_id/tl_target_id → linktarget(lt_id, lt_namespace, lt_title).
Commons replica query patterns (benchmarked against live replica):
NOT EXISTScatastrophically slow on large tables likecategorylinks(~4.5 min for 100 rows). UseLEFT JOIN ... WHERE p_target.page_id IS NULLinstead.EXPLAINunavailable on replica — benchmark withtime sql commonswiki.categorylinkshas nocl_to— join viacl_target_id→linktarget.DISTINCTwithLIMITdoes not allow early termination. Avoid on high-cardinality scans.
category table for wanted-categories counts:
category table = pre-computed per-category stats. Use cat_pages - cat_subcats - cat_files for regular pages. Query:
SELECT c.cat_title, c.cat_subcats, c.cat_files, (c.cat_pages - c.cat_subcats - c.cat_files) AS pages, c.cat_pages AS total FROM category c LEFT JOIN page p ON p.page_namespace = 14 AND p.page_title = c.cat_title WHERE p.page_id IS NULL ORDER BY c.cat_pages DESC LIMIT 100
Performance:
querypage=WantedCategoriesAPI disabled on Commons.- No index on
cat_pages—ORDER BY cat_pages DESCrequires full table sort (~7–16s). - Missing categories NOT correlated with largest categories — no indexed shortcut.
- Accepted approach: run ~7s query directly (admin-only, infrequent). Redis caching with Celery periodic task = known improvement path.
DuckDB text filter: Append lower(title) LIKE '%{filter_text.lower()}%' to conditions list. Use filter_text param name (not filter — shadows built-in).
test_wanted_categories_cache.py: Cache tests import module functions inside with patch("curator.db.wanted_categories_cache._get_duck_conn", ...) blocks. New tests must follow same inline-import pattern.
DuckDB concurrency: _duck_conn opens in write mode. threading.Lock (_duck_lock) serialises all access — DuckDB not thread-safe. Tests mock _get_duck_conn (not duckdb.connect) for both read + write.
Title format in wanted categories: DuckDB stores cat_title with underscores. Backend sends as-is. Display transformation in Vue template only. Exception: CATEGORY_CREATED_RESPONSE title from MediaWiki API uses spaces — normalize to underscores before sending (created_title.replace(" ", "_")).
Search indexed columns only — unindexed columns (especially JSONB fields) very slow on large datasets. Check model definitions for index=True.
- TypedDict for query return types — define private
TypedDict(prefix_) in DAL file when query returns mixed key types.dict[str, int | str]too broad. ImageHandlerenum — usehandler.valuewhen passing tostr-typed params (str(handler)returns'ImageHandler.MAPILLARY')- Pydantic objects — use
model_dump(mode="json")to convert to dict for DB storage - Optional asyncapi booleans — add
or False/or Truewhen passing to non-optionalboolparams
- Background streaming tasks sending on closing WebSocket receive
AssertionError(fromwebsockets.legacy.protocol._drain_helper), notWebSocketDisconnect— detect viasocket.client_state == WebSocketState.DISCONNECTED - Two-phase upload: creation phase (slices via
UPLOAD_SLICE) → subscription phase (SUBSCRIBE_BATCH)
Backend models auto-generated from ../frontend/asyncapi.json. When updating schema:
- Update
../frontend/asyncapi.json - Run
cd ../frontend && bun generate - Update all code constructing or accessing modified models
- Run tests:
poetry run pytest -q
Design Patterns:
- Group related fields into nested objects
- Boolean flags required with defaults, not Optional
- Required boolean fields generate as
Field(default=False)in Python models - Use
$ref: "#/components/schemas/ImageHandler"(nottype: string) for enum fields — produces typedImageHandlerenum in both Python + TypeScript
When adding new server messages, update all 4 locations in asyncapi.json (alphabetical order):
components/messages/— message definitioncomponents/schemas/— schema with type, data, nonce propertieschannels/wsChannel/messages/— channel message referenceoperations/ServerMessage/messages/— server operation reference
poetry run alembic revision --autogenerate -m "description"
poetry run alembic upgrade headWhen dropping column with both FK + index, autogenerated migration drops index first — MySQL requires FK dropped first. Verify order: drop_constraint (FK) → drop_index → drop_column.
- Tests in
tests/, BDD tests intests/bdd/ - All imports at top of test files (no inline imports)
- Use
patch()fromunittest.mock, notpytest.mock.patch - No nested function definitions in tests
- Complex mock behavior: module-level helper functions (prefix
_) passed toside_effect; stateful mocks:functools.partial(helper, state_dict) - pytest timeout:
0.25s— mocktime.sleepin any test using retry logic - When mocking
process_upload.apply_async(), queue viacall[1]["queue"], args viacall[1]["args"]
Use single with statement with comma-separated patches:
# Wrong
with patch("os.path.getsize", return_value=1000):
with patch("builtins.open", mock_open(read_data=b"data")):
result = func()
# Correct
with patch("os.path.getsize", return_value=1000), patch(
"builtins.open", mock_open(read_data=b"data")
):
result = func()When adding new async send methods to protocol.py: add sender.send_new_method = AsyncMock() to mock_sender fixture in tests/fixtures.py.
tests/fixtures.py autouse fixture mock_external_calls patches many external deps. Designed for BDD. If tests fail with strange errors, check if file needs adding to skip list in fixtures.py — fixture skips modules with "mediawiki", "geocoding", or "test_download" in name.
@given(..., target_fixture="fixture_name")to make fixture values available to other steps@when(..., target_fixture="result")to make return values available to@thenstepsglobalvariables for state across BDD step definitions- Handle expected exceptions in
@whensteps, return in result dict for@thenverification
mocker.patch("module.symbol") raises AttributeError if symbol doesn't exist. Use create=True:
mocker.patch("curator.core.handler.get_redlinks", return_value=[...], create=True)Patch instance methods with side_effect=lambda *args, **kwargs: {...} — avoids "got multiple values for argument" errors.
@app.task(bind=True)withautoretry_forstores pre-autoretry function astask._orig_run- Call with mock
self:process_upload._orig_run.__func__(mock_self, upload_id, edit_group_id) - When using
MagicMock()as taskself, set integer attributes explicitly:mock_self.max_retries = 3
Inspect _with_options on captured query: option_keys = [opt.path.path[1].key for opt in query._with_options]
Passing Model.relationship to selectinload causes ty error. Use class_mapper instead: class_mapper(Model).relationships["name"].class_attribute.
- Phabricator tasks linked by full URL:
https://phabricator.wikimedia.org/T123456 - Line-by-line review comments:
gh api repos/{owner}/{repo}/pulls/{number}/comments
- Functions always returning or raising: use
raise AssertionError("Unreachable")to satisfy type checker - AsyncAPI models auto-generated from
frontend/asyncapi.json— do not edit directly
commons.pyimports frommediawiki_client.py; never reverse- Shared exceptions go in
errors.py(no deps on other app modules)
list[str] | None = NonewithoutQuery()always resolves toNone- Use
Query(default=None):status: list[str] | None = Query(default=None) - Tests calling endpoint functions directly bypass HTTP parsing — use
TestClientto verify
session.exec(select(col(Table.column))).all()returnslist[value], notlist[Row]- Use
session.exec()for all queries;session.execute()deprecated in SQLModel sa_select(...)doesn't match SQLModel'ssession.exec()overloads — use# type: ignoreon those lines- When DAL calls
session.exec()multiple times, tests must usemock_session.exec.side_effect = [result1, result2]
# type: ignore[error-code]does NOT suppress errors — ty only honors bare# type: ignore