From 6a4d3c20d626e4f7ef2c31a98da3500e07be53d5 Mon Sep 17 00:00:00 2001 From: Tomasz Andrzejak Date: Mon, 4 May 2026 15:17:18 +0200 Subject: [PATCH 1/2] feat(bindgen): add support for stream reads with count > 1 This adds `readMany(count)` support to generated p3 streams so host-side byte consumers can perform Canonical ABI stream reads instead of pulling one byte at a time. It uses the transferred count from the packed stream result when reading host buffers. --- .../src/intrinsics/p3/async_stream.rs | 54 ++++++++-- packages/preview3-shim/lib/nodejs/cli.js | 6 +- .../lib/nodejs/filesystem/descriptor.js | 8 +- .../preview3-shim/lib/nodejs/http/client.js | 4 +- .../preview3-shim/lib/nodejs/http/server.js | 4 +- .../preview3-shim/lib/nodejs/sockets/tcp.js | 19 ++-- packages/preview3-shim/lib/nodejs/stream.js | 102 ++++++++++++++++++ packages/preview3-shim/test/stream.test.js | 61 ++++++++++- 8 files changed, 227 insertions(+), 31 deletions(-) diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs index 875899edb..44deaabb3 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs @@ -965,9 +965,13 @@ impl AsyncStreamIntrinsic { // fn (below) via an anonymous function. Self::StreamReadableEndClass => format!( r#" - async read() {{ + async read(count = 1) {{ {debug_log_fn}('[{end_class_name}#read()]'); + if (this.#endOfStream) {{ + return {{ value: undefined, done: true }}; + }} + // Wait for an existing read operation to end, if present, // otherwise register this read for any future operations. // @@ -995,7 +999,10 @@ impl AsyncStreamIntrinsic { // TODO(fix): when we do a read, we need to GET the string encoding from the // other side, via the lift/lower fn? - const count = 1; + if (!Number.isInteger(count) || count < 1) {{ + throw new TypeError(`invalid stream read count [${{count}}]`); + }} + count = Math.min(count, {managed_buffer_class}.MAX_LENGTH); try {{ const {{ id: bufferID, buffer }} = {global_buffer_manager}.createBuffer({{ componentIdx: -1, // componentIdx of -1 indicates the host @@ -1050,11 +1057,23 @@ impl AsyncStreamIntrinsic { }} }} - const vs = buffer.read(count); - const {{ typedArray }} = this.#elemMeta; - const res = typedArray === undefined || vs.length === 0 ? count === 1 ? vs[0] : vs : new typedArray(vs); - this.#result = null; - resolve(res); + const resultKind = packedResult & 0xF; + const transferred = packedResult >> 4; + + if (resultKind === {stream_end_class}.CopyResult.DROPPED) {{ + this.#endOfStream = true; + }} + + if (transferred > 0) {{ + const values = buffer.read(transferred); + const {{ typedArray }} = this.#elemMeta; + const value = typedArray === undefined ? count === 1 ? values[0] : values : new typedArray(values); + this.#result = null; + resolve(value); + }} else {{ + this.#result = null; + resolve(undefined); + }} }} catch (err) {{ {debug_log_fn}('[{end_class_name}#read()] error', err); @@ -1093,6 +1112,8 @@ impl AsyncStreamIntrinsic { #result = null; + #endOfStream = false; + constructor(args) {{ {debug_log_fn}('[{end_class_name}#constructor()] args', args); super(args); @@ -1325,8 +1346,8 @@ impl AsyncStreamIntrinsic { isReadable: streamEnd.isReadable(), isWritable: streamEnd.isWritable(), globalRep: this.#rep, - readFn: async () => {{ - return await streamEnd.read(); + readFn: async (count) => {{ + return await streamEnd.read(count); }}, writeFn: async (v) => {{ await streamEnd.write(v); @@ -1391,8 +1412,19 @@ impl AsyncStreamIntrinsic { async next() {{ {debug_log_fn}('[{external_stream_class_name}#next()]'); - if (!this.#isReadable) {{ throw new Error("stream is not marked as readable and cannot be written from"); }} - return this.#readFn(); + return this.read(); + }} + + async read(opts) {{ + {debug_log_fn}('[{external_stream_class_name}#read()]', {{ opts }}); + if (!this.#isReadable) {{ throw new Error("stream is not marked as readable and cannot be read from"); }} + return this.#readFn(this.#readCount(opts)); + }} + + #readCount(opts) {{ + const count = opts === undefined ? 1 : typeof opts === "number" ? opts : opts && typeof opts === "object" ? opts.count ?? 1 : undefined; + if (!Number.isInteger(count) || count < 1) {{ throw new TypeError(`invalid stream read count [${{count}}]`); }} + return count; }} async write() {{ diff --git a/packages/preview3-shim/lib/nodejs/cli.js b/packages/preview3-shim/lib/nodejs/cli.js index aebe7f046..a580531f8 100644 --- a/packages/preview3-shim/lib/nodejs/cli.js +++ b/packages/preview3-shim/lib/nodejs/cli.js @@ -2,7 +2,7 @@ import process from "node:process"; import { Readable } from "node:stream"; import { ResourceWorker } from "./workers/resource-worker.js"; -import { StreamReader, readableStreamFromIterator } from "./stream.js"; +import { StreamReader, readableByteStreamFromReader } from "./stream.js"; import { future } from "./future.js"; import { environment as environmentV2 } from "@bytecodealliance/preview2-shim/cli"; @@ -98,7 +98,7 @@ export const stdout = { * @returns {Promise<{tag: string, val?: string}>} Result of the write operation. */ async writeViaStream(streamReader) { - const readableStream = readableStreamFromIterator(streamReader[Symbol.asyncIterator]()); + const readableStream = readableByteStreamFromReader(streamReader, { name: "stdout stream" }); try { await worker().run({ op: "stdout", stream: readableStream }, [readableStream]); return { tag: "ok", val: undefined }; @@ -119,7 +119,7 @@ export const stderr = { * @returns {Promise<{tag: string, val?: string}>} Result of the write operation. */ async writeViaStream(streamReader) { - const readableStream = readableStreamFromIterator(streamReader[Symbol.asyncIterator]()); + const readableStream = readableByteStreamFromReader(streamReader, { name: "stderr stream" }); try { await worker().run({ op: "stderr", stream: readableStream }, [readableStream]); return { tag: "ok", val: undefined }; diff --git a/packages/preview3-shim/lib/nodejs/filesystem/descriptor.js b/packages/preview3-shim/lib/nodejs/filesystem/descriptor.js index dda457978..042f718fc 100644 --- a/packages/preview3-shim/lib/nodejs/filesystem/descriptor.js +++ b/packages/preview3-shim/lib/nodejs/filesystem/descriptor.js @@ -1,7 +1,7 @@ import fs from "node:fs/promises"; import process from "node:process"; -import { StreamReader, readableStreamFromIterator } from "../stream.js"; +import { StreamReader, readableByteStreamFromReader } from "../stream.js"; import { FutureReader } from "../future.js"; import { ResourceWorker } from "../workers/resource-worker.js"; import { earlyDispose, registerDispose } from "../finalization.js"; @@ -125,14 +125,14 @@ class Descriptor { * ``` * * @async - * @param {object} data A data source implementing `[Symbol.asyncIterator]()`. + * @param {object} data A readable byte stream. * @param {bigint} offset The offset within the file. * @returns {Promise} * @throws {FSError} `payload.tag` contains mapped WASI error code. */ async writeViaStream(data, offset) { this.#ensureHandle(); - const stream = readableStreamFromIterator(data[Symbol.asyncIterator]()); + const stream = readableByteStreamFromReader(data, { name: "file write data" }); try { await worker().run({ op: "write", fd: this.#handle.fd, offset, stream }, [stream]); @@ -149,7 +149,7 @@ class Descriptor { * ``` * * @async - * @param {object} data A data source implementing `[Symbol.asyncIterator]()`. + * @param {object} data A readable byte stream. * @returns {Promise} * @throws {FSError} `payload.tag` contains mapped WASI error code. */ diff --git a/packages/preview3-shim/lib/nodejs/http/client.js b/packages/preview3-shim/lib/nodejs/http/client.js index 0abf4dfda..485505503 100644 --- a/packages/preview3-shim/lib/nodejs/http/client.js +++ b/packages/preview3-shim/lib/nodejs/http/client.js @@ -1,5 +1,5 @@ import { ResourceWorker } from "../workers/resource-worker.js"; -import { StreamReader, readableStreamFromIterator } from "../stream.js"; +import { StreamReader, readableByteStreamFromReader } from "../stream.js"; import { FutureReader, future } from "../future.js"; import { _fieldsFromEntriesChecked } from "./fields.js"; import { HttpError } from "./error.js"; @@ -51,7 +51,7 @@ export const client = { const { port1: tx, port2: rx } = new MessageChannel(); const transfer = [rx]; - const stream = body ? readableStreamFromIterator(body[Symbol.asyncIterator]()) : undefined; + const stream = body ? readableByteStreamFromReader(body, { name: "request body" }) : undefined; if (stream) { transfer.unshift(stream); } diff --git a/packages/preview3-shim/lib/nodejs/http/server.js b/packages/preview3-shim/lib/nodejs/http/server.js index e303c8ad4..b4d5ff8dd 100644 --- a/packages/preview3-shim/lib/nodejs/http/server.js +++ b/packages/preview3-shim/lib/nodejs/http/server.js @@ -1,7 +1,7 @@ import { EventEmitter } from "node:events"; import { ResourceWorker } from "../workers/resource-worker.js"; -import { StreamReader, readableStreamFromIterator } from "../stream.js"; +import { StreamReader, readableByteStreamFromReader } from "../stream.js"; import { FutureReader, future } from "../future.js"; import { Request } from "./request.js"; import { Response } from "./response.js"; @@ -92,7 +92,7 @@ export class HttpServer extends EventEmitter { const [body, trailers] = Response.consumeBody(res, resRx); const { port1: tx, port2: rx } = new MessageChannel(); - const stream = readableStreamFromIterator(body[Symbol.asyncIterator]()); + const stream = readableByteStreamFromReader(body, { name: "response body" }); // Send trailers when ready trailers diff --git a/packages/preview3-shim/lib/nodejs/sockets/tcp.js b/packages/preview3-shim/lib/nodejs/sockets/tcp.js index 167a5d072..b25fddfde 100644 --- a/packages/preview3-shim/lib/nodejs/sockets/tcp.js +++ b/packages/preview3-shim/lib/nodejs/sockets/tcp.js @@ -1,4 +1,4 @@ -import { StreamReader, readableStreamFromIterator } from "../stream.js"; +import { StreamReader, readableByteStreamFromReader } from "../stream.js"; import { FutureReader } from "../future.js"; import { ResourceWorker } from "../workers/resource-worker.js"; import { SocketError } from "./error.js"; @@ -257,23 +257,26 @@ export class TcpSocket { * ``` * * @async - * @param {AsyncIterable} data - The data stream to send. Any - * async-iterable yielding byte chunks (e.g. a `StreamReader`) is accepted. + * @param {object} data - The byte stream to send. * @returns {Promise} * @throws {SocketError} With payload.tag 'invalid-state' if socket is not CONNECTED - * @throws {SocketError} With payload.tag 'invalid-argument' if `data` does not implement [Symbol.asyncIterator] + * @throws {SocketError} With payload.tag 'invalid-argument' if `data` is not a readable byte stream * @throws {SocketError} for other errors, payload.tag maps the system error */ async send(data) { if (this.#state !== STATE.CONNECTED) { throw new SocketError("invalid-state"); } - if (data == null || data == undefined || typeof data[Symbol.asyncIterator] !== "function") { - throw new SocketError("invalid-argument"); + let stream; + try { + stream = readableByteStreamFromReader(data, { name: "tcp send data" }); + } catch (error) { + if (error instanceof TypeError) { + throw new SocketError("invalid-argument", error.message, undefined, { cause: error }); + } + throw error; } - const stream = readableStreamFromIterator(data[Symbol.asyncIterator]()); - try { // Transfer the stream to the worker await worker().run( diff --git a/packages/preview3-shim/lib/nodejs/stream.js b/packages/preview3-shim/lib/nodejs/stream.js index c8909651d..b7c37c68b 100644 --- a/packages/preview3-shim/lib/nodejs/stream.js +++ b/packages/preview3-shim/lib/nodejs/stream.js @@ -32,6 +32,108 @@ export function readableStreamFromIterator(iterator, name = "iterator") { }); } +export const DEFAULT_BYTE_STREAM_CHUNK_SIZE = 64 * 1024; + +/** + * Creates a transferable ReadableStream from a byte-stream reader. + * + * Generated p3 stream readers expose `read({ count })`, which performs one + * bounded Canonical ABI stream read for up to `count` bytes. Prefer that path so + * host byte sinks consume practical chunks instead of one byte at a time. + * + * @param {object} reader - Reader that provides `read()`, or `[Symbol.asyncIterator]()`. + * @param {object} [opts={}] - Optional adapter settings. + * @param {number} [opts.chunkSize=DEFAULT_BYTE_STREAM_CHUNK_SIZE] - Maximum bytes to request per generated `read()` call. + * @param {string} [opts.name="stream reader"] - Reader name used in error messages. + * @returns {ReadableStream} A transferable stream of byte chunks. + */ +export function readableByteStreamFromReader(reader, opts = {}) { + const source = byteStreamSource(reader, opts); + return new ReadableStream({ + async pull(controller) { + const { done, value } = await source.read(); + if (done) { + controller.close(); + return; + } + controller.enqueue(byteChunk(value)); + }, + cancel(reason) { + return source.cancel?.(reason); + }, + }); +} + +function byteStreamSource(reader, opts) { + const name = opts.name ?? "stream reader"; + const chunkSize = opts.chunkSize ?? DEFAULT_BYTE_STREAM_CHUNK_SIZE; + if (!Number.isInteger(chunkSize) || chunkSize < 1) { + throw new TypeError(`invalid byte stream chunk size [${chunkSize}]`); + } + + if (typeof reader?.read === "function") { + return { + async read() { + const result = await reader.read({ count: chunkSize }); + if (isIteratorResult(result)) { + return result; + } + return { value: result, done: result === null }; + }, + }; + } + + if (typeof reader?.[Symbol.asyncIterator] === "function") { + const iterator = reader[Symbol.asyncIterator](); + return { + read() { + return iterator.next(); + }, + cancel(reason) { + return iterator.return?.(reason); + }, + }; + } + + throw new TypeError(`${name} must provide read() or [Symbol.asyncIterator]()`); +} + +function isIteratorResult(value) { + return value != null && typeof value === "object" && typeof value.done === "boolean"; +} + +function byteChunk(value) { + if (value instanceof Uint8Array) { + return value; + } + if (value instanceof ArrayBuffer) { + return new Uint8Array(value); + } + if (Array.isArray(value)) { + for (const byte of value) { + assertByte(byte); + } + return Uint8Array.from(value); + } + if (typeof value === "number") { + return Uint8Array.of(assertByte(value)); + } + if (typeof value === "string") { + return new TextEncoder().encode(value); + } + + throw new TypeError( + "byte stream chunk must be a byte, byte array, ArrayBuffer, Uint8Array, or string", + ); +} + +function assertByte(value) { + if (!Number.isInteger(value) || value < 0 || value > 255) { + throw new RangeError(`Invalid byte stream value: ${value}`); + } + return value; +} + /** * Creates a bidirectional stream with separate reader and writer interfaces. * diff --git a/packages/preview3-shim/test/stream.test.js b/packages/preview3-shim/test/stream.test.js index cff5e322d..fd86f05b8 100644 --- a/packages/preview3-shim/test/stream.test.js +++ b/packages/preview3-shim/test/stream.test.js @@ -1,6 +1,6 @@ import { describe, test, expect } from "vitest"; -const { StreamReader, StreamWriter, stream } = +const { StreamReader, StreamWriter, stream, readableByteStreamFromReader } = await import("@bytecodealliance/preview3-shim/stream"); describe("Node.js Preview3 canon stream reader", () => { @@ -278,3 +278,62 @@ describe("stream() helper", () => { expect(done).toBeNull(); }); }); + +describe("readableByteStreamFromReader()", () => { + test("uses read options and normalizes byte arrays", async () => { + const calls = []; + const source = { + async read(options) { + calls.push(options); + if (calls.length === 1) { + return { value: [65, 66, 67], done: false }; + } + return { value: undefined, done: true }; + }, + [Symbol.asyncIterator]() { + throw new Error("read should be used when available"); + }, + }; + + const reader = readableByteStreamFromReader(source, { chunkSize: 3 }).getReader(); + const first = await reader.read(); + const second = await reader.read(); + + expect(calls).toEqual([{ count: 3 }, { count: 3 }]); + expect(first.done).toBe(false); + expect(first.value).toBeInstanceOf(Uint8Array); + expect([...first.value]).toEqual([65, 66, 67]); + expect(second.done).toBe(true); + }); + + test("falls back to read() readers", async () => { + const values = [72, 105, null]; + const source = { + async read() { + return values.shift(); + }, + }; + + const reader = readableByteStreamFromReader(source).getReader(); + + const first = await reader.read(); + const second = await reader.read(); + const done = await reader.read(); + + expect([...first.value]).toEqual([72]); + expect([...second.value]).toEqual([105]); + expect(done.done).toBe(true); + }); + + test("rejects invalid byte values", async () => { + const source = { + async read() { + return { value: [256], done: false }; + }, + }; + + const reader = readableByteStreamFromReader(source).getReader(); + + await expect(reader.read()).rejects.toThrow("Invalid byte stream value: 256"); + }); +}); From af561c2ee9bcb64e8f4a9eaa03c0fcc729c5ee5c Mon Sep 17 00:00:00 2001 From: Tomasz Andrzejak Date: Fri, 8 May 2026 13:48:44 +0200 Subject: [PATCH 2/2] fix(p3-shim): cache text encoder for byte stream --- packages/preview3-shim/lib/nodejs/stream.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/preview3-shim/lib/nodejs/stream.js b/packages/preview3-shim/lib/nodejs/stream.js index b7c37c68b..5d433251e 100644 --- a/packages/preview3-shim/lib/nodejs/stream.js +++ b/packages/preview3-shim/lib/nodejs/stream.js @@ -1,3 +1,10 @@ +export const DEFAULT_BYTE_STREAM_CHUNK_SIZE = 64 * 1024; + +let BYTE_STREAM_ENCODER = null; +function encoder() { + return (BYTE_STREAM_ENCODER ??= new TextEncoder()); +} + /** * Creates a transferable ReadableStream from an async iterator. * @@ -32,8 +39,6 @@ export function readableStreamFromIterator(iterator, name = "iterator") { }); } -export const DEFAULT_BYTE_STREAM_CHUNK_SIZE = 64 * 1024; - /** * Creates a transferable ReadableStream from a byte-stream reader. * @@ -119,7 +124,7 @@ function byteChunk(value) { return Uint8Array.of(assertByte(value)); } if (typeof value === "string") { - return new TextEncoder().encode(value); + return encoder().encode(value); } throw new TypeError(