Skip to content

Commit d472b8a

Browse files
committed
feat(bindgen): add support for stream reads with count > 1
This adds `readMany(count)` support to generated p3 streams so host-side byte consumers can perform Canonical ABI stream reads instead of pulling one byte at a time. It uses the transferred count from the packed stream result when reading host buffers.
1 parent 7f86b4a commit d472b8a

8 files changed

Lines changed: 221 additions & 28 deletions

File tree

crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -965,9 +965,13 @@ impl AsyncStreamIntrinsic {
965965
// fn (below) via an anonymous function.
966966
Self::StreamReadableEndClass => format!(
967967
r#"
968-
async read() {{
968+
async read(count = 1) {{
969969
{debug_log_fn}('[{end_class_name}#read()]');
970970
971+
if (this.#endOfStream) {{
972+
return {{ value: undefined, done: true }};
973+
}}
974+
971975
// Wait for an existing read operation to end, if present,
972976
// otherwise register this read for any future operations.
973977
//
@@ -995,7 +999,10 @@ impl AsyncStreamIntrinsic {
995999
// TODO(fix): when we do a read, we need to GET the string encoding from the
9961000
// other side, via the lift/lower fn?
9971001
998-
const count = 1;
1002+
if (!Number.isInteger(count) || count < 1) {{
1003+
throw new TypeError(`invalid stream read count [${{count}}]`);
1004+
}}
1005+
count = Math.min(count, {managed_buffer_class}.MAX_LENGTH);
9991006
try {{
10001007
const {{ id: bufferID, buffer }} = {global_buffer_manager}.createBuffer({{
10011008
componentIdx: -1, // componentIdx of -1 indicates the host
@@ -1050,10 +1057,21 @@ impl AsyncStreamIntrinsic {
10501057
}}
10511058
}}
10521059
1053-
const vs = buffer.read(count);
1054-
const res = count === 1 ? vs[0] : vs;
1055-
this.#result = null;
1056-
resolve(res);
1060+
const resultKind = packedResult & 0xF;
1061+
const transferred = packedResult >> 4;
1062+
1063+
if (resultKind === {stream_end_class}.CopyResult.DROPPED) {{
1064+
this.#endOfStream = true;
1065+
}}
1066+
1067+
if (transferred > 0) {{
1068+
const values = buffer.read(transferred);
1069+
this.#result = null;
1070+
resolve(count === 1 ? values[0] : values);
1071+
}} else {{
1072+
this.#result = null;
1073+
resolve(undefined);
1074+
}}
10571075
10581076
}} catch (err) {{
10591077
{debug_log_fn}('[{end_class_name}#read()] error', err);
@@ -1092,6 +1110,8 @@ impl AsyncStreamIntrinsic {
10921110
10931111
#result = null;
10941112
1113+
#endOfStream = false;
1114+
10951115
constructor(args) {{
10961116
{debug_log_fn}('[{end_class_name}#constructor()] args', args);
10971117
super(args);
@@ -1324,8 +1344,8 @@ impl AsyncStreamIntrinsic {
13241344
isReadable: streamEnd.isReadable(),
13251345
isWritable: streamEnd.isWritable(),
13261346
globalRep: this.#rep,
1327-
readFn: async () => {{
1328-
return await streamEnd.read();
1347+
readFn: async (count) => {{
1348+
return await streamEnd.read(count);
13291349
}},
13301350
writeFn: async (v) => {{
13311351
await streamEnd.write(v);
@@ -1394,6 +1414,12 @@ impl AsyncStreamIntrinsic {
13941414
return this.#readFn();
13951415
}}
13961416
1417+
async readMany(count) {{
1418+
{debug_log_fn}('[{external_stream_class_name}#readMany()]', {{ count }});
1419+
if (!this.#isReadable) {{ throw new Error("stream is not marked as readable and cannot be read from"); }}
1420+
return this.#readFn(count);
1421+
}}
1422+
13971423
async write() {{
13981424
{debug_log_fn}('[{external_stream_class_name}#write()]');
13991425
if (!this.#isWritable) {{ throw new Error("stream is not marked as writable and cannot be written to"); }}

packages/preview3-shim/lib/nodejs/cli.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import process from "node:process";
22
import { Readable } from "node:stream";
33

44
import { ResourceWorker } from "./workers/resource-worker.js";
5-
import { StreamReader, readableStreamFromIterator } from "./stream.js";
5+
import { StreamReader, readableByteStreamFromReader } from "./stream.js";
66
import { future } from "./future.js";
77

88
import { environment as environmentV2 } from "@bytecodealliance/preview2-shim/cli";
@@ -98,7 +98,7 @@ export const stdout = {
9898
* @returns {Promise<{tag: string, val?: string}>} Result of the write operation.
9999
*/
100100
async writeViaStream(streamReader) {
101-
const readableStream = readableStreamFromIterator(streamReader[Symbol.asyncIterator]());
101+
const readableStream = readableByteStreamFromReader(streamReader, { name: "stdout stream" });
102102
try {
103103
await worker().run({ op: "stdout", stream: readableStream }, [readableStream]);
104104
return { tag: "ok", val: undefined };
@@ -119,7 +119,7 @@ export const stderr = {
119119
* @returns {Promise<{tag: string, val?: string}>} Result of the write operation.
120120
*/
121121
async writeViaStream(streamReader) {
122-
const readableStream = readableStreamFromIterator(streamReader[Symbol.asyncIterator]());
122+
const readableStream = readableByteStreamFromReader(streamReader, { name: "stderr stream" });
123123
try {
124124
await worker().run({ op: "stderr", stream: readableStream }, [readableStream]);
125125
return { tag: "ok", val: undefined };

packages/preview3-shim/lib/nodejs/filesystem/descriptor.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import fs from "node:fs/promises";
22
import process from "node:process";
33

4-
import { StreamReader, readableStreamFromIterator } from "../stream.js";
4+
import { StreamReader, readableByteStreamFromReader } from "../stream.js";
55
import { FutureReader } from "../future.js";
66
import { ResourceWorker } from "../workers/resource-worker.js";
77
import { earlyDispose, registerDispose } from "../finalization.js";
@@ -125,14 +125,14 @@ class Descriptor {
125125
* ```
126126
*
127127
* @async
128-
* @param {object} data A data source implementing `[Symbol.asyncIterator]()`.
128+
* @param {object} data A readable byte stream.
129129
* @param {bigint} offset The offset within the file.
130130
* @returns {Promise<void>}
131131
* @throws {FSError} `payload.tag` contains mapped WASI error code.
132132
*/
133133
async writeViaStream(data, offset) {
134134
this.#ensureHandle();
135-
const stream = readableStreamFromIterator(data[Symbol.asyncIterator]());
135+
const stream = readableByteStreamFromReader(data, { name: "file write data" });
136136

137137
try {
138138
await worker().run({ op: "write", fd: this.#handle.fd, offset, stream }, [stream]);
@@ -149,7 +149,7 @@ class Descriptor {
149149
* ```
150150
*
151151
* @async
152-
* @param {object} data A data source implementing `[Symbol.asyncIterator]()`.
152+
* @param {object} data A readable byte stream.
153153
* @returns {Promise<void>}
154154
* @throws {FSError} `payload.tag` contains mapped WASI error code.
155155
*/

packages/preview3-shim/lib/nodejs/http/client.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ResourceWorker } from "../workers/resource-worker.js";
2-
import { StreamReader, readableStreamFromIterator } from "../stream.js";
2+
import { StreamReader, readableByteStreamFromReader } from "../stream.js";
33
import { FutureReader, future } from "../future.js";
44
import { _fieldsFromEntriesChecked } from "./fields.js";
55
import { HttpError } from "./error.js";
@@ -51,7 +51,7 @@ export const client = {
5151
const { port1: tx, port2: rx } = new MessageChannel();
5252

5353
const transfer = [rx];
54-
const stream = body ? readableStreamFromIterator(body[Symbol.asyncIterator]()) : undefined;
54+
const stream = body ? readableByteStreamFromReader(body, { name: "request body" }) : undefined;
5555
if (stream) {
5656
transfer.unshift(stream);
5757
}

packages/preview3-shim/lib/nodejs/http/server.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { EventEmitter } from "node:events";
22

33
import { ResourceWorker } from "../workers/resource-worker.js";
4-
import { StreamReader, readableStreamFromIterator } from "../stream.js";
4+
import { StreamReader, readableByteStreamFromReader } from "../stream.js";
55
import { FutureReader, future } from "../future.js";
66
import { Request } from "./request.js";
77
import { Response } from "./response.js";
@@ -92,7 +92,7 @@ export class HttpServer extends EventEmitter {
9292
const [body, trailers] = Response.consumeBody(res, resRx);
9393
const { port1: tx, port2: rx } = new MessageChannel();
9494

95-
const stream = readableStreamFromIterator(body[Symbol.asyncIterator]());
95+
const stream = readableByteStreamFromReader(body, { name: "response body" });
9696

9797
// Send trailers when ready
9898
trailers

packages/preview3-shim/lib/nodejs/sockets/tcp.js

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { StreamReader, readableStreamFromIterator } from "../stream.js";
1+
import { StreamReader, readableByteStreamFromReader } from "../stream.js";
22
import { FutureReader } from "../future.js";
33
import { ResourceWorker } from "../workers/resource-worker.js";
44
import { SocketError } from "./error.js";
@@ -257,23 +257,26 @@ export class TcpSocket {
257257
* ```
258258
*
259259
* @async
260-
* @param {AsyncIterable<Uint8Array>} data - The data stream to send. Any
261-
* async-iterable yielding byte chunks (e.g. a `StreamReader`) is accepted.
260+
* @param {object} data - The byte stream to send.
262261
* @returns {Promise<void>}
263262
* @throws {SocketError} With payload.tag 'invalid-state' if socket is not CONNECTED
264-
* @throws {SocketError} With payload.tag 'invalid-argument' if `data` does not implement [Symbol.asyncIterator]
263+
* @throws {SocketError} With payload.tag 'invalid-argument' if `data` is not a readable byte stream
265264
* @throws {SocketError} for other errors, payload.tag maps the system error
266265
*/
267266
async send(data) {
268267
if (this.#state !== STATE.CONNECTED) {
269268
throw new SocketError("invalid-state");
270269
}
271-
if (data == null || data == undefined || typeof data[Symbol.asyncIterator] !== "function") {
272-
throw new SocketError("invalid-argument");
270+
let stream;
271+
try {
272+
stream = readableByteStreamFromReader(data, { name: "tcp send data" });
273+
} catch (error) {
274+
if (error instanceof TypeError) {
275+
throw new SocketError("invalid-argument", error.message, undefined, { cause: error });
276+
}
277+
throw error;
273278
}
274279

275-
const stream = readableStreamFromIterator(data[Symbol.asyncIterator]());
276-
277280
try {
278281
// Transfer the stream to the worker
279282
await worker().run(

packages/preview3-shim/lib/nodejs/stream.js

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,111 @@ export function readableStreamFromIterator(iterator, name = "iterator") {
3232
});
3333
}
3434

35+
export const DEFAULT_BYTE_STREAM_CHUNK_SIZE = 64 * 1024;
36+
37+
/**
38+
* Creates a transferable ReadableStream from a byte-stream reader.
39+
*
40+
* Generated p3 stream readers expose `readMany(count)`, which performs one
41+
* bounded Canonical ABI stream read for up to `count` bytes. Prefer that path
42+
* so host byte sinks consume practical chunks instead of one byte at a time.
43+
*
44+
* @param {object} reader - Reader that provides `readMany()`, `[Symbol.asyncIterator]()`, or `read()`.
45+
* @param {object} [opts={}] - Optional adapter settings.
46+
* @param {number} [opts.chunkSize=DEFAULT_BYTE_STREAM_CHUNK_SIZE] - Maximum bytes to request per `readMany()` call.
47+
* @param {string} [opts.name="stream reader"] - Reader name used in error messages.
48+
* @returns {ReadableStream<Uint8Array>} A transferable stream of byte chunks.
49+
*/
50+
export function readableByteStreamFromReader(reader, opts = {}) {
51+
const source = byteStreamSource(reader, opts);
52+
return new ReadableStream({
53+
async pull(controller) {
54+
const { done, value } = await source.read();
55+
if (done) {
56+
controller.close();
57+
return;
58+
}
59+
controller.enqueue(byteChunk(value));
60+
},
61+
cancel(reason) {
62+
return source.cancel?.(reason);
63+
},
64+
});
65+
}
66+
67+
function byteStreamSource(reader, opts) {
68+
const name = opts.name ?? "stream reader";
69+
70+
if (typeof reader?.readMany === "function") {
71+
const chunkSize = opts.chunkSize ?? DEFAULT_BYTE_STREAM_CHUNK_SIZE;
72+
if (!Number.isInteger(chunkSize) || chunkSize < 1) {
73+
throw new TypeError(`invalid byte stream chunk size [${chunkSize}]`);
74+
}
75+
return {
76+
async read() {
77+
const result = await reader.readMany(chunkSize);
78+
if (result == null || typeof result !== "object") {
79+
throw new TypeError("readMany() must return an iterator result");
80+
}
81+
return result;
82+
},
83+
};
84+
}
85+
if (typeof reader?.[Symbol.asyncIterator] === "function") {
86+
const iterator = reader[Symbol.asyncIterator]();
87+
return {
88+
read() {
89+
return iterator.next();
90+
},
91+
cancel(reason) {
92+
return iterator.return?.(reason);
93+
},
94+
};
95+
}
96+
if (typeof reader?.read === "function") {
97+
return {
98+
async read() {
99+
const value = await reader.read();
100+
return { value, done: value === null };
101+
},
102+
};
103+
}
104+
105+
throw new TypeError(`${name} must provide readMany(), [Symbol.asyncIterator](), or read()`);
106+
}
107+
108+
function byteChunk(value) {
109+
if (value instanceof Uint8Array) {
110+
return value;
111+
}
112+
if (value instanceof ArrayBuffer) {
113+
return new Uint8Array(value);
114+
}
115+
if (Array.isArray(value)) {
116+
for (const byte of value) {
117+
assertByte(byte);
118+
}
119+
return Uint8Array.from(value);
120+
}
121+
if (typeof value === "number") {
122+
return Uint8Array.of(assertByte(value));
123+
}
124+
if (typeof value === "string") {
125+
return new TextEncoder().encode(value);
126+
}
127+
128+
throw new TypeError(
129+
"byte stream chunk must be a byte, byte array, ArrayBuffer, Uint8Array, or string",
130+
);
131+
}
132+
133+
function assertByte(value) {
134+
if (!Number.isInteger(value) || value < 0 || value > 255) {
135+
throw new RangeError(`Invalid byte stream value: ${value}`);
136+
}
137+
return value;
138+
}
139+
35140
/**
36141
* Creates a bidirectional stream with separate reader and writer interfaces.
37142
*

0 commit comments

Comments
 (0)