Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 43 additions & 11 deletions crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,9 +965,13 @@ impl AsyncStreamIntrinsic {
// fn (below) via an anonymous function.
Self::StreamReadableEndClass => format!(
r#"
async read() {{
async read(count = 1) {{
{debug_log_fn}('[{end_class_name}#read()]');

if (this.#endOfStream) {{
return {{ value: undefined, done: true }};
}}

// Wait for an existing read operation to end, if present,
// otherwise register this read for any future operations.
//
Expand Down Expand Up @@ -995,7 +999,10 @@ impl AsyncStreamIntrinsic {
// TODO(fix): when we do a read, we need to GET the string encoding from the
// other side, via the lift/lower fn?

const count = 1;
if (!Number.isInteger(count) || count < 1) {{
throw new TypeError(`invalid stream read count [${{count}}]`);
}}
count = Math.min(count, {managed_buffer_class}.MAX_LENGTH);
try {{
const {{ id: bufferID, buffer }} = {global_buffer_manager}.createBuffer({{
componentIdx: -1, // componentIdx of -1 indicates the host
Expand Down Expand Up @@ -1050,11 +1057,23 @@ impl AsyncStreamIntrinsic {
}}
}}

const vs = buffer.read(count);
const {{ typedArray }} = this.#elemMeta;
const res = typedArray === undefined || vs.length === 0 ? count === 1 ? vs[0] : vs : new typedArray(vs);
this.#result = null;
resolve(res);
const resultKind = packedResult & 0xF;
const transferred = packedResult >> 4;

if (resultKind === {stream_end_class}.CopyResult.DROPPED) {{
this.#endOfStream = true;
}}

if (transferred > 0) {{
const values = buffer.read(transferred);
const {{ typedArray }} = this.#elemMeta;
const value = typedArray === undefined ? count === 1 ? values[0] : values : new typedArray(values);
this.#result = null;
resolve(value);
}} else {{
this.#result = null;
resolve(undefined);
}}

}} catch (err) {{
{debug_log_fn}('[{end_class_name}#read()] error', err);
Expand Down Expand Up @@ -1093,6 +1112,8 @@ impl AsyncStreamIntrinsic {

#result = null;

#endOfStream = false;

constructor(args) {{
{debug_log_fn}('[{end_class_name}#constructor()] args', args);
super(args);
Expand Down Expand Up @@ -1325,8 +1346,8 @@ impl AsyncStreamIntrinsic {
isReadable: streamEnd.isReadable(),
isWritable: streamEnd.isWritable(),
globalRep: this.#rep,
readFn: async () => {{
return await streamEnd.read();
readFn: async (count) => {{
return await streamEnd.read(count);
}},
writeFn: async (v) => {{
await streamEnd.write(v);
Expand Down Expand Up @@ -1391,8 +1412,19 @@ impl AsyncStreamIntrinsic {

async next() {{
{debug_log_fn}('[{external_stream_class_name}#next()]');
if (!this.#isReadable) {{ throw new Error("stream is not marked as readable and cannot be written from"); }}
return this.#readFn();
return this.read();
}}

async read(opts) {{
{debug_log_fn}('[{external_stream_class_name}#read()]', {{ opts }});
if (!this.#isReadable) {{ throw new Error("stream is not marked as readable and cannot be read from"); }}
return this.#readFn(this.#readCount(opts));
}}

#readCount(opts) {{
const count = opts === undefined ? 1 : typeof opts === "number" ? opts : opts && typeof opts === "object" ? opts.count ?? 1 : undefined;
if (!Number.isInteger(count) || count < 1) {{ throw new TypeError(`invalid stream read count [${{count}}]`); }}
return count;
}}

async write() {{
Expand Down
6 changes: 3 additions & 3 deletions packages/preview3-shim/lib/nodejs/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import process from "node:process";
import { Readable } from "node:stream";

import { ResourceWorker } from "./workers/resource-worker.js";
import { StreamReader, readableStreamFromIterator } from "./stream.js";
import { StreamReader, readableByteStreamFromReader } from "./stream.js";
import { future } from "./future.js";

import { environment as environmentV2 } from "@bytecodealliance/preview2-shim/cli";
Expand Down Expand Up @@ -98,7 +98,7 @@ export const stdout = {
* @returns {Promise<{tag: string, val?: string}>} Result of the write operation.
*/
async writeViaStream(streamReader) {
const readableStream = readableStreamFromIterator(streamReader[Symbol.asyncIterator]());
const readableStream = readableByteStreamFromReader(streamReader, { name: "stdout stream" });
try {
await worker().run({ op: "stdout", stream: readableStream }, [readableStream]);
return { tag: "ok", val: undefined };
Expand All @@ -119,7 +119,7 @@ export const stderr = {
* @returns {Promise<{tag: string, val?: string}>} Result of the write operation.
*/
async writeViaStream(streamReader) {
const readableStream = readableStreamFromIterator(streamReader[Symbol.asyncIterator]());
const readableStream = readableByteStreamFromReader(streamReader, { name: "stderr stream" });
try {
await worker().run({ op: "stderr", stream: readableStream }, [readableStream]);
return { tag: "ok", val: undefined };
Expand Down
8 changes: 4 additions & 4 deletions packages/preview3-shim/lib/nodejs/filesystem/descriptor.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import fs from "node:fs/promises";
import process from "node:process";

import { StreamReader, readableStreamFromIterator } from "../stream.js";
import { StreamReader, readableByteStreamFromReader } from "../stream.js";
import { FutureReader } from "../future.js";
import { ResourceWorker } from "../workers/resource-worker.js";
import { earlyDispose, registerDispose } from "../finalization.js";
Expand Down Expand Up @@ -125,14 +125,14 @@ class Descriptor {
* ```
*
* @async
* @param {object} data A data source implementing `[Symbol.asyncIterator]()`.
* @param {object} data A readable byte stream.
* @param {bigint} offset The offset within the file.
* @returns {Promise<void>}
* @throws {FSError} `payload.tag` contains mapped WASI error code.
*/
async writeViaStream(data, offset) {
this.#ensureHandle();
const stream = readableStreamFromIterator(data[Symbol.asyncIterator]());
const stream = readableByteStreamFromReader(data, { name: "file write data" });

try {
await worker().run({ op: "write", fd: this.#handle.fd, offset, stream }, [stream]);
Expand All @@ -149,7 +149,7 @@ class Descriptor {
* ```
*
* @async
* @param {object} data A data source implementing `[Symbol.asyncIterator]()`.
* @param {object} data A readable byte stream.
* @returns {Promise<void>}
* @throws {FSError} `payload.tag` contains mapped WASI error code.
*/
Expand Down
4 changes: 2 additions & 2 deletions packages/preview3-shim/lib/nodejs/http/client.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ResourceWorker } from "../workers/resource-worker.js";
import { StreamReader, readableStreamFromIterator } from "../stream.js";
import { StreamReader, readableByteStreamFromReader } from "../stream.js";
import { FutureReader, future } from "../future.js";
import { _fieldsFromEntriesChecked } from "./fields.js";
import { HttpError } from "./error.js";
Expand Down Expand Up @@ -51,7 +51,7 @@ export const client = {
const { port1: tx, port2: rx } = new MessageChannel();

const transfer = [rx];
const stream = body ? readableStreamFromIterator(body[Symbol.asyncIterator]()) : undefined;
const stream = body ? readableByteStreamFromReader(body, { name: "request body" }) : undefined;
if (stream) {
transfer.unshift(stream);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/preview3-shim/lib/nodejs/http/server.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EventEmitter } from "node:events";

import { ResourceWorker } from "../workers/resource-worker.js";
import { StreamReader, readableStreamFromIterator } from "../stream.js";
import { StreamReader, readableByteStreamFromReader } from "../stream.js";
import { FutureReader, future } from "../future.js";
import { Request } from "./request.js";
import { Response } from "./response.js";
Expand Down Expand Up @@ -92,7 +92,7 @@ export class HttpServer extends EventEmitter {
const [body, trailers] = Response.consumeBody(res, resRx);
const { port1: tx, port2: rx } = new MessageChannel();

const stream = readableStreamFromIterator(body[Symbol.asyncIterator]());
const stream = readableByteStreamFromReader(body, { name: "response body" });

// Send trailers when ready
trailers
Expand Down
19 changes: 11 additions & 8 deletions packages/preview3-shim/lib/nodejs/sockets/tcp.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { StreamReader, readableStreamFromIterator } from "../stream.js";
import { StreamReader, readableByteStreamFromReader } from "../stream.js";
import { FutureReader } from "../future.js";
import { ResourceWorker } from "../workers/resource-worker.js";
import { SocketError } from "./error.js";
Expand Down Expand Up @@ -257,23 +257,26 @@ export class TcpSocket {
* ```
*
* @async
* @param {AsyncIterable<Uint8Array>} data - The data stream to send. Any
* async-iterable yielding byte chunks (e.g. a `StreamReader`) is accepted.
* @param {object} data - The byte stream to send.
* @returns {Promise<void>}
* @throws {SocketError} With payload.tag 'invalid-state' if socket is not CONNECTED
* @throws {SocketError} With payload.tag 'invalid-argument' if `data` does not implement [Symbol.asyncIterator]
* @throws {SocketError} With payload.tag 'invalid-argument' if `data` is not a readable byte stream
* @throws {SocketError} for other errors, payload.tag maps the system error
*/
async send(data) {
if (this.#state !== STATE.CONNECTED) {
throw new SocketError("invalid-state");
}
if (data == null || data == undefined || typeof data[Symbol.asyncIterator] !== "function") {
throw new SocketError("invalid-argument");
let stream;
try {
stream = readableByteStreamFromReader(data, { name: "tcp send data" });
} catch (error) {
if (error instanceof TypeError) {
throw new SocketError("invalid-argument", error.message, undefined, { cause: error });
}
throw error;
}

const stream = readableStreamFromIterator(data[Symbol.asyncIterator]());

try {
// Transfer the stream to the worker
await worker().run(
Expand Down
102 changes: 102 additions & 0 deletions packages/preview3-shim/lib/nodejs/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,108 @@ export function readableStreamFromIterator(iterator, name = "iterator") {
});
}

export const DEFAULT_BYTE_STREAM_CHUNK_SIZE = 64 * 1024;

/**
* Creates a transferable ReadableStream from a byte-stream reader.
*
* Generated p3 stream readers expose `read({ count })`, which performs one
* bounded Canonical ABI stream read for up to `count` bytes. Prefer that path so
* host byte sinks consume practical chunks instead of one byte at a time.
*
* @param {object} reader - Reader that provides `read()`, or `[Symbol.asyncIterator]()`.
* @param {object} [opts={}] - Optional adapter settings.
* @param {number} [opts.chunkSize=DEFAULT_BYTE_STREAM_CHUNK_SIZE] - Maximum bytes to request per generated `read()` call.
* @param {string} [opts.name="stream reader"] - Reader name used in error messages.
* @returns {ReadableStream<Uint8Array>} A transferable stream of byte chunks.
*/
export function readableByteStreamFromReader(reader, opts = {}) {
const source = byteStreamSource(reader, opts);
return new ReadableStream({
async pull(controller) {
const { done, value } = await source.read();
if (done) {
controller.close();
return;
}
controller.enqueue(byteChunk(value));
},
cancel(reason) {
return source.cancel?.(reason);
},
});
}

function byteStreamSource(reader, opts) {
const name = opts.name ?? "stream reader";
const chunkSize = opts.chunkSize ?? DEFAULT_BYTE_STREAM_CHUNK_SIZE;
if (!Number.isInteger(chunkSize) || chunkSize < 1) {
throw new TypeError(`invalid byte stream chunk size [${chunkSize}]`);
}

if (typeof reader?.read === "function") {
return {
async read() {
const result = await reader.read({ count: chunkSize });
if (isIteratorResult(result)) {
return result;
}
return { value: result, done: result === null };
},
};
}

if (typeof reader?.[Symbol.asyncIterator] === "function") {
const iterator = reader[Symbol.asyncIterator]();
return {
read() {
return iterator.next();
},
cancel(reason) {
return iterator.return?.(reason);
},
};
}

throw new TypeError(`${name} must provide read() or [Symbol.asyncIterator]()`);
}

function isIteratorResult(value) {
return value != null && typeof value === "object" && typeof value.done === "boolean";
}

function byteChunk(value) {
if (value instanceof Uint8Array) {
return value;
}
if (value instanceof ArrayBuffer) {
return new Uint8Array(value);
}
if (Array.isArray(value)) {
for (const byte of value) {
assertByte(byte);
}
return Uint8Array.from(value);
}
if (typeof value === "number") {
return Uint8Array.of(assertByte(value));
}
if (typeof value === "string") {
return new TextEncoder().encode(value);
}

throw new TypeError(
"byte stream chunk must be a byte, byte array, ArrayBuffer, Uint8Array, or string",
);
}

function assertByte(value) {
if (!Number.isInteger(value) || value < 0 || value > 255) {
throw new RangeError(`Invalid byte stream value: ${value}`);
}
return value;
}

/**
* Creates a bidirectional stream with separate reader and writer interfaces.
*
Expand Down
Loading
Loading