Skip to content

Commit 7d26a69

Browse files
committed
refactor(durable-effects): split operations.ts into per-effect files
1 parent 666150e commit 7d26a69

11 files changed

Lines changed: 534 additions & 490 deletions

durable-effects/durable-eval.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Durable eval — persistent, replay-safe in-process code evaluation.
3+
*
4+
* Uses `createDurableOperation` from @effectionx/durable-streams.
5+
* During live execution, the operation runs and persists a Yield event.
6+
* During replay, the stored result is returned without executing.
7+
*/
8+
9+
import {
10+
type Json,
11+
type Workflow,
12+
createDurableOperation,
13+
} from "@effectionx/durable-streams";
14+
import type { Operation } from "effection";
15+
import { canonicalJson } from "./canonical-json.ts";
16+
import { computeSHA256 } from "./hash.ts";
17+
18+
export interface EvalOptions {
19+
source: string;
20+
language?: string;
21+
bindings?: Record<string, Json>;
22+
}
23+
24+
export interface EvalResult {
25+
value: Json;
26+
sourceHash: string;
27+
bindingsHash: string;
28+
}
29+
30+
/**
31+
* Evaluate code via a caller-provided evaluator durably.
32+
*
33+
* Source hash and bindings hash in the result for replay guard
34+
* freshness detection.
35+
*/
36+
export function* durableEval(
37+
name: string,
38+
evaluator: (
39+
source: string,
40+
bindings: Record<string, Json>,
41+
) => Operation<Json>,
42+
options: EvalOptions,
43+
): Workflow<EvalResult> {
44+
const { source, language, bindings = {} } = options;
45+
46+
return (yield createDurableOperation<Json>(
47+
{ type: "eval", name, ...(language ? { language } : {}) },
48+
function* () {
49+
const sourceHash = yield* computeSHA256(source);
50+
const bindingsHash = yield* computeSHA256(canonicalJson(bindings));
51+
const value = yield* evaluator(source, bindings);
52+
return { value, sourceHash, bindingsHash } as unknown as Json;
53+
},
54+
)) as EvalResult;
55+
}

durable-effects/durable-exec.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
* Durable exec — persistent, replay-safe subprocess execution.
3+
*
4+
* Uses `createDurableOperation` from @effectionx/durable-streams.
5+
* During live execution, the operation runs and persists a Yield event.
6+
* During replay, the stored result is returned without executing.
7+
*/
8+
9+
import {
10+
type Json,
11+
type Workflow,
12+
createDurableOperation,
13+
} from "@effectionx/durable-streams";
14+
import { useScope } from "effection";
15+
import { type DurableRuntime, DurableRuntimeCtx } from "./runtime.ts";
16+
17+
export interface ExecOptions {
18+
command: string[];
19+
cwd?: string;
20+
env?: Record<string, string>;
21+
timeout?: number;
22+
throwOnError?: boolean;
23+
}
24+
25+
export interface ExecResult {
26+
exitCode: number;
27+
stdout: string;
28+
stderr: string;
29+
}
30+
31+
/**
32+
* Execute a shell command durably.
33+
*
34+
* Never re-executed on replay — logs are authoritative.
35+
*
36+
* **Security note**: `env` values are NOT persisted to the journal —
37+
* only the env key names are recorded (for divergence detection).
38+
* The `throwOnError` flag is captured in the description so replay
39+
* behavior matches the original execution.
40+
*/
41+
export function* durableExec(
42+
name: string,
43+
options: ExecOptions,
44+
): Workflow<ExecResult> {
45+
const { command, cwd, env, timeout = 300_000, throwOnError = true } = options;
46+
47+
return (yield createDurableOperation<Json>(
48+
{
49+
type: "exec",
50+
name,
51+
command: command as Json,
52+
...(cwd ? { cwd } : {}),
53+
// Only record env key names — values may contain secrets
54+
...(env ? { envKeys: Object.keys(env).sort() as Json } : {}),
55+
timeout,
56+
throwOnError,
57+
},
58+
function* () {
59+
const scope = yield* useScope();
60+
const runtime = scope.expect<DurableRuntime>(DurableRuntimeCtx);
61+
62+
const output = yield* runtime.exec({ command, cwd, env, timeout });
63+
64+
if (throwOnError && output.exitCode !== 0) {
65+
throw new Error(
66+
`Command failed with exit code ${output.exitCode}: ${command.join(" ")}\n${output.stderr}`,
67+
);
68+
}
69+
return output as unknown as Json;
70+
},
71+
)) as ExecResult;
72+
}

durable-effects/durable-fetch.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/**
2+
* Durable fetch — persistent, replay-safe HTTP request.
3+
*
4+
* Uses `createDurableOperation` from @effectionx/durable-streams.
5+
* During live execution, the operation runs and persists a Yield event.
6+
* During replay, the stored result is returned without executing.
7+
*/
8+
9+
import {
10+
type Json,
11+
type Workflow,
12+
createDurableOperation,
13+
} from "@effectionx/durable-streams";
14+
import { useScope } from "effection";
15+
import { computeSHA256 } from "./hash.ts";
16+
import { type DurableRuntime, DurableRuntimeCtx } from "./runtime.ts";
17+
18+
export interface FetchOptions {
19+
url: string;
20+
method?: string;
21+
headers?: Record<string, string>;
22+
body?: string;
23+
timeout?: number;
24+
}
25+
26+
export interface FetchResult {
27+
status: number;
28+
headers: Record<string, string>;
29+
body: string;
30+
bodyHash: string;
31+
}
32+
33+
/** Header names that are safe to record in the journal. */
34+
const SAFE_REQUEST_HEADERS = new Set([
35+
"content-type",
36+
"accept",
37+
"accept-language",
38+
"cache-control",
39+
"user-agent",
40+
]);
41+
42+
/**
43+
* HTTP request durably.
44+
*
45+
* HTTP error status codes (404, 500) are successful effect results —
46+
* only network failures are effect errors.
47+
*
48+
* **Security note**: Only safe request header *names* are recorded in
49+
* the description — values of sensitive headers (Authorization, Cookie,
50+
* etc.) are never persisted. A body hash is included in the description
51+
* when a request body is present, so different payloads to the same URL
52+
* produce distinct journal entries.
53+
*/
54+
export function* durableFetch(
55+
name: string,
56+
options: FetchOptions,
57+
): Workflow<FetchResult> {
58+
const { url, method = "GET", headers = {}, body, timeout = 30_000 } = options;
59+
60+
// Record only safe header names + values; redact sensitive ones to key-only
61+
const safeHeaders: Record<string, string> = {};
62+
for (const [key, value] of Object.entries(headers)) {
63+
const lower = key.toLowerCase();
64+
if (SAFE_REQUEST_HEADERS.has(lower)) {
65+
safeHeaders[key] = value;
66+
} else {
67+
safeHeaders[key] = "[REDACTED]";
68+
}
69+
}
70+
71+
return (yield createDurableOperation<Json>(
72+
{
73+
type: "fetch",
74+
name,
75+
url,
76+
method,
77+
headers: safeHeaders as Json,
78+
// Include body hash so different payloads produce distinct entries
79+
...(body ? { bodyHash: `len:${body.length}` } : {}),
80+
},
81+
function* () {
82+
const scope = yield* useScope();
83+
const runtime = scope.expect<DurableRuntime>(DurableRuntimeCtx);
84+
85+
const response = yield* runtime.fetch(url, {
86+
method,
87+
headers,
88+
body,
89+
timeout,
90+
});
91+
const responseBody = yield* response.text();
92+
const bodyHash = yield* computeSHA256(responseBody);
93+
94+
// Filter response headers to keep only useful ones
95+
const responseHeaders: Record<string, string> = {};
96+
for (const key of [
97+
"content-type",
98+
"etag",
99+
"last-modified",
100+
"cache-control",
101+
]) {
102+
const val = response.headers.get(key);
103+
if (val) responseHeaders[key] = val;
104+
}
105+
106+
return {
107+
status: response.status,
108+
headers: responseHeaders,
109+
body: responseBody,
110+
bodyHash,
111+
} as unknown as Json;
112+
},
113+
)) as FetchResult;
114+
}

durable-effects/durable-glob.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Durable glob — persistent, replay-safe directory glob with scan hash.
3+
*
4+
* Uses `createDurableOperation` from @effectionx/durable-streams.
5+
* During live execution, the operation runs and persists a Yield event.
6+
* During replay, the stored result is returned without executing.
7+
*/
8+
9+
import {
10+
type Json,
11+
type Workflow,
12+
createDurableOperation,
13+
} from "@effectionx/durable-streams";
14+
import { useScope } from "effection";
15+
import { computeSHA256 } from "./hash.ts";
16+
import { type DurableRuntime, DurableRuntimeCtx } from "./runtime.ts";
17+
18+
export interface GlobOptions {
19+
baseDir: string;
20+
include: string[];
21+
exclude?: string[];
22+
}
23+
24+
export interface GlobMatch {
25+
path: string;
26+
contentHash: string;
27+
}
28+
29+
export interface GlobResult {
30+
matches: GlobMatch[];
31+
scanHash: string;
32+
}
33+
34+
/**
35+
* Discover files matching patterns durably.
36+
*
37+
* Sorted matches with per-file hashes. Composite scanHash for
38+
* replay guard staleness detection.
39+
*/
40+
export function* durableGlob(
41+
name: string,
42+
options: GlobOptions,
43+
): Workflow<GlobResult> {
44+
const { baseDir, include, exclude = [] } = options;
45+
46+
return (yield createDurableOperation<Json>(
47+
{
48+
type: "glob",
49+
name,
50+
baseDir,
51+
include: include as Json,
52+
exclude: exclude as Json,
53+
},
54+
function* () {
55+
const scope = yield* useScope();
56+
const runtime = scope.expect<DurableRuntime>(DurableRuntimeCtx);
57+
58+
const entries = yield* runtime.glob({
59+
patterns: include,
60+
root: baseDir,
61+
exclude,
62+
});
63+
64+
const matches: GlobMatch[] = [];
65+
for (const entry of entries) {
66+
if (!entry.isFile) continue;
67+
const content = yield* runtime.readTextFile(`${baseDir}/${entry.path}`);
68+
const contentHash = yield* computeSHA256(content);
69+
matches.push({ path: entry.path, contentHash });
70+
}
71+
72+
matches.sort((a, b) => a.path.localeCompare(b.path));
73+
const seen = new Set<string>();
74+
const deduped = matches.filter((m) => {
75+
if (seen.has(m.path)) return false;
76+
seen.add(m.path);
77+
return true;
78+
});
79+
80+
const scanHash = yield* computeSHA256(JSON.stringify(deduped));
81+
return { matches: deduped, scanHash } as unknown as Json;
82+
},
83+
)) as GlobResult;
84+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
* Durable read file — persistent, replay-safe file read with content hash.
3+
*
4+
* Uses `createDurableOperation` from @effectionx/durable-streams.
5+
* During live execution, the operation runs and persists a Yield event.
6+
* During replay, the stored result is returned without executing.
7+
*/
8+
9+
import {
10+
type Json,
11+
type Workflow,
12+
createDurableOperation,
13+
} from "@effectionx/durable-streams";
14+
import { useScope } from "effection";
15+
import { computeSHA256 } from "./hash.ts";
16+
import { type DurableRuntime, DurableRuntimeCtx } from "./runtime.ts";
17+
18+
export interface ReadFileResult {
19+
content: string;
20+
contentHash: string;
21+
}
22+
23+
/**
24+
* Read a file durably.
25+
*
26+
* Path in description, content + SHA-256 hash in result.
27+
* Designed for replay guard integration.
28+
*
29+
* Note: `encoding` is recorded in the description for future use but
30+
* the current `DurableRuntime.readTextFile` always reads as UTF-8.
31+
* Non-default encodings will require a runtime interface extension.
32+
*/
33+
export function* durableReadFile(
34+
name: string,
35+
path: string,
36+
options?: { encoding?: string },
37+
): Workflow<ReadFileResult> {
38+
const encoding = options?.encoding ?? "utf-8";
39+
40+
return (yield createDurableOperation<Json>(
41+
{ type: "read_file", name, path, encoding },
42+
function* () {
43+
const scope = yield* useScope();
44+
const runtime = scope.expect<DurableRuntime>(DurableRuntimeCtx);
45+
46+
const content = yield* runtime.readTextFile(path);
47+
const contentHash = yield* computeSHA256(content);
48+
return { content, contentHash } as unknown as Json;
49+
},
50+
)) as ReadFileResult;
51+
}

0 commit comments

Comments
 (0)