Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions lib/internal/streams/iter/classic.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const {
const {
toAsyncStreamable: kToAsyncStreamable,
kValidatedSource,
kSyncWriteAccepted,
drainableProtocol,
} = require('internal/streams/iter/types');

Expand Down Expand Up @@ -764,13 +765,41 @@ function toWritable(writer) {
const hasEndSync = hasEnd &&
typeof writer.endSync === 'function';
const hasFail = typeof writer.fail === 'function';
const hasSyncWriteAccepted =
typeof writer[kSyncWriteAccepted] === 'function';

// Try-sync-first pattern: attempt the synchronous method and
// fall back to the async method if it returns false (indicating
// the sync path was not accepted) or throws. When the sync path
// succeeds, the callback is deferred via queueMicrotask to
// preserve the async resolution contract that Writable internals
// expect from _write/_writev/_final callbacks.
function syncWriteAccepted() {
return hasSyncWriteAccepted && writer[kSyncWriteAccepted]();
}

function finishAfterSyncBackpressure(cb) {
let ondrain;
try {
if (typeof writer[drainableProtocol] === 'function') {
ondrain = writer[drainableProtocol]();
}
} catch (err) {
cb(err);
return;
}
if (ondrain !== null && ondrain !== undefined) {
PromisePrototypeThen(ondrain, (drained) => {
if (drained === false) {
cb(new ERR_INVALID_STATE.TypeError('Stream closed by consumer'));
return;
}
cb();
}, cb);
return;
}
queueMicrotask(cb);
}

// Try-sync-first pattern: attempt the synchronous method and fall back to the
// async method if it returns false without accepting the data, or if it
// throws. When the sync path succeeds, the callback is deferred via
// queueMicrotask to preserve the async resolution contract that Writable
// internals expect from _write/_writev/_final callbacks.

function _write(chunk, encoding, cb) {
const bytes = typeof chunk === 'string' ?
Expand All @@ -781,6 +810,11 @@ function toWritable(writer) {
queueMicrotask(cb);
return;
}
if (syncWriteAccepted()) {
// The chunk was accepted; false only signaled backpressure.
finishAfterSyncBackpressure(cb);
return;
}
} catch {
// Sync path threw -- fall through to async.
}
Expand All @@ -805,6 +839,11 @@ function toWritable(writer) {
queueMicrotask(cb);
return;
}
if (syncWriteAccepted()) {
// The chunks were accepted; false only signaled backpressure.
finishAfterSyncBackpressure(cb);
return;
}
} catch {
// Sync path threw -- fall through to async.
}
Expand Down
12 changes: 12 additions & 0 deletions lib/internal/streams/iter/push.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const {

const {
drainableProtocol,
kSyncWriteAccepted,
kSyncWriteAcceptedOnFalse,
} = require('internal/streams/iter/types');

Expand Down Expand Up @@ -545,11 +546,16 @@ class PushQueue {

class PushWriter {
#queue;
#syncWriteAccepted = false;

constructor(queue) {
this.#queue = queue;
}

[kSyncWriteAccepted]() {
return this.#syncWriteAccepted;
}

[drainableProtocol]() {
const desired = this.desiredSize;
if (desired === null) return null;
Expand Down Expand Up @@ -589,19 +595,23 @@ class PushWriter {
}

writeSync(chunk) {
this.#syncWriteAccepted = false;
const bytes = toUint8Array(chunk);
const result = this.#queue.writeSync([bytes]);
if (!result && this.#queue.backpressurePolicy === 'block' &&
this.#queue.desiredSize === 0) {
// Block policy: force-enqueue and return false as backpressure signal.
// Data IS accepted; false tells caller to slow down.
this.#queue.forceEnqueue([bytes]);
this.#syncWriteAccepted = true;
return false;
}
this.#syncWriteAccepted = result;
return result;
}

writevSync(chunks) {
this.#syncWriteAccepted = false;
if (!ArrayIsArray(chunks)) {
throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks);
}
Expand All @@ -610,8 +620,10 @@ class PushWriter {
if (!result && this.#queue.backpressurePolicy === 'block' &&
this.#queue.desiredSize === 0) {
this.#queue.forceEnqueue(bytes);
this.#syncWriteAccepted = true;
return false;
}
this.#syncWriteAccepted = result;
return result;
}

Expand Down
13 changes: 13 additions & 0 deletions lib/internal/streams/iter/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,24 @@ const kValidatedTransform = Symbol('kValidatedTransform');
*/
const kValidatedSource = Symbol('kValidatedSource');

/**
* Internal sentinel for writers whose sync write methods can return false
* after accepting data as a backpressure signal.
*/
const kSyncWriteAccepted = Symbol('kSyncWriteAccepted');

/**
* Internal sentinel for writers whose sync write methods may return false
* after accepting data when backpressure is applied. Such writers must expose
* desiredSize so callers can distinguish accepted backpressure from a sync
* write that was not performed.
*/
const kSyncWriteAcceptedOnFalse = Symbol('kSyncWriteAcceptedOnFalse');

module.exports = {
broadcastProtocol,
drainableProtocol,
kSyncWriteAccepted,
kSyncWriteAcceptedOnFalse,
kValidatedSource,
kValidatedTransform,
Expand Down
49 changes: 49 additions & 0 deletions test/parallel/test-stream-iter-writable-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,53 @@ async function testRoundTrip() {
assert.strictEqual(result, data);
}

// =============================================================================
// PushWriter writeSync false accepted as backpressure is not retried
// =============================================================================

async function testPushWriterBlockBackpressureNoDuplicate() {
const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' });
const writable = toWritable(writer);

await new Promise((resolve, reject) => {
writable.write('a', (err) => {
if (err) reject(err);
else resolve();
});
});

writable.write('b');
writable.end();

const result = await text(readable);
assert.strictEqual(result, 'ab');
}

// =============================================================================
// PushWriter writevSync false accepted as backpressure is not retried
// =============================================================================

async function testPushWriterBlockBackpressureWritevNoDuplicate() {
const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' });
const writable = toWritable(writer);

await new Promise((resolve, reject) => {
writable.write('a', (err) => {
if (err) reject(err);
else resolve();
});
});

writable.cork();
writable.write('b');
writable.write('c');
writable.uncork();
writable.end();

const result = await text(readable);
assert.strictEqual(result, 'abc');
}

// =============================================================================
// Multiple sequential writes
// =============================================================================
Expand Down Expand Up @@ -590,6 +637,8 @@ Promise.all([
testWriteThrowsSyncPropagation(),
testEndThrowsSyncPropagation(),
testRoundTrip(),
testPushWriterBlockBackpressureNoDuplicate(),
testPushWriterBlockBackpressureWritevNoDuplicate(),
testSequentialWrites(),
testSyncCallbackDeferred(),
testMinimalWriter(),
Expand Down
Loading