23 commits
c4b03e7
implement metrics as both legacy and canonical
chrishagglund-ship-it Apr 28, 2026
9ee4b94
standardize on what truthiness means for env var
chrishagglund-ship-it Apr 29, 2026
c5a399b
fixes for a few canonical metrics
chrishagglund-ship-it Apr 30, 2026
09d269e
improve test coverage for new canonical vs legacy metrics output
chrishagglund-ship-it May 4, 2026
d3fea79
integration tests create a local sqllite db as part of running a oss …
chrishagglund-ship-it May 5, 2026
3992c9e
cleaner reporting on which metrics implementation is used in the test…
chrishagglund-ship-it May 6, 2026
df6e220
adjust documentation surrounding metrics
chrishagglund-ship-it May 6, 2026
365b8d5
additional update of docs re: metrics
chrishagglund-ship-it May 6, 2026
2fc8e32
convenience metrics wiring so it doesn't take so much boilerplate to …
chrishagglund-ship-it May 7, 2026
dfba36e
spotless lint
chrishagglund-ship-it May 7, 2026
02dfa22
add or update a changelog
chrishagglund-ship-it May 7, 2026
c9b995f
wip - harness worker should be emitting cardinality explosion, update…
chrishagglund-ship-it May 7, 2026
5dcf067
address some concerns about uncaught exception handler mechanism, lis…
chrishagglund-ship-it May 8, 2026
611a000
change where queue full metric is sent to be much more meaningful and…
chrishagglund-ship-it May 8, 2026
d5e4941
improve how metrics are wired in to not require a public class for do…
chrishagglund-ship-it May 8, 2026
2e49e15
adjust changelog entry and metric description
chrishagglund-ship-it May 8, 2026
0db65dd
remove an unnecessary and potentially problematic early return guard
chrishagglund-ship-it May 8, 2026
eff0659
testing an implementation that doesn't break a singleton contract
chrishagglund-ship-it May 10, 2026
420bb4c
don't assume versions
chrishagglund-ship-it May 10, 2026
fcdde93
wip, addressing some self-review concerns about imposing new behavior…
chrishagglund-ship-it May 10, 2026
600e2c4
wip, addressing some self-review concerns about internal mechanics
chrishagglund-ship-it May 10, 2026
2331542
update doc, run spotless
chrishagglund-ship-it May 10, 2026
dcd384b
moved technical details of metrics changes into metrics related docs,…
chrishagglund-ship-it May 11, 2026
3 changes: 3 additions & 0 deletions .gitignore
@@ -9,3 +9,6 @@ bin/
.project
.settings/
.factorypath
*.db
*.db-shm
*.db-wal
17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,23 @@

All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- Canonical metrics mode: opt-in harmonized metric surface via `WORKER_CANONICAL_METRICS=true` — [details](conductor-client-metrics/README.md#detailed-technical-notes--unreleased)
- Automatic metrics wiring: `ConductorClient.Builder.withMetricsCollector(...)` installs the HTTP interceptor and auto-registers listeners on `TaskClient` and `WorkflowClient` (automatic in canonical mode; opt-in via `setAutoWiringEnabled(true)` for legacy)

### Changed

- Legacy metrics emit unchanged by default; no env var required
- `micrometer-registry-prometheus` is now a transitive (`api`) dependency

### Deprecated

- `PrometheusMetricsCollector` — use `MetricsCollectorFactory.create()` or `MetricsBundle.create()`
- `TaskClient.ack(String, String)` — use `ack(String taskType, String taskId, String workerId)`

## [4.0.0] - 2024-10-09
- New major release – [Read more](https://orkes.io/blog/conductor-java-client-v4/)

111 changes: 70 additions & 41 deletions INTERCEPTOR.md
@@ -240,26 +240,31 @@ public class ListenerRegister {
}
```

### 4. PrometheusMetricsCollector
### 4. MetricsCollectorFactory / LegacyPrometheusMetricsCollector / CanonicalPrometheusMetricsCollector

**Location**: `conductor-client-metrics/src/main/java/com/netflix/conductor/client/metrics/prometheus/PrometheusMetricsCollector.java`
**Location**: `conductor-client-metrics/src/main/java/com/netflix/conductor/client/metrics/prometheus/`

Reference implementation of `MetricsCollector` using Micrometer Prometheus.

**Features**:
- Exposes HTTP endpoint for Prometheus scraping (default: `localhost:9991/metrics`)
- Records timers for poll duration (success/failure)
- Records timers for task execution duration (completed/failure)
- Records counters for poll started and task execution started
- All metrics tagged with task type

**Metrics Exposed**:
- `poll_failure` (timer) - Duration of failed polls
- `poll_success` (timer) - Duration of successful polls
- `poll_started` (counter) - Count of poll attempts
- `task_execution_started` (counter) - Count of task executions started
- `task_execution_completed` (timer) - Duration of completed task executions
- `task_execution_failure` (timer) - Duration of failed task executions
- Selects either the legacy or canonical Prometheus collector at startup
- Records worker, task client, and workflow client metrics through the event listener system
- Records HTTP API client metrics through an OkHttp interceptor
- Keeps the metrics backend separated from task and workflow business logic

For setup instructions, environment-variable selection, the complete legacy and canonical metric catalogs, and migration guidance, see [`conductor-client-metrics/README.md`](conductor-client-metrics/README.md).

### Compatibility: `PrometheusMetricsCollector`

`com.netflix.conductor.client.metrics.prometheus.PrometheusMetricsCollector` is retained as a deprecated alias for `LegacyPrometheusMetricsCollector`. Existing 4.0.x code that does:

```java
PrometheusMetricsCollector metricsCollector = new PrometheusMetricsCollector();
metricsCollector.startServer(9991, "/metrics");
```

continues to compile and emit the same six legacy meter names (`poll_started`, `poll_success`, `poll_failure`, `task_execution_started`, `task_execution_completed`, `task_execution_failure`) byte-for-byte. The shim deliberately delegates to `LegacyPrometheusMetricsCollector`, **not** to `MetricsCollectorFactory.create()`, so an upgrader who already has `WORKER_CANONICAL_METRICS=true` in their environment is not silently flipped to the canonical surface. New code should use `MetricsCollectorFactory.create()` (or `MetricsBundle.create()`) to opt into env-var-driven selection.
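
To make the distinction above concrete, here is a minimal, stdlib-only sketch of the two selection paths. Everything in it is hypothetical: `CollectorSelectionSketch`, `Surface`, `factorySelection`, and `shimSelection` are illustrative names rather than SDK classes, and the rule that only a case-insensitive `true` enables canonical mode is an assumption (the text above confirms only that `WORKER_CANONICAL_METRICS=true` opts in).

```java
import java.util.Map;

public class CollectorSelectionSketch {

    /** The two metric surfaces described above. */
    public enum Surface { LEGACY, CANONICAL }

    // Assumption: only a case-insensitive "true" counts as truthy.
    // The SDK's actual truthiness rules may accept other spellings.
    public static boolean isTruthy(String raw) {
        return raw != null && raw.trim().equalsIgnoreCase("true");
    }

    /** Factory-style path: the environment decides which surface is used. */
    public static Surface factorySelection(Map<String, String> env) {
        return isTruthy(env.get("WORKER_CANONICAL_METRICS"))
                ? Surface.CANONICAL
                : Surface.LEGACY;
    }

    /** Shim-style path: always legacy, regardless of the environment. */
    public static Surface shimSelection(Map<String, String> env) {
        return Surface.LEGACY;
    }

    public static void main(String[] args) {
        // Simulated environment of an upgrader who already exports the flag.
        Map<String, String> env = Map.of("WORKER_CANONICAL_METRICS", "true");
        System.out.println("factory: " + factorySelection(env)); // factory: CANONICAL
        System.out.println("shim:    " + shimSelection(env));    // shim:    LEGACY
    }
}
```

With the flag set, the factory-style path switches surfaces while the shim-style path stays on legacy, which is the behavior the compatibility note describes for existing 4.0.x code.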

## Event Lifecycle

@@ -308,21 +313,20 @@ Reference implementation of `MetricsCollector` using Micrometer Prometheus.

```
┌─────────────────────────────────────────────────────────────────┐
│ 1. Check payload size │
│ 1. Off-load oversized payload (only when │
│ isEnforceThresholds() == true) │
│ WorkflowClient.checkAndUploadToExternalStorage() │
│ └─→ eventDispatcher.publish( │
│ new WorkflowInputPayloadSizeEvent(name, version, size)) │
│ └─→ if size > threshold: │
│ eventDispatcher.publish( │
│ new WorkflowPayloadUsedEvent(name, version, │
│ "WRITE", "WORKFLOW_INPUT")) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 2. Upload to external storage (if needed) │
│ └─→ eventDispatcher.publish( │
│ new WorkflowPayloadUsedEvent(name, version, │
│ "WRITE", "WORKFLOW_INPUT")) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 3. Start workflow │
│ 2. Start workflow (POST /workflow tagged with │
│ PayloadKind.WorkflowInput so the ApiClientMetrics │
│ OkHttp interceptor records workflow_input_size_bytes │
│ from RequestBody.contentLength() at wire time) │
│ WorkflowClient.startWorkflow() │
│ • Success: eventDispatcher.publish( │
│ new WorkflowStartedEvent(name, version)) │
@@ -331,37 +335,54 @@ Reference implementation of `MetricsCollector` using Micrometer Prometheus.
└─────────────────────────────────────────────────────────────────┘
```

> Note: `WorkflowInputPayloadSizeEvent` is no longer published from
> `WorkflowClient` — the canonical `workflow_input_size_bytes` histogram is
> populated at wire time by `ApiClientMetrics`, which avoids serializing the
> input twice. The event POJO and `consume(WorkflowInputPayloadSizeEvent)`
> hook are retained for third-party publishers and route through the same
> `PrometheusApiClientMetrics` helper. The same applies to
> `TaskResultPayloadSizeEvent` and `task_result_size_bytes`.
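
The idea in the note above, observing payload size while the bytes are written rather than serializing a second time just to measure, can be illustrated with a stdlib-only sketch. This is not the SDK's mechanism: the note says the real interceptor reads `RequestBody.contentLength()` in OkHttp, whereas the hypothetical `CountingOutputStream` and `sendAndMeasure` below are stand-ins chosen so the example runs without dependencies.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class WireTimeSizeSketch {

    /** Hypothetical wrapper that counts bytes as they pass through to the real sink. */
    public static final class CountingOutputStream extends OutputStream {
        private final OutputStream delegate;
        private long bytesWritten;

        public CountingOutputStream(OutputStream delegate) {
            this.delegate = delegate;
        }

        @Override
        public void write(int b) throws IOException {
            delegate.write(b);
            bytesWritten++;
        }

        @Override
        public void write(byte[] buf, int off, int len) throws IOException {
            delegate.write(buf, off, len);
            bytesWritten += len;
        }

        @Override
        public void flush() throws IOException {
            delegate.flush();
        }

        public long getBytesWritten() {
            return bytesWritten;
        }
    }

    /** Writes the already-serialized payload once and returns the size seen on the way out. */
    public static long sendAndMeasure(String json, OutputStream wire) throws IOException {
        CountingOutputStream counting = new CountingOutputStream(wire);
        counting.write(json.getBytes(StandardCharsets.UTF_8));
        counting.flush();
        return counting.getBytesWritten();
    }

    public static void main(String[] args) throws IOException {
        // ByteArrayOutputStream stands in for the network socket.
        long size = sendAndMeasure("{\"orderId\":42}", new ByteArrayOutputStream());
        System.out.println("payload bytes: " + size); // payload bytes: 14
    }
}
```

In a real collector the measured value would feed a histogram such as `workflow_input_size_bytes`; the sketch stops at returning the number.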

## Usage Guide

### Basic Setup with Prometheus Metrics

```java
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.metrics.prometheus.PrometheusMetricsCollector;
import com.netflix.conductor.client.http.ConductorClient;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.http.WorkflowClient;
import com.netflix.conductor.client.metrics.prometheus.MetricsBundle;

// 1. Create TaskClient
TaskClient taskClient = new TaskClient("http://conductor-server:8080");
// 1. Create and start metrics (factory-selected collector + Prometheus scrape server)
MetricsBundle bundle = MetricsBundle.create(); // port 9991, /metrics

// 2. Create and start PrometheusMetricsCollector
PrometheusMetricsCollector metricsCollector = new PrometheusMetricsCollector();
metricsCollector.startServer(); // Starts HTTP server on port 9991
// 2. Create ConductorClient — withMetricsCollector installs the HTTP interceptor
// and enables automatic listener registration on downstream clients
ConductorClient client = ConductorClient.builder()
    .basePath("http://conductor-server:8080/api")
    .withMetricsCollector(bundle.getCollector())
    .build();

// 3. Downstream clients auto-register as listeners
TaskClient taskClient = new TaskClient(client);
WorkflowClient workflowClient = new WorkflowClient(client);

// 3. Configure TaskRunner with metrics
TaskRunnerConfigurer configurer = new TaskRunnerConfigurer.Builder(taskClient, workers)
    .withThreadCount(10)
    .withMetricsCollector(metricsCollector)
    .build();

// 4. Start polling
configurer.init();
```

For fine-grained control over which listeners are registered, use `withHttpMetrics` instead of `withMetricsCollector`. This installs only the HTTP interceptor and leaves all listener registration to you. See the [Manual Wiring](conductor-client-metrics/README.md#manual-wiring) section in the metrics README.

### Custom Metrics Endpoint

```java
// Start Prometheus server on custom port and endpoint
PrometheusMetricsCollector metricsCollector = new PrometheusMetricsCollector();
AbstractPrometheusMetricsCollector metricsCollector = MetricsCollectorFactory.create();
metricsCollector.startServer(8080, "/custom-metrics");
```

@@ -509,8 +530,11 @@ ListenerRegister.register(new TaskMonitor(), dispatcher);
### Workflow and Task Client Event Listeners

```java
WorkflowClient workflowClient = new WorkflowClient("http://conductor-server:8080");
TaskClient taskClient = new TaskClient("http://conductor-server:8080");
ConductorClient client = ConductorClient.builder()
    .basePath("http://conductor-server:8080/api")
    .build();
WorkflowClient workflowClient = new WorkflowClient(client);
TaskClient taskClient = new TaskClient(client);

// Register workflow listener
workflowClient.registerListener(new WorkflowClientListener() {
@@ -834,7 +858,7 @@ public class CloudWatchMetricsCollector implements MetricsCollector {

```java
// Create multiple collectors
PrometheusMetricsCollector prometheus = new PrometheusMetricsCollector();
AbstractPrometheusMetricsCollector prometheus = MetricsCollectorFactory.create();
prometheus.startServer(9991, "/metrics");

DatadogMetricsCollector datadog = new DatadogMetricsCollector(
@@ -1192,16 +1216,21 @@ public class MetricsOverheadMonitor implements TaskRunnerEventsListener {
package com.example.conductor;

import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.http.ConductorClient;
import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.client.metrics.prometheus.PrometheusMetricsCollector;
import com.netflix.conductor.client.metrics.prometheus.MetricsCollectorFactory;
import com.netflix.conductor.client.metrics.prometheus.AbstractPrometheusMetricsCollector;
import java.util.List;

public class ConductorMonitoringSetup {

    public static void main(String[] args) throws Exception {
        // 1. Create clients
        TaskClient taskClient = new TaskClient("http://conductor-server:8080");
        ConductorClient client = ConductorClient.builder()
            .basePath("http://conductor-server:8080/api")
            .build();
        TaskClient taskClient = new TaskClient(client);

        // 2. Create workers
        List<Worker> workers = List.of(
@@ -1210,7 +1239,7 @@ public class ConductorMonitoringSetup {
        );

        // 3. Setup Prometheus metrics
        PrometheusMetricsCollector prometheus = new PrometheusMetricsCollector();
        AbstractPrometheusMetricsCollector prometheus = MetricsCollectorFactory.create();
        prometheus.startServer(9991, "/metrics");

        // 4. Setup custom monitoring
15 changes: 10 additions & 5 deletions README.md
@@ -298,24 +298,29 @@ executor.initWorkers("com.mycompany.workers"); // Package to scan for @WorkerTa

## Monitoring Workers

Enable metrics collection for monitoring workers:
Enable Prometheus metrics collection for monitoring workers:

```java
```groovy
// Using conductor-client-metrics module
dependencies {
    implementation 'org.conductoross:conductor-client-metrics:4.0.1'
}
```

```java
// Configure metrics with Prometheus
import com.netflix.conductor.client.metrics.prometheus.MetricsCollectorFactory;

TaskRunnerConfigurer configurer = new TaskRunnerConfigurer.Builder(taskClient, workers)
    .withThreadCount(10)
    .withMetricsCollector(new PrometheusMetricsCollector())
    .withMetricsCollector(MetricsCollectorFactory.create())
    .build();
```

See [conductor-client-metrics/README.md](conductor-client-metrics/README.md) for full metrics documentation.
`MetricsCollectorFactory.create()` uses the legacy Java SDK metric names by default. Set `WORKER_CANONICAL_METRICS=true` to opt in to the canonical cross-SDK metric names.

> Migrating from 4.0.x? `PrometheusMetricsCollector` still works — it is now a deprecated alias for `LegacyPrometheusMetricsCollector` and emits the same six legacy meter names byte-for-byte. New code should use `MetricsCollectorFactory.create()` (or `MetricsBundle.create()`) so it can opt into canonical metrics via `WORKER_CANONICAL_METRICS=true`.

See [conductor-client-metrics/README.md](conductor-client-metrics/README.md) for setup details, the complete legacy and canonical metric catalogs, and migration guidance.

## Workflows
