From 6f879c02769e3e2fd9438ead9f3ede01a0da30be Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Tue, 14 Apr 2026 11:30:35 -0600 Subject: [PATCH 1/9] enable metrics for harness worker --- harness/README.md | 24 ++++++++++++++++++++++-- harness/main.py | 6 ++++++ harness/manifests/configmap-gcp.yaml | 4 ++-- harness/manifests/deployment.yaml | 5 +++++ 4 files changed, 35 insertions(+), 4 deletions(-) diff --git a/harness/README.md b/harness/README.md index 9cab07da..01346f4b 100644 --- a/harness/README.md +++ b/harness/README.md @@ -48,12 +48,32 @@ docker run -d \ python-sdk-harness ``` -You can also run the harness locally without Docker (from the repo root): +You can also run the harness locally without Docker (from the repo root). +A virtual environment keeps dependencies isolated from your system Python: ```bash -# Install the SDK in development mode (one-time) +# Check if a .venv already exists +ls .venv/bin/activate 2>/dev/null && echo "venv exists" || echo "no venv found" + +# Create one if needed (one-time) +python3 -m venv .venv + +# Activate it (required every time you open a new terminal) +source .venv/bin/activate # Windows: .venv\Scripts\activate + +# Verify you're in the venv (should print the .venv path) +which python3 + +# Install the SDK in development mode (one-time, or after pulling new deps) pip3 install -e . +# When you're done, deactivate the venv to restore your normal shell +deactivate +``` + +Once the venv is active and the SDK is installed: + +```bash export CONDUCTOR_SERVER_URL=https://your-cluster.example.com/api export CONDUCTOR_AUTH_KEY=$CONDUCTOR_AUTH_KEY export CONDUCTOR_AUTH_SECRET=$CONDUCTOR_AUTH_SECRET diff --git a/harness/main.py b/harness/main.py index 123655ec..28a92b3d 100644 --- a/harness/main.py +++ b/harness/main.py @@ -6,6 +6,7 @@ from conductor.client.automator.task_handler import TaskHandler from conductor.client.configuration.configuration import Configuration +from conductor.client.configuration.settings.metrics_settings import MetricsSettings from conductor.client.http.models.task_def import TaskDef from conductor.client.orkes_clients import OrkesClients from conductor.client.workflow.conductor_workflow import ConductorWorkflow @@ -79,10 +80,15 @@ def main() -> None: worker = SimulatedTaskWorker(task_name, codename, sleep_seconds, batch_size, poll_interval_ms) workers.append(worker) + metrics_port = env_int_or_default("HARNESS_METRICS_PORT", 9991) + metrics_settings = MetricsSettings(http_port=metrics_port) + print(f"Prometheus metrics will be served on port {metrics_port}") + task_handler = TaskHandler( workers=workers, configuration=configuration, scan_for_annotated_workers=False, + metrics_settings=metrics_settings, ) workflow_executor = clients.get_workflow_executor() diff --git a/harness/manifests/configmap-gcp.yaml b/harness/manifests/configmap-gcp.yaml index 949b38b3..fd87f0ad 100644 --- a/harness/manifests/configmap-gcp.yaml +++ b/harness/manifests/configmap-gcp.yaml @@ -9,5 +9,5 @@ metadata: labels: app: python-sdk-harness-worker data: - CONDUCTOR_SERVER_URL: "https://certification-gcp.orkesconductor.com/api" - CONDUCTOR_AUTH_KEY: "e6c1ac61-286b-11f1-be01-c682b5750c3a" + CONDUCTOR_SERVER_URL: "https://certification-gcp.orkesconductor.io/api" + CONDUCTOR_AUTH_KEY: "25b681c1-34ec-11f1-b07a-9601c7a63373" diff --git a/harness/manifests/deployment.yaml b/harness/manifests/deployment.yaml index 39133906..c183197f 100644 --- a/harness/manifests/deployment.yaml +++ b/harness/manifests/deployment.yaml @@ -51,6 +51,11 @@ spec: - name: HARNESS_POLL_INTERVAL_MS value: "100" + ports: + - name: metrics + containerPort: 9991 + protocol: TCP + resources: requests: memory: "256Mi" From 4f4f66b6ad0947edf3947e2059d09931a1f667fa Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Thu, 16 Apr 2026 09:56:20 -0600 Subject: [PATCH 2/9] adding some self-recovery for broken client, as worker was in a loop infinitely failing to process with a dead client --- src/conductor/client/automator/task_runner.py | 96 +++++++++++- src/conductor/client/http/rest.py | 65 +++++++- tests/unit/api_client/test_rest_client.py | 139 +++++++++++++++++- tests/unit/automator/test_task_runner.py | 114 ++++++++++++++ 4 files changed, 396 insertions(+), 18 deletions(-) diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index 1541976a..2012b87f 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -79,9 +79,25 @@ def __init__( ) ) - # Auth failure backoff tracking to prevent retry storms + # Auth failure backoff tracking to prevent retry storms. + # `_auth_failures` is capped at `_max_auth_failure_exp` so that + # 2**N cannot overflow on a long-lived worker whose auth is broken. + # The resulting sleep is further clamped to `_auth_backoff_cap_seconds`. self._auth_failures = 0 self._last_auth_failure = 0 + self._auth_backoff_cap_seconds = 60 + self._max_auth_failure_exp = 6 # 2**6 = 64s, sleep clamped to cap + + # Generic poll-failure backoff. This is distinct from the empty-poll + # adaptive delay (`_consecutive_empty_polls`) and from the auth-error + # backoff above. It kicks in when batch_poll raises an exception + # (server 5xx, NGINX 502/504 under load, DNS hiccup, a closed httpx + # client that couldn't heal, etc.) so we don't hot-loop the log with + # stack traces while waiting for the server to recover. + self._poll_failures = 0 + self._last_poll_failure = 0 + self._poll_backoff_cap_seconds = 120 # max 2 minutes between retries + self._max_poll_failure_exp = 7 # 2**7 = 128s, sleep clamped to cap # Thread pool for concurrent task execution # thread_count from worker configuration controls concurrency @@ -543,15 +559,33 @@ def __batch_poll_tasks(self, count: int) -> list: logger.debug("Stop polling task for: %s", task_definition_name) return [] - # Apply exponential backoff if we have recent auth failures + # Apply exponential backoff if we have recent auth failures. if self._auth_failures > 0: now = time.time() - backoff_seconds = min(2 ** self._auth_failures, 60) + backoff_seconds = min( + 2 ** min(self._auth_failures, self._max_auth_failure_exp), + self._auth_backoff_cap_seconds, + ) time_since_last_failure = now - self._last_auth_failure if time_since_last_failure < backoff_seconds: time.sleep(0.1) return [] + # Apply exponential backoff for generic poll failures (5xx, network + # errors, closed-client runtime errors that couldn't self-heal, etc.). + # Bounded at `_poll_backoff_cap_seconds` (2 min) to avoid log floods + # without giving up on recovery. + if self._poll_failures > 0: + now = time.time() + backoff_seconds = min( + 2 ** min(self._poll_failures, self._max_poll_failure_exp), + self._poll_backoff_cap_seconds, + ) + time_since_last_failure = now - self._last_poll_failure + if time_since_last_failure < backoff_seconds: + time.sleep(0.1) + return [] + # Publish PollStarted event (metrics collector will handle via event) self.event_dispatcher.publish(PollStarted( task_type=task_definition_name, @@ -583,15 +617,20 @@ def __batch_poll_tasks(self, count: int) -> list: tasks_received=len(tasks) if tasks else 0 )) - # Success - reset auth failure counter (any successful HTTP response means auth is working) + # Success - reset both failure counters (any successful HTTP + # response means auth and connectivity are working). self._auth_failures = 0 + self._poll_failures = 0 return tasks if tasks else [] except AuthorizationException as auth_exception: self._auth_failures += 1 self._last_auth_failure = time.time() - backoff_seconds = min(2 ** self._auth_failures, 60) + backoff_seconds = min( + 2 ** min(self._auth_failures, self._max_auth_failure_exp), + self._auth_backoff_cap_seconds, + ) # Publish PollFailure event (metrics collector will handle via event) self.event_dispatcher.publish(PollFailure( @@ -619,10 +658,51 @@ def __batch_poll_tasks(self, count: int) -> list: duration_ms=(time.time() - start_time) * 1000, cause=e )) - logger.error( - "Failed to batch poll task for: %s, reason: %s", + + # Bump the poll-failure counter so the next poll waits with + # exponential backoff instead of hot-looping on a broken server + # or connection. + self._poll_failures += 1 + self._last_poll_failure = time.time() + backoff_seconds = min( + 2 ** min(self._poll_failures, self._max_poll_failure_exp), + self._poll_backoff_cap_seconds, + ) + + # Belt-and-suspenders: if the underlying httpx client got closed + # and rest.request() couldn't heal it (e.g. because the error + # arrived as a non-RuntimeError), nudge it here. The rest client + # exposes `_is_client_closed` and `_reset_connection` for this. + try: + rest_client = getattr( + getattr(self.task_client, "api_client", None), + "rest_client", + None, + ) + if rest_client is not None and getattr(rest_client, "_is_client_closed", lambda: False)(): + logger.warning( + "rest_client was closed after poll failure; resetting" + ) + rest_client._reset_connection() + except Exception: + # Healing is best-effort; never let it mask the original error. + pass + + # Log a single-line warning at a modest level to avoid drowning + # ops in tracebacks when the server is flapping. Full traceback + # goes to debug for when operators need it. + logger.warning( + "Failed to batch poll task for: %s (failure #%d). Will retry with exponential backoff (%ss). Reason: %s: %s", task_definition_name, - traceback.format_exc() + self._poll_failures, + backoff_seconds, + type(e).__name__, + e, + ) + logger.debug( + "batch poll failure traceback for %s:\n%s", + task_definition_name, + traceback.format_exc(), ) return [] diff --git a/src/conductor/client/http/rest.py b/src/conductor/client/http/rest.py index aedcbc95..1ed570a2 100644 --- a/src/conductor/client/http/rest.py +++ b/src/conductor/client/http/rest.py @@ -1,11 +1,26 @@ import io import json +import logging import os import re import httpx from six.moves.urllib.parse import urlencode +logger = logging.getLogger(__name__) + +# Substrings that indicate httpx has marked its Client as closed and is +# refusing further requests. httpx raises a plain RuntimeError in this case +# (e.g. "Cannot send a request, as the client has been closed."), so we +# detect it by message rather than exception type. +_CLOSED_CLIENT_MARKERS = ("has been closed", "is closed") + + +def _is_closed_client_error(exc: BaseException) -> bool: + """Return True if the exception indicates the httpx client is closed.""" + msg = str(exc) if exc else "" + return any(marker in msg for marker in _CLOSED_CLIENT_MARKERS) + class RESTResponse(io.IOBase): @@ -82,15 +97,34 @@ def _create_default_httpx_client(self) -> httpx.Client: http2=bool(self._http2_enabled) ) + def _is_client_closed(self) -> bool: + """Return True if the underlying httpx client is closed. + + We check the client's `is_closed` flag defensively because not every + mock or subclass exposes it. + """ + try: + return bool(getattr(self.connection, "is_closed", False)) + except Exception: + return False + def _reset_connection(self) -> None: - if not getattr(self, "_owns_connection", False): - return + """Close the current httpx client (if any) and create a fresh one. + + Previously this would silently no-op when `_owns_connection` was + False, which meant an externally-provided `httpx.Client` that got + closed could never be healed. We now take ownership of the + replacement client so self-healing also works in that path. + """ try: if getattr(self, "connection", None) is not None: self.connection.close() except Exception: pass + if getattr(self, "_http2_enabled", None) is None: + self._http2_enabled = self._is_http2_enabled() self.connection = self._create_default_httpx_client() + self._owns_connection = True def __del__(self): """Cleanup httpx client on object destruction.""" @@ -152,6 +186,16 @@ def request(self, method, url, query_params=None, headers=None, if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' + # Proactively heal a client that is already closed before we even try + # to send. This avoids unnecessary tracebacks from httpx and keeps the + # worker self-healing when the parent process (or a fork-related + # cleanup) has closed our underlying client out from under us. + if self._is_client_closed(): + logger.warning( + "httpx client was closed before request; re-establishing a fresh client" + ) + self._reset_connection() + for attempt in range(2): try: # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` @@ -190,12 +234,27 @@ def request(self, method, url, query_params=None, headers=None, # A stale/broken keep-alive connection can cause protocol errors (esp. with HTTP/2). # Reset the client to recover without requiring process restart. # Only auto-retry idempotent methods to avoid duplicating side effects. - if attempt == 0 and self._owns_connection: + if attempt == 0: + logger.warning("httpx protocol error; re-establishing client: %s", e) self._reset_connection() if method in idempotent_methods: continue msg = f"Protocol error: {e}" raise ApiException(status=0, reason=msg) + except RuntimeError as e: + # httpx raises a plain RuntimeError ("Cannot send a request, as the + # client has been closed.") once its Client is closed. This can + # happen after a fork, an errant cleanup, or a GC of a sibling + # RESTClientObject. Heal and retry once for idempotent methods. + if attempt == 0 and _is_closed_client_error(e): + logger.warning( + "httpx client was closed mid-request; re-establishing and retrying" + ) + self._reset_connection() + if method in idempotent_methods: + continue + msg = f"Runtime error: {e}" + raise ApiException(status=0, reason=msg) except httpx.TimeoutException as e: msg = f"Request timeout: {e}" raise ApiException(status=0, reason=msg) diff --git a/tests/unit/api_client/test_rest_client.py b/tests/unit/api_client/test_rest_client.py index 3b205287..ee95466a 100644 --- a/tests/unit/api_client/test_rest_client.py +++ b/tests/unit/api_client/test_rest_client.py @@ -6,6 +6,15 @@ from conductor.client.http import rest +def _ok_response(): + response = MagicMock() + response.status_code = 200 + response.reason_phrase = "OK" + response.headers = {} + response.text = "" + return response + + class TestRESTClientObject(unittest.TestCase): @patch.object(rest.RESTClientObject, "_create_default_httpx_client") def test_resets_and_retries_on_remote_protocol_error(self, mock_create_client): @@ -14,13 +23,7 @@ def test_resets_and_retries_on_remote_protocol_error(self, mock_create_client): mock_create_client.side_effect = [first_client, second_client] first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated") - - response = MagicMock() - response.status_code = 200 - response.reason_phrase = "OK" - response.headers = {} - response.text = "" - second_client.request.return_value = response + second_client.request.return_value = _ok_response() client = rest.RESTClientObject(connection=None) result = client.request("GET", "http://example", query_params={"a": "b"}) @@ -29,3 +32,125 @@ def test_resets_and_retries_on_remote_protocol_error(self, mock_create_client): self.assertTrue(first_client.close.called) self.assertEqual(result.status, 200) + def test_is_closed_client_error_recognises_httpx_messages(self): + self.assertTrue( + rest._is_closed_client_error( + RuntimeError("Cannot send a request, as the client has been closed.") + ) + ) + self.assertTrue( + rest._is_closed_client_error(RuntimeError("The transport is closed")) + ) + self.assertFalse(rest._is_closed_client_error(RuntimeError("something else"))) + self.assertFalse(rest._is_closed_client_error(None)) + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_heals_when_client_already_closed_before_request(self, mock_create_client): + """If `is_closed` is already True we should reset before sending.""" + first_client = MagicMock() + first_client.is_closed = True # pretend it was closed mid-session + second_client = MagicMock() + second_client.is_closed = False + second_client.request.return_value = _ok_response() + mock_create_client.side_effect = [first_client, second_client] + + client = rest.RESTClientObject(connection=None) + result = client.request("GET", "http://example") + + # The closed client must have been replaced and never used to send. + self.assertFalse(first_client.request.called) + self.assertTrue(first_client.close.called) + self.assertTrue(second_client.request.called) + self.assertEqual(result.status, 200) + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_heals_on_runtime_error_closed_client_and_retries_get(self, mock_create_client): + """The new RuntimeError branch should reset and retry idempotent calls.""" + first_client = MagicMock() + first_client.is_closed = False + first_client.request.side_effect = RuntimeError( + "Cannot send a request, as the client has been closed." + ) + second_client = MagicMock() + second_client.is_closed = False + second_client.request.return_value = _ok_response() + mock_create_client.side_effect = [first_client, second_client] + + client = rest.RESTClientObject(connection=None) + result = client.request("GET", "http://example") + + self.assertEqual(first_client.request.call_count, 1) + self.assertTrue(first_client.close.called) + self.assertEqual(second_client.request.call_count, 1) + self.assertEqual(result.status, 200) + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_non_idempotent_post_does_not_auto_retry_after_close(self, mock_create_client): + """POST is not idempotent; we must reset the client but surface the error.""" + first_client = MagicMock() + first_client.is_closed = False + first_client.request.side_effect = RuntimeError( + "Cannot send a request, as the client has been closed." + ) + second_client = MagicMock() + second_client.is_closed = False + mock_create_client.side_effect = [first_client, second_client] + + client = rest.RESTClientObject(connection=None) + + with self.assertRaises(rest.ApiException) as ctx: + client.request("POST", "http://example", body={"x": 1}) + + # First attempt sent, reset happened, but no retry for POST. + self.assertEqual(first_client.request.call_count, 1) + self.assertTrue(first_client.close.called) + self.assertEqual(second_client.request.call_count, 0) + self.assertIn("Runtime error", str(ctx.exception)) + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_unrelated_runtime_error_is_not_retried(self, mock_create_client): + """RuntimeErrors that aren't about a closed client must not trigger heal-retry.""" + first_client = MagicMock() + first_client.is_closed = False + first_client.request.side_effect = RuntimeError("something totally unrelated") + second_client = MagicMock() + mock_create_client.side_effect = [first_client, second_client] + + client = rest.RESTClientObject(connection=None) + + with self.assertRaises(rest.ApiException): + client.request("GET", "http://example") + + self.assertEqual(first_client.request.call_count, 1) + self.assertFalse(first_client.close.called) + self.assertEqual(mock_create_client.call_count, 1) # no replacement created + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_reset_connection_heals_externally_provided_connection(self, mock_create_client): + """Previously `_reset_connection` silently no-op'd for externally-provided + connections. With the fix it should close the old one and create a fresh + client that we own.""" + external = MagicMock() + external.is_closed = True + replacement = MagicMock() + replacement.is_closed = False + mock_create_client.return_value = replacement + + client = rest.RESTClientObject(connection=external) + self.assertFalse(client._owns_connection) + + client._reset_connection() + + self.assertTrue(external.close.called) + self.assertIs(client.connection, replacement) + self.assertTrue(client._owns_connection) + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_is_client_closed_defensive_on_missing_attribute(self, mock_create_client): + """Mocks or subclasses that don't expose `is_closed` should be treated as open.""" + stub_client = MagicMock(spec=[]) # no `is_closed` attr + mock_create_client.return_value = stub_client + + client = rest.RESTClientObject(connection=None) + self.assertFalse(client._is_client_closed()) + diff --git a/tests/unit/automator/test_task_runner.py b/tests/unit/automator/test_task_runner.py index c7399227..9f1d5eef 100644 --- a/tests/unit/automator/test_task_runner.py +++ b/tests/unit/automator/test_task_runner.py @@ -377,3 +377,117 @@ def test_valid_domain_passed_to_poll(self): # 'domain' SHOULD be in the kwargs with value 'production' self.assertIn('domain', call_args.kwargs) self.assertEqual(call_args.kwargs['domain'], 'production') + + # -------- Poll-failure backoff -------- + + @patch('time.sleep', Mock(return_value=None)) + def test_poll_failure_increments_counter_and_records_time(self): + """Any non-auth exception from batch_poll must bump the poll-failure + counter so the next poll backs off.""" + task_runner = self.__get_valid_task_runner() + self.assertEqual(task_runner._poll_failures, 0) + + with patch.object(TaskResourceApi, 'batch_poll', side_effect=Exception("boom")): + result = task_runner._TaskRunner__batch_poll_tasks(1) + + self.assertEqual(result, []) + self.assertEqual(task_runner._poll_failures, 1) + self.assertGreater(task_runner._last_poll_failure, 0) + + @patch('time.sleep', Mock(return_value=None)) + def test_poll_failure_backoff_skips_batch_poll_within_window(self): + """Within the backoff window we should return [] without calling batch_poll.""" + task_runner = self.__get_valid_task_runner() + task_runner._poll_failures = 3 # 2**3 = 8s window + task_runner._last_poll_failure = time.time() + + with patch.object(TaskResourceApi, 'batch_poll') as mock_batch_poll: + result = task_runner._TaskRunner__batch_poll_tasks(1) + + self.assertEqual(result, []) + mock_batch_poll.assert_not_called() + + @patch('time.sleep', Mock(return_value=None)) + def test_poll_failure_backoff_allows_retry_after_window(self): + """Once the backoff window elapses we should actually call batch_poll again.""" + task_runner = self.__get_valid_task_runner() + task_runner._poll_failures = 1 # 2**1 = 2s window + task_runner._last_poll_failure = time.time() - 10 # long past + + with patch.object(TaskResourceApi, 'batch_poll', return_value=[]) as mock_batch_poll: + task_runner._TaskRunner__batch_poll_tasks(1) + + mock_batch_poll.assert_called_once() + + def test_poll_failure_backoff_is_capped(self): + """Runaway failure counters must not produce unbounded backoff.""" + task_runner = self.__get_valid_task_runner() + # Pretend we've been failing for a long time. + task_runner._poll_failures = 10_000 + task_runner._last_poll_failure = time.time() - 10_000 + + cap = task_runner._poll_backoff_cap_seconds + exp_cap = task_runner._max_poll_failure_exp + self.assertLessEqual(2 ** min(task_runner._poll_failures, exp_cap), cap * 4) + # The sleep the code computes must never exceed the cap (2 min). + backoff = min( + 2 ** min(task_runner._poll_failures, exp_cap), + cap, + ) + self.assertLessEqual(backoff, 120) + + @patch('time.sleep', Mock(return_value=None)) + def test_successful_poll_clears_both_failure_counters(self): + """A successful response means auth AND connectivity are fine.""" + task_runner = self.__get_valid_task_runner() + task_runner._auth_failures = 3 + task_runner._poll_failures = 4 + + with patch.object(TaskResourceApi, 'batch_poll', return_value=[]): + task_runner._TaskRunner__batch_poll_tasks(1) + + self.assertEqual(task_runner._auth_failures, 0) + self.assertEqual(task_runner._poll_failures, 0) + + def test_auth_failure_backoff_is_capped(self): + """The existing auth backoff should also have a hard upper bound.""" + task_runner = self.__get_valid_task_runner() + task_runner._auth_failures = 10_000 + cap = task_runner._auth_backoff_cap_seconds + exp_cap = task_runner._max_auth_failure_exp + backoff = min( + 2 ** min(task_runner._auth_failures, exp_cap), + cap, + ) + self.assertLessEqual(backoff, cap) + self.assertLessEqual(backoff, 60) + + @patch('time.sleep', Mock(return_value=None)) + def test_poll_failure_resets_closed_rest_client(self): + """When the rest client reports it's closed at the time of a poll + failure we should nudge it back to life (belt-and-suspenders for cases + that never hit rest.request's own heal path).""" + task_runner = self.__get_valid_task_runner() + + fake_rest = Mock() + fake_rest._is_client_closed.return_value = True + task_runner.task_client.api_client.rest_client = fake_rest + + with patch.object(TaskResourceApi, 'batch_poll', side_effect=Exception("boom")): + task_runner._TaskRunner__batch_poll_tasks(1) + + fake_rest._reset_connection.assert_called_once() + + @patch('time.sleep', Mock(return_value=None)) + def test_poll_failure_does_not_reset_healthy_rest_client(self): + """If the rest client looks fine we should not churn it on every error.""" + task_runner = self.__get_valid_task_runner() + + fake_rest = Mock() + fake_rest._is_client_closed.return_value = False + task_runner.task_client.api_client.rest_client = fake_rest + + with patch.object(TaskResourceApi, 'batch_poll', side_effect=Exception("boom")): + task_runner._TaskRunner__batch_poll_tasks(1) + + fake_rest._reset_connection.assert_not_called() From 71f2e03c2e46ca734d3ce9245bb922924ba02d3c Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Thu, 16 Apr 2026 10:13:11 -0600 Subject: [PATCH 3/9] use locking to prevent a flood of errors related to resetting client connection on a dead client that is in use by multiple threads --- src/conductor/client/automator/task_runner.py | 16 +- src/conductor/client/http/rest.py | 92 ++++++--- tests/unit/api_client/test_rest_client.py | 193 ++++++++++++++++-- 3 files changed, 247 insertions(+), 54 deletions(-) diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index 2012b87f..9797da25 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -671,8 +671,10 @@ def __batch_poll_tasks(self, count: int) -> list: # Belt-and-suspenders: if the underlying httpx client got closed # and rest.request() couldn't heal it (e.g. because the error - # arrived as a non-RuntimeError), nudge it here. The rest client - # exposes `_is_client_closed` and `_reset_connection` for this. + # arrived as a non-RuntimeError), nudge it here. Pass the current + # connection as `expected` so concurrent threads racing to heal + # can't cause a reset storm: only the first caller per client + # generation actually replaces it. try: rest_client = getattr( getattr(self.task_client, "api_client", None), @@ -680,10 +682,12 @@ def __batch_poll_tasks(self, count: int) -> list: None, ) if rest_client is not None and getattr(rest_client, "_is_client_closed", lambda: False)(): - logger.warning( - "rest_client was closed after poll failure; resetting" - ) - rest_client._reset_connection() + current_conn = getattr(rest_client, "connection", None) + reset = rest_client._reset_connection(expected=current_conn) + if reset: + logger.warning( + "rest_client was closed after poll failure; reset" + ) except Exception: # Healing is best-effort; never let it mask the original error. pass diff --git a/src/conductor/client/http/rest.py b/src/conductor/client/http/rest.py index 1ed570a2..fee90edf 100644 --- a/src/conductor/client/http/rest.py +++ b/src/conductor/client/http/rest.py @@ -3,6 +3,7 @@ import logging import os import re +import threading import httpx from six.moves.urllib.parse import urlencode @@ -61,6 +62,10 @@ def getheaders(self): class RESTClientObject(object): def __init__(self, connection=None): + # Serializes self-healing resets so that a thundering herd of threads + # discovering the same broken connection produces at most ONE real + # reset + warning line, not N. + self._reset_lock = threading.Lock() if connection is None: self._http2_enabled = self._is_http2_enabled() self.connection = self._create_default_httpx_client() @@ -108,23 +113,40 @@ def _is_client_closed(self) -> bool: except Exception: return False - def _reset_connection(self) -> None: + def _reset_connection(self, expected=None) -> bool: """Close the current httpx client (if any) and create a fresh one. - Previously this would silently no-op when `_owns_connection` was - False, which meant an externally-provided `httpx.Client` that got - closed could never be healed. We now take ownership of the - replacement client so self-healing also works in that path. + This is a thread-safe compare-and-swap: + + - If `expected` is provided, the reset is only performed when the + current `self.connection` is still that exact client instance. + This is the thundering-herd guard: when thread A discovers the + shared client is broken and heals it, threads B/C/D whose + in-flight requests on the OLD client also fail will try to heal + too. With `expected=the_old_client` they'll see the connection + has already moved on and no-op instead of closing thread A's + freshly-built replacement. + - If `expected` is None, the reset always happens (legacy callers). + + Returns True if this call actually replaced the client, False if + another thread had already healed it. Callers can use the return + value to decide whether to emit a WARNING or drop to DEBUG. """ - try: - if getattr(self, "connection", None) is not None: - self.connection.close() - except Exception: - pass - if getattr(self, "_http2_enabled", None) is None: - self._http2_enabled = self._is_http2_enabled() - self.connection = self._create_default_httpx_client() - self._owns_connection = True + with self._reset_lock: + current = getattr(self, "connection", None) + if expected is not None and current is not expected: + # Someone else already healed since our caller last looked. + return False + try: + if current is not None: + current.close() + except Exception: + pass + if getattr(self, "_http2_enabled", None) is None: + self._http2_enabled = self._is_http2_enabled() + self.connection = self._create_default_httpx_client() + self._owns_connection = True + return True def __del__(self): """Cleanup httpx client on object destruction.""" @@ -190,13 +212,20 @@ def request(self, method, url, query_params=None, headers=None, # to send. This avoids unnecessary tracebacks from httpx and keeps the # worker self-healing when the parent process (or a fork-related # cleanup) has closed our underlying client out from under us. + # Use compare-and-swap with a snapshot so that concurrent proactive + # heals don't churn the client. + pre_check_client = self.connection if self._is_client_closed(): - logger.warning( - "httpx client was closed before request; re-establishing a fresh client" - ) - self._reset_connection() + if self._reset_connection(expected=pre_check_client): + logger.warning( + "httpx client was closed before request; re-established a fresh client" + ) for attempt in range(2): + # Snapshot the client we're about to use so that if the request + # fails we can ask for a reset only if the client is still this + # one (i.e. nobody else healed it in the meantime). + client_at_send = self.connection try: # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: @@ -209,7 +238,7 @@ def request(self, method, url, query_params=None, headers=None, request_body = json.dumps(body) if isinstance(body, str): request_body = request_body.strip('"') - r = self.connection.request( + r = client_at_send.request( method, request_url, content=request_body, timeout=timeout, @@ -223,7 +252,7 @@ def request(self, method, url, query_params=None, headers=None, raise ApiException(status=0, reason=msg) # For `GET`, `HEAD` else: - r = self.connection.request( + r = client_at_send.request( method, url, params=query_params, timeout=timeout, @@ -235,8 +264,14 @@ def request(self, method, url, query_params=None, headers=None, # Reset the client to recover without requiring process restart. # Only auto-retry idempotent methods to avoid duplicating side effects. if attempt == 0: - logger.warning("httpx protocol error; re-establishing client: %s", e) - self._reset_connection() + reset_done = self._reset_connection(expected=client_at_send) + if reset_done: + logger.warning("httpx protocol error; re-established client: %s", e) + else: + logger.debug( + "httpx protocol error on stale client (already healed by another thread): %s", + e, + ) if method in idempotent_methods: continue msg = f"Protocol error: {e}" @@ -247,10 +282,15 @@ def request(self, method, url, query_params=None, headers=None, # happen after a fork, an errant cleanup, or a GC of a sibling # RESTClientObject. Heal and retry once for idempotent methods. if attempt == 0 and _is_closed_client_error(e): - logger.warning( - "httpx client was closed mid-request; re-establishing and retrying" - ) - self._reset_connection() + reset_done = self._reset_connection(expected=client_at_send) + if reset_done: + logger.warning( + "httpx client was closed mid-request; re-established and retrying" + ) + else: + logger.debug( + "httpx client closed on stale reference (already healed by another thread); retrying" + ) if method in idempotent_methods: continue msg = f"Runtime error: {e}" diff --git a/tests/unit/api_client/test_rest_client.py b/tests/unit/api_client/test_rest_client.py index ee95466a..e36d7457 100644 --- a/tests/unit/api_client/test_rest_client.py +++ b/tests/unit/api_client/test_rest_client.py @@ -1,3 +1,4 @@ +import threading import unittest from unittest.mock import MagicMock, patch @@ -15,11 +16,22 @@ def _ok_response(): return response +def _mock_client(is_closed=False): + """A MagicMock httpx.Client with an explicit `is_closed` flag. + + Default MagicMock attributes are truthy, which would incorrectly trigger + the pre-check heal path. Set the flag explicitly for test clarity. + """ + c = MagicMock() + c.is_closed = is_closed + return c + + class TestRESTClientObject(unittest.TestCase): @patch.object(rest.RESTClientObject, "_create_default_httpx_client") def test_resets_and_retries_on_remote_protocol_error(self, mock_create_client): - first_client = MagicMock() - second_client = MagicMock() + first_client = _mock_client() + second_client = _mock_client() mock_create_client.side_effect = [first_client, second_client] first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated") @@ -47,10 +59,8 @@ def test_is_closed_client_error_recognises_httpx_messages(self): @patch.object(rest.RESTClientObject, "_create_default_httpx_client") def test_heals_when_client_already_closed_before_request(self, mock_create_client): """If `is_closed` is already True we should reset before sending.""" - first_client = MagicMock() - first_client.is_closed = True # pretend it was closed mid-session - second_client = MagicMock() - second_client.is_closed = False + first_client = _mock_client(is_closed=True) + second_client = _mock_client(is_closed=False) second_client.request.return_value = _ok_response() mock_create_client.side_effect = [first_client, second_client] @@ -66,13 +76,11 @@ def test_heals_when_client_already_closed_before_request(self, mock_create_clien @patch.object(rest.RESTClientObject, "_create_default_httpx_client") def test_heals_on_runtime_error_closed_client_and_retries_get(self, mock_create_client): """The new RuntimeError branch should reset and retry idempotent calls.""" - first_client = MagicMock() - first_client.is_closed = False + first_client = _mock_client(is_closed=False) first_client.request.side_effect = RuntimeError( "Cannot send a request, as the client has been closed." ) - second_client = MagicMock() - second_client.is_closed = False + second_client = _mock_client(is_closed=False) second_client.request.return_value = _ok_response() mock_create_client.side_effect = [first_client, second_client] @@ -87,13 +95,11 @@ def test_heals_on_runtime_error_closed_client_and_retries_get(self, mock_create_ @patch.object(rest.RESTClientObject, "_create_default_httpx_client") def test_non_idempotent_post_does_not_auto_retry_after_close(self, mock_create_client): """POST is not idempotent; we must reset the client but surface the error.""" - first_client = MagicMock() - first_client.is_closed = False + first_client = _mock_client(is_closed=False) first_client.request.side_effect = RuntimeError( "Cannot send a request, as the client has been closed." ) - second_client = MagicMock() - second_client.is_closed = False + second_client = _mock_client(is_closed=False) mock_create_client.side_effect = [first_client, second_client] client = rest.RESTClientObject(connection=None) @@ -110,10 +116,9 @@ def test_non_idempotent_post_does_not_auto_retry_after_close(self, mock_create_c @patch.object(rest.RESTClientObject, "_create_default_httpx_client") def test_unrelated_runtime_error_is_not_retried(self, mock_create_client): """RuntimeErrors that aren't about a closed client must not trigger heal-retry.""" - first_client = MagicMock() - first_client.is_closed = False + first_client = _mock_client(is_closed=False) first_client.request.side_effect = RuntimeError("something totally unrelated") - second_client = MagicMock() + second_client = _mock_client() mock_create_client.side_effect = [first_client, second_client] client = rest.RESTClientObject(connection=None) @@ -130,16 +135,14 @@ def test_reset_connection_heals_externally_provided_connection(self, mock_create """Previously `_reset_connection` silently no-op'd for externally-provided connections. With the fix it should close the old one and create a fresh client that we own.""" - external = MagicMock() - external.is_closed = True - replacement = MagicMock() - replacement.is_closed = False + external = _mock_client(is_closed=True) + replacement = _mock_client(is_closed=False) mock_create_client.return_value = replacement client = rest.RESTClientObject(connection=external) self.assertFalse(client._owns_connection) - client._reset_connection() + self.assertTrue(client._reset_connection()) self.assertTrue(external.close.called) self.assertIs(client.connection, replacement) @@ -154,3 +157,149 @@ def test_is_client_closed_defensive_on_missing_attribute(self, mock_create_clien client = rest.RESTClientObject(connection=None) self.assertFalse(client._is_client_closed()) + # -------- Thread-safe compare-and-swap reset -------- + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_reset_connection_cas_no_ops_when_expected_mismatches(self, mock_create_client): + """If another thread already healed, a stale caller must NOT close the + replacement client or create yet another one.""" + initial = _mock_client() + already_healed = _mock_client() + # Only one client needed from `_create_default_httpx_client` (the initial + # construction). If the CAS incorrectly fires, this list will be exhausted. + mock_create_client.side_effect = [initial] + + client = rest.RESTClientObject(connection=None) + # Simulate "some other thread already healed": swap the connection. + client.connection = already_healed + + stale_reference = initial # pretend we saw `initial` before the error + did_reset = client._reset_connection(expected=stale_reference) + + self.assertFalse(did_reset) + # The already-healed client must survive untouched. + self.assertIs(client.connection, already_healed) + self.assertFalse(already_healed.close.called) + # Only the initial client was ever constructed. + self.assertEqual(mock_create_client.call_count, 1) + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_reset_connection_cas_replaces_when_expected_matches(self, mock_create_client): + """With a matching `expected`, reset actually replaces the client.""" + initial = _mock_client() + replacement = _mock_client() + mock_create_client.side_effect = [initial, replacement] + + client = rest.RESTClientObject(connection=None) + + did_reset = client._reset_connection(expected=initial) + + self.assertTrue(did_reset) + self.assertIs(client.connection, replacement) + self.assertTrue(initial.close.called) + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_thundering_herd_reset_produces_exactly_one_replacement(self, mock_create_client): + """When N threads all see the same broken client and race to heal, we + must end up with exactly one new client and exactly one + `_reset_connection` returning True.""" + num_threads = 16 + initial = _mock_client() + replacements = [_mock_client() for _ in range(num_threads + 1)] + # +1 for the __init__ call, rest would be extras if CAS were broken. + mock_create_client.side_effect = [initial] + replacements + + client = rest.RESTClientObject(connection=None) + self.assertIs(client.connection, initial) + + start_barrier = threading.Barrier(num_threads) + results = [] + results_lock = threading.Lock() + + def heal(): + start_barrier.wait() + got = client._reset_connection(expected=initial) + with results_lock: + results.append(got) + + threads = [threading.Thread(target=heal) for _ in range(num_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Exactly one thread saw `expected == current` and actually reset. + self.assertEqual(sum(1 for r in results if r), 1, msg=f"results={results}") + self.assertEqual(sum(1 for r in results if not r), num_threads - 1) + # Exactly one replacement client was created on top of the initial one. + self.assertEqual(mock_create_client.call_count, 2) + # The initial client was closed exactly once. + self.assertEqual(initial.close.call_count, 1) + # The replacement is now `client.connection` and was never closed. + self.assertIsNot(client.connection, initial) + self.assertFalse(client.connection.close.called) + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_reset_connection_without_expected_always_resets(self, mock_create_client): + """Backwards-compat: existing callers that don't pass `expected` still work.""" + initial = _mock_client() + replacement = _mock_client() + mock_create_client.side_effect = [initial, replacement] + + client = rest.RESTClientObject(connection=None) + + self.assertTrue(client._reset_connection()) + self.assertIs(client.connection, replacement) + + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_concurrent_request_failures_do_not_cascade_close(self, mock_create_client): + """End-to-end: two threads' requests both fail on the same client; only + one reset happens and each thread retries on the fresh client. + + This is the whole reason the thundering-herd guard exists - without + CAS, thread B would close thread A's freshly-built replacement. + """ + initial = _mock_client(is_closed=False) + replacement = _mock_client(is_closed=False) + mock_create_client.side_effect = [initial, replacement] + + barrier = threading.Barrier(2) + + def shared_error(*_, **__): + # Force both threads into the error branch roughly simultaneously. + barrier.wait() + raise httpx.RemoteProtocolError("Received pseudo-header in trailer") + + initial.request.side_effect = shared_error + replacement.request.return_value = _ok_response() + + client = rest.RESTClientObject(connection=None) + + results = [] + results_lock = threading.Lock() + + def do_get(): + try: + resp = client.request("GET", "http://example") + with results_lock: + results.append(resp.status) + except Exception as e: # noqa: BLE001 + with results_lock: + results.append(e) + + threads = [threading.Thread(target=do_get) for _ in range(2)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Both threads eventually succeeded on the replacement. + self.assertEqual(results, [200, 200], msg=f"results={results}") + # Only ONE replacement was ever created (init + 1 heal). + self.assertEqual(mock_create_client.call_count, 2) + # Initial client was closed exactly once, not twice. + self.assertEqual(initial.close.call_count, 1) + # Replacement is still the live client. + self.assertIs(client.connection, replacement) + self.assertFalse(replacement.close.called) + From 6b0af663ad7376004d576b09c4ef4e87118527f6 Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Thu, 16 Apr 2026 10:23:23 -0600 Subject: [PATCH 4/9] more gracefully handle the error logging during client recovery --- src/conductor/client/automator/task_runner.py | 36 ++++++++++++++----- src/conductor/client/http/async_rest.py | 9 +++-- src/conductor/client/http/rest.py | 20 ++++++++--- 3 files changed, 49 insertions(+), 16 deletions(-) diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index 9797da25..39bfbbc1 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -975,15 +975,33 @@ def __update_task(self, task_result: TaskResult): self.metrics_collector.increment_task_update_error( task_definition_name, type(e) ) - logger.error( - "Failed to update task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s", - attempt + 1, - retry_count, - task_result.task_id, - task_result.workflow_instance_id, - task_definition_name, - traceback.format_exc() - ) + is_last_attempt = (attempt + 1) >= retry_count + # Known recoverable transport hiccups (stale keep-alive, + # HTTP/2 GOAWAY race, client closed mid-request) are flagged + # `transient=True` by the REST layer after it self-heals. For + # those, skip the stack trace until the final attempt — the + # retry normally succeeds immediately and a full traceback per + # in-flight task just spams the log. + if getattr(e, "transient", False) and not is_last_attempt: + logger.warning( + "Transient transport error updating task; will retry (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s", + attempt + 1, + retry_count, + task_result.task_id, + task_result.workflow_instance_id, + task_definition_name, + getattr(e, "reason", None) or str(e), + ) + else: + logger.error( + "Failed to update task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s", + attempt + 1, + retry_count, + task_result.task_id, + task_result.workflow_instance_id, + task_definition_name, + traceback.format_exc() + ) continue except Exception as e: last_exception = e diff --git a/src/conductor/client/http/async_rest.py b/src/conductor/client/http/async_rest.py index 9fb948ef..c8098f0e 100644 --- a/src/conductor/client/http/async_rest.py +++ b/src/conductor/client/http/async_rest.py @@ -187,8 +187,8 @@ async def request(self, method, url, query_params=None, headers=None, await self._reset_connection() if method in idempotent_methods: continue - msg = f"Protocol error: {e}" - raise ApiException(status=0, reason=msg) + msg = f"Protocol error ({type(e).__name__}): {e}" + raise ApiException(status=0, reason=msg, transient=True) from e except httpx.TimeoutException as e: msg = f"Request timeout: {e}" raise ApiException(status=0, reason=msg) @@ -278,7 +278,10 @@ async def PATCH(self, url, headers=None, query_params=None, post_params=None, class ApiException(Exception): - def __init__(self, status=None, reason=None, http_resp=None, body=None): + def __init__(self, status=None, reason=None, http_resp=None, body=None, + transient=False): + # See rest.ApiException for a description of `transient`. + self.transient = transient if http_resp: self.status = http_resp.status self.code = http_resp.status diff --git a/src/conductor/client/http/rest.py b/src/conductor/client/http/rest.py index fee90edf..66520faa 100644 --- a/src/conductor/client/http/rest.py +++ b/src/conductor/client/http/rest.py @@ -274,8 +274,8 @@ def request(self, method, url, query_params=None, headers=None, ) if method in idempotent_methods: continue - msg = f"Protocol error: {e}" - raise ApiException(status=0, reason=msg) + msg = f"Protocol error ({type(e).__name__}): {e}" + raise ApiException(status=0, reason=msg, transient=True) from e except RuntimeError as e: # httpx raises a plain RuntimeError ("Cannot send a request, as the # client has been closed.") once its Client is closed. This can @@ -293,8 +293,12 @@ def request(self, method, url, query_params=None, headers=None, ) if method in idempotent_methods: continue + # Not an idempotent method; surface as transient so the + # caller's retry loop can log it cleanly. + msg = f"Client was closed mid-request: {e}" + raise ApiException(status=0, reason=msg, transient=True) from e msg = f"Runtime error: {e}" - raise ApiException(status=0, reason=msg) + raise ApiException(status=0, reason=msg) from e except httpx.TimeoutException as e: msg = f"Request timeout: {e}" raise ApiException(status=0, reason=msg) @@ -384,7 +388,15 @@ def PATCH(self, url, headers=None, query_params=None, post_params=None, class ApiException(Exception): - def __init__(self, status=None, reason=None, http_resp=None, body=None): + def __init__(self, status=None, reason=None, http_resp=None, body=None, + transient=False): + # `transient=True` means the SDK recognised this as a recoverable + # transport-layer hiccup (stale keep-alive, HTTP/2 GOAWAY race, client + # closed by a fork cleanup, etc.) and has already self-healed the + # underlying httpx client. Callers that implement their own retry loop + # (e.g. TaskRunner.__update_task) can use this flag to log a concise + # warning instead of a full traceback until the final attempt. + self.transient = transient if http_resp: self.status = http_resp.status self.code = http_resp.status From d4a907a03751b7fcf95b94870e694829d74ad24e Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Thu, 16 Apr 2026 10:45:40 -0600 Subject: [PATCH 5/9] adjust test expectation for new text --- tests/unit/api_client/test_rest_client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/unit/api_client/test_rest_client.py b/tests/unit/api_client/test_rest_client.py index e36d7457..3aa69b48 100644 --- a/tests/unit/api_client/test_rest_client.py +++ b/tests/unit/api_client/test_rest_client.py @@ -111,7 +111,10 @@ def test_non_idempotent_post_does_not_auto_retry_after_close(self, mock_create_c self.assertEqual(first_client.request.call_count, 1) self.assertTrue(first_client.close.called) self.assertEqual(second_client.request.call_count, 0) - self.assertIn("Runtime error", str(ctx.exception)) + # Surfaced as a transient ApiException so the caller's retry loop can + # log it concisely rather than dumping a full traceback. + self.assertIn("Client was closed mid-request", str(ctx.exception)) + self.assertTrue(getattr(ctx.exception, "transient", False)) @patch.object(rest.RESTClientObject, "_create_default_httpx_client") def test_unrelated_runtime_error_is_not_retried(self, mock_create_client): From c4aa9984d5032ad441c42cac21b4d18d1290f5d6 Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Thu, 16 Apr 2026 11:03:27 -0600 Subject: [PATCH 6/9] make manual run be deployable --- .github/workflows/harness-image.yml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/harness-image.yml b/.github/workflows/harness-image.yml index 0b4a414d..0f0f5126 100644 --- a/.github/workflows/harness-image.yml +++ b/.github/workflows/harness-image.yml @@ -13,6 +13,11 @@ on: release: types: [published] workflow_dispatch: + inputs: + deploy: + description: "Dispatch downstream deploy after the image is built" + type: boolean + default: true concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -61,7 +66,9 @@ jobs: tags: ${{ steps.meta.outputs.tags }} dispatch-deploy: - if: github.event_name == 'release' + if: | + github.event_name == 'release' || + (github.event_name == 'workflow_dispatch' && inputs.deploy) needs: build-and-push runs-on: ubuntu-latest permissions: From a8975569121ae4a7f6f7ff661ae58d45623cbb99 Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Fri, 17 Apr 2026 09:07:05 -0600 Subject: [PATCH 7/9] use branch name in image, temp image for now --- .github/workflows/harness-image.yml | 16 +++++++++++++++- harness/manifests/deployment.yaml | 2 +- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/.github/workflows/harness-image.yml b/.github/workflows/harness-image.yml index 0f0f5126..682a9881 100644 --- a/.github/workflows/harness-image.yml +++ b/.github/workflows/harness-image.yml @@ -46,13 +46,21 @@ jobs: username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} + - name: Calculate branch tag + id: vars + shell: bash + run: | + BRANCH="${{ github.ref_name }}" + CLEANED_BRANCH_NAME=$(echo "$BRANCH" | tr '/' '-' | tr '[:upper:]' '[:lower:]') + echo "cleaned-branch-name=$CLEANED_BRANCH_NAME" >> "$GITHUB_OUTPUT" + - name: Docker metadata id: meta uses: docker/metadata-action@v5 with: images: ghcr.io/conductor-oss/python-sdk/harness-worker tags: | - type=raw,value=latest + type=raw,value=${{ steps.vars.outputs.cleaned-branch-name }}-latest,enable=${{ github.event_name != 'release' }} type=raw,value=${{ github.event.release.tag_name }},enable=${{ github.event_name == 'release' }} - name: Build and push @@ -64,6 +72,12 @@ jobs: platforms: linux/amd64,linux/arm64 push: true tags: ${{ steps.meta.outputs.tags }} + # Registry-backed BuildKit cache. Unchanged layers are reused across + # runs so rebuilding the same commit (or one with only minor diffs) + # is near-instant. The `:buildcache` tag lives alongside the image + # but only stores layer blobs, not a runnable image. + cache-from: type=registry,ref=ghcr.io/conductor-oss/python-sdk/harness-worker:buildcache + cache-to: type=registry,ref=ghcr.io/conductor-oss/python-sdk/harness-worker:buildcache,mode=max dispatch-deploy: if: | diff --git a/harness/manifests/deployment.yaml b/harness/manifests/deployment.yaml index c183197f..0f7f710e 100644 --- a/harness/manifests/deployment.yaml +++ b/harness/manifests/deployment.yaml @@ -18,7 +18,7 @@ spec: # note: imagePullSecrets is not needed for public images containers: - name: harness - image: ghcr.io/conductor-oss/python-sdk/harness-worker:latest + image: ghcr.io/conductor-oss/python-sdk/harness-worker:certification-worker-metrics-latest imagePullPolicy: Always env: # === CONDUCTOR CONNECTION (from per-cloud ConfigMap) === From 5aae522d63925a806de72f0f98e64f2e11aa62f5 Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Mon, 20 Apr 2026 09:18:40 -0600 Subject: [PATCH 8/9] update harness image to main --- harness/manifests/deployment.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/harness/manifests/deployment.yaml b/harness/manifests/deployment.yaml index 0f7f710e..048559fa 100644 --- a/harness/manifests/deployment.yaml +++ b/harness/manifests/deployment.yaml @@ -18,7 +18,7 @@ spec: # note: imagePullSecrets is not needed for public images containers: - name: harness - image: ghcr.io/conductor-oss/python-sdk/harness-worker:certification-worker-metrics-latest + image: ghcr.io/conductor-oss/python-sdk/harness-worker:main-latest imagePullPolicy: Always env: # === CONDUCTOR CONNECTION (from per-cloud ConfigMap) === From 16c4f84fb77581c954e84e8044800dd90b82e315 Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Mon, 20 Apr 2026 10:01:50 -0600 Subject: [PATCH 9/9] stop allowing test failures to continue-on-error in gh actions, wip/testing --- .github/workflows/pull_request.yml | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 0bf3802e..2bb0b0a7 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -10,7 +10,6 @@ jobs: unit-test: runs-on: ubuntu-latest env: - COVERAGE_FILE: coverage.xml COVERAGE_DIR: .coverage-reports steps: - name: Checkout code @@ -67,23 +66,21 @@ jobs: - name: Generate coverage report id: coverage_report - continue-on-error: true run: | coverage combine ${{ env.COVERAGE_DIR }}/.coverage.* coverage report - coverage xml + coverage xml -o coverage.xml - name: Verify coverage file id: verify_coverage if: always() - continue-on-error: true run: | - if [ ! -s "${{ env.COVERAGE_FILE }}" ]; then - echo "Coverage file is empty or does not exist" - ls -la ${{ env.COVERAGE_FILE }} ${{ env.COVERAGE_DIR }} + if [ ! -s coverage.xml ]; then + echo "coverage.xml is empty or does not exist" + ls -la coverage.xml ${{ env.COVERAGE_DIR }} || true exit 1 fi - echo "Coverage file exists and is not empty" + echo "coverage.xml exists and is not empty" - name: Upload coverage to Codecov if: always() && steps.verify_coverage.outcome == 'success' @@ -91,7 +88,7 @@ jobs: uses: codecov/codecov-action@v3 with: token: ${{ secrets.CODECOV_TOKEN }} - file: ${{ env.COVERAGE_FILE }} + file: coverage.xml - name: Check test results if: steps.unit_tests.outcome == 'failure' || steps.bc_tests.outcome == 'failure' || steps.serdeser_tests.outcome == 'failure'