Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions .github/workflows/harness-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ on:
release:
types: [published]
workflow_dispatch:
inputs:
deploy:
description: "Dispatch downstream deploy after the image is built"
type: boolean
default: true

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down Expand Up @@ -41,13 +46,21 @@ jobs:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Calculate branch tag
id: vars
shell: bash
run: |
BRANCH="${{ github.ref_name }}"
CLEANED_BRANCH_NAME=$(echo "$BRANCH" | tr '/' '-' | tr '[:upper:]' '[:lower:]')
echo "cleaned-branch-name=$CLEANED_BRANCH_NAME" >> "$GITHUB_OUTPUT"

- name: Docker metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ghcr.io/conductor-oss/python-sdk/harness-worker
tags: |
type=raw,value=latest
type=raw,value=${{ steps.vars.outputs.cleaned-branch-name }}-latest,enable=${{ github.event_name != 'release' }}
type=raw,value=${{ github.event.release.tag_name }},enable=${{ github.event_name == 'release' }}

- name: Build and push
Expand All @@ -59,9 +72,17 @@ jobs:
platforms: linux/amd64,linux/arm64
push: true
tags: ${{ steps.meta.outputs.tags }}
# Registry-backed BuildKit cache. Unchanged layers are reused across
# runs so rebuilding the same commit (or one with only minor diffs)
# is near-instant. The `:buildcache` tag lives alongside the image
# but only stores layer blobs, not a runnable image.
cache-from: type=registry,ref=ghcr.io/conductor-oss/python-sdk/harness-worker:buildcache
cache-to: type=registry,ref=ghcr.io/conductor-oss/python-sdk/harness-worker:buildcache,mode=max

dispatch-deploy:
if: github.event_name == 'release'
if: |
github.event_name == 'release' ||
(github.event_name == 'workflow_dispatch' && inputs.deploy)
needs: build-and-push
runs-on: ubuntu-latest
permissions:
Expand Down
15 changes: 6 additions & 9 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ jobs:
unit-test:
runs-on: ubuntu-latest
env:
COVERAGE_FILE: coverage.xml
COVERAGE_DIR: .coverage-reports
steps:
- name: Checkout code
Expand Down Expand Up @@ -67,31 +66,29 @@ jobs:

- name: Generate coverage report
id: coverage_report
continue-on-error: true
run: |
coverage combine ${{ env.COVERAGE_DIR }}/.coverage.*
coverage report
coverage xml
coverage xml -o coverage.xml

- name: Verify coverage file
id: verify_coverage
if: always()
continue-on-error: true
run: |
if [ ! -s "${{ env.COVERAGE_FILE }}" ]; then
echo "Coverage file is empty or does not exist"
ls -la ${{ env.COVERAGE_FILE }} ${{ env.COVERAGE_DIR }}
if [ ! -s coverage.xml ]; then
echo "coverage.xml is empty or does not exist"
ls -la coverage.xml ${{ env.COVERAGE_DIR }} || true
exit 1
fi
echo "Coverage file exists and is not empty"
echo "coverage.xml exists and is not empty"

- name: Upload coverage to Codecov
if: always() && steps.verify_coverage.outcome == 'success'
continue-on-error: true
uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ${{ env.COVERAGE_FILE }}
file: coverage.xml

- name: Check test results
if: steps.unit_tests.outcome == 'failure' || steps.bc_tests.outcome == 'failure' || steps.serdeser_tests.outcome == 'failure'
Expand Down
24 changes: 22 additions & 2 deletions harness/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,32 @@ docker run -d \
python-sdk-harness
```

You can also run the harness locally without Docker (from the repo root):
You can also run the harness locally without Docker (from the repo root).
A virtual environment keeps dependencies isolated from your system Python:

```bash
# Install the SDK in development mode (one-time)
# Check if a .venv already exists
ls .venv/bin/activate 2>/dev/null && echo "venv exists" || echo "no venv found"

# Create one if needed (one-time)
python3 -m venv .venv

# Activate it (required every time you open a new terminal)
source .venv/bin/activate # Windows: .venv\Scripts\activate

# Verify you're in the venv (should print the .venv path)
which python3

# Install the SDK in development mode (one-time, or after pulling new deps)
pip3 install -e .

# When you're done, deactivate the venv to restore your normal shell
deactivate
```

Once the venv is active and the SDK is installed:

```bash
export CONDUCTOR_SERVER_URL=https://your-cluster.example.com/api
export CONDUCTOR_AUTH_KEY=$CONDUCTOR_AUTH_KEY
export CONDUCTOR_AUTH_SECRET=$CONDUCTOR_AUTH_SECRET
Expand Down
6 changes: 6 additions & 0 deletions harness/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.http.models.task_def import TaskDef
from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
Expand Down Expand Up @@ -79,10 +80,15 @@ def main() -> None:
worker = SimulatedTaskWorker(task_name, codename, sleep_seconds, batch_size, poll_interval_ms)
workers.append(worker)

metrics_port = env_int_or_default("HARNESS_METRICS_PORT", 9991)
metrics_settings = MetricsSettings(http_port=metrics_port)
print(f"Prometheus metrics will be served on port {metrics_port}")

task_handler = TaskHandler(
workers=workers,
configuration=configuration,
scan_for_annotated_workers=False,
metrics_settings=metrics_settings,
)

workflow_executor = clients.get_workflow_executor()
Expand Down
4 changes: 2 additions & 2 deletions harness/manifests/configmap-gcp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ metadata:
labels:
app: python-sdk-harness-worker
data:
CONDUCTOR_SERVER_URL: "https://certification-gcp.orkesconductor.com/api"
CONDUCTOR_AUTH_KEY: "e6c1ac61-286b-11f1-be01-c682b5750c3a"
CONDUCTOR_SERVER_URL: "https://certification-gcp.orkesconductor.io/api"
CONDUCTOR_AUTH_KEY: "25b681c1-34ec-11f1-b07a-9601c7a63373"
7 changes: 6 additions & 1 deletion harness/manifests/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
# note: imagePullSecrets is not needed for public images
containers:
- name: harness
image: ghcr.io/conductor-oss/python-sdk/harness-worker:latest
image: ghcr.io/conductor-oss/python-sdk/harness-worker:main-latest
imagePullPolicy: Always
env:
# === CONDUCTOR CONNECTION (from per-cloud ConfigMap) ===
Expand Down Expand Up @@ -51,6 +51,11 @@ spec:
- name: HARNESS_POLL_INTERVAL_MS
value: "100"

ports:
- name: metrics
containerPort: 9991
protocol: TCP

resources:
requests:
memory: "256Mi"
Expand Down
136 changes: 119 additions & 17 deletions src/conductor/client/automator/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,25 @@ def __init__(
)
)

# Auth failure backoff tracking to prevent retry storms
# Auth failure backoff tracking to prevent retry storms.
# The `_auth_failures` counter itself is not bounded; the exponent used
# for backoff is capped at `_max_auth_failure_exp` so that 2**N stays
# small on a long-lived worker whose auth is broken.
# The resulting sleep is further clamped to `_auth_backoff_cap_seconds`.
self._auth_failures = 0
self._last_auth_failure = 0
self._auth_backoff_cap_seconds = 60
self._max_auth_failure_exp = 6 # 2**6 = 64s, sleep clamped to cap

# Generic poll-failure backoff. This is distinct from the empty-poll
# adaptive delay (`_consecutive_empty_polls`) and from the auth-error
# backoff above. It kicks in when batch_poll raises an exception
# (server 5xx, NGINX 502/504 under load, DNS hiccup, a closed httpx
# client that couldn't heal, etc.) so we don't hot-loop the log with
# stack traces while waiting for the server to recover.
self._poll_failures = 0
self._last_poll_failure = 0
self._poll_backoff_cap_seconds = 120 # max 2 minutes between retries
self._max_poll_failure_exp = 7 # 2**7 = 128s, sleep clamped to cap

# Thread pool for concurrent task execution
# thread_count from worker configuration controls concurrency
Expand Down Expand Up @@ -543,15 +559,33 @@ def __batch_poll_tasks(self, count: int) -> list:
logger.debug("Stop polling task for: %s", task_definition_name)
return []

# Apply exponential backoff if we have recent auth failures
# Apply exponential backoff if we have recent auth failures.
if self._auth_failures > 0:
now = time.time()
backoff_seconds = min(2 ** self._auth_failures, 60)
backoff_seconds = min(
2 ** min(self._auth_failures, self._max_auth_failure_exp),
self._auth_backoff_cap_seconds,
)
time_since_last_failure = now - self._last_auth_failure
if time_since_last_failure < backoff_seconds:
time.sleep(0.1)
return []

# Apply exponential backoff for generic poll failures (5xx, network
# errors, closed-client runtime errors that couldn't self-heal, etc.).
# Bounded at `_poll_backoff_cap_seconds` (2 min) to avoid log floods
# without giving up on recovery.
if self._poll_failures > 0:
now = time.time()
backoff_seconds = min(
2 ** min(self._poll_failures, self._max_poll_failure_exp),
self._poll_backoff_cap_seconds,
)
time_since_last_failure = now - self._last_poll_failure
if time_since_last_failure < backoff_seconds:
time.sleep(0.1)
return []

# Publish PollStarted event (metrics collector will handle via event)
self.event_dispatcher.publish(PollStarted(
task_type=task_definition_name,
Expand Down Expand Up @@ -583,15 +617,20 @@ def __batch_poll_tasks(self, count: int) -> list:
tasks_received=len(tasks) if tasks else 0
))

# Success - reset auth failure counter (any successful HTTP response means auth is working)
# Success - reset both failure counters (any successful HTTP
# response means auth and connectivity are working).
self._auth_failures = 0
self._poll_failures = 0

return tasks if tasks else []

except AuthorizationException as auth_exception:
self._auth_failures += 1
self._last_auth_failure = time.time()
backoff_seconds = min(2 ** self._auth_failures, 60)
backoff_seconds = min(
2 ** min(self._auth_failures, self._max_auth_failure_exp),
self._auth_backoff_cap_seconds,
)

# Publish PollFailure event (metrics collector will handle via event)
self.event_dispatcher.publish(PollFailure(
Expand Down Expand Up @@ -619,10 +658,55 @@ def __batch_poll_tasks(self, count: int) -> list:
duration_ms=(time.time() - start_time) * 1000,
cause=e
))
logger.error(
"Failed to batch poll task for: %s, reason: %s",

# Bump the poll-failure counter so the next poll waits with
# exponential backoff instead of hot-looping on a broken server
# or connection.
self._poll_failures += 1
self._last_poll_failure = time.time()
backoff_seconds = min(
2 ** min(self._poll_failures, self._max_poll_failure_exp),
self._poll_backoff_cap_seconds,
)

# Belt-and-suspenders: if the underlying httpx client got closed
# and rest.request() couldn't heal it (e.g. because the error
# arrived as a non-RuntimeError), nudge it here. Pass the current
# connection as `expected` so concurrent threads racing to heal
# can't cause a reset storm: only the first caller per client
# generation actually replaces it.
try:
rest_client = getattr(
getattr(self.task_client, "api_client", None),
"rest_client",
None,
)
if rest_client is not None and getattr(rest_client, "_is_client_closed", lambda: False)():
current_conn = getattr(rest_client, "connection", None)
reset = rest_client._reset_connection(expected=current_conn)
if reset:
logger.warning(
"rest_client was closed after poll failure; reset"
)
except Exception:
# Healing is best-effort; never let it mask the original error.
pass

# Log a single-line warning at a modest level to avoid drowning
# ops in tracebacks when the server is flapping. Full traceback
# goes to debug for when operators need it.
logger.warning(
"Failed to batch poll task for: %s (failure #%d). Will retry with exponential backoff (%ss). Reason: %s: %s",
task_definition_name,
traceback.format_exc()
self._poll_failures,
backoff_seconds,
type(e).__name__,
e,
)
logger.debug(
"batch poll failure traceback for %s:\n%s",
task_definition_name,
traceback.format_exc(),
)
return []

Expand Down Expand Up @@ -891,15 +975,33 @@ def __update_task(self, task_result: TaskResult):
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
logger.error(
"Failed to update task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
attempt + 1,
retry_count,
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
traceback.format_exc()
)
is_last_attempt = (attempt + 1) >= retry_count
# Known recoverable transport hiccups (stale keep-alive,
# HTTP/2 GOAWAY race, client closed mid-request) are flagged
# `transient=True` by the REST layer after it self-heals. For
# those, skip the stack trace until the final attempt — the
# retry normally succeeds immediately and a full traceback per
# in-flight task just spams the log.
if getattr(e, "transient", False) and not is_last_attempt:
logger.warning(
"Transient transport error updating task; will retry (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
attempt + 1,
retry_count,
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
getattr(e, "reason", None) or str(e),
)
else:
logger.error(
"Failed to update task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
attempt + 1,
retry_count,
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
traceback.format_exc()
)
continue
except Exception as e:
last_exception = e
Expand Down
9 changes: 6 additions & 3 deletions src/conductor/client/http/async_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ async def request(self, method, url, query_params=None, headers=None,
await self._reset_connection()
if method in idempotent_methods:
continue
msg = f"Protocol error: {e}"
raise ApiException(status=0, reason=msg)
msg = f"Protocol error ({type(e).__name__}): {e}"
raise ApiException(status=0, reason=msg, transient=True) from e
except httpx.TimeoutException as e:
msg = f"Request timeout: {e}"
raise ApiException(status=0, reason=msg)
Expand Down Expand Up @@ -278,7 +278,10 @@ async def PATCH(self, url, headers=None, query_params=None, post_params=None,

class ApiException(Exception):

def __init__(self, status=None, reason=None, http_resp=None, body=None):
def __init__(self, status=None, reason=None, http_resp=None, body=None,
transient=False):
# See rest.ApiException for a description of `transient`.
self.transient = transient
if http_resp:
self.status = http_resp.status
self.code = http_resp.status
Expand Down
Loading
Loading