diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index 7b6fc3525d122f..e6a404729d6e0b 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -686,6 +686,10 @@ function wireBroadcastWriteSignal(entry, signal, resolve, reject, self) { signal.addEventListener('abort', onAbort, { __proto__: null, once: true }); } +function onBroadcastCancel(broadcastImpl, signal) { + onSignalAbort(signal, () => broadcastImpl.cancel(signal.reason)); +} + // ============================================================================= // Public API // ============================================================================= @@ -720,7 +724,7 @@ function broadcast(options = { __proto__: null }) { broadcastImpl.setWriter(writer); if (signal) { - onSignalAbort(signal, () => broadcastImpl.cancel()); + onBroadcastCancel(broadcastImpl, signal); } return { __proto__: null, writer, broadcast: broadcastImpl }; diff --git a/lib/internal/streams/iter/share.js b/lib/internal/streams/iter/share.js index 0160bc7eace009..3cd24409222712 100644 --- a/lib/internal/streams/iter/share.js +++ b/lib/internal/streams/iter/share.js @@ -144,6 +144,7 @@ class ShareImpl { // cursor must re-pull rather than terminating prematurely. for (;;) { if (state.detached) { + if (self.#sourceError) throw self.#sourceError; return { __proto__: null, done: true, value: undefined }; } @@ -628,6 +629,10 @@ class SyncShareImpl { } } +function onShareCancel(shareImpl, signal) { + onSignalAbort(signal, () => shareImpl.cancel(signal.reason)); +} + // ============================================================================= // Public API // ============================================================================= @@ -657,7 +662,7 @@ function share(source, options = { __proto__: null }) { const shareImpl = new ShareImpl(normalized, opts); if (signal) { - onSignalAbort(signal, () => shareImpl.cancel()); + onShareCancel(shareImpl, signal); } return shareImpl; diff --git a/test/parallel/test-stream-iter-broadcast-from.js b/test/parallel/test-stream-iter-broadcast-from.js index 2f17b1a7de92fa..c7458dee19ad64 100644 --- a/test/parallel/test-stream-iter-broadcast-from.js +++ b/test/parallel/test-stream-iter-broadcast-from.js @@ -70,11 +70,12 @@ async function testAbortSignal() { ac.abort(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of consumer) { + assert.fail('Should not reach here'); + } + }, { name: 'AbortError' }); } async function testAlreadyAbortedSignal() { @@ -84,11 +85,12 @@ async function testAlreadyAbortedSignal() { const { broadcast: bc } = broadcast({ signal: ac.signal }); const consumer = bc.push(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of consumer) { + assert.fail('Should not reach here'); + } + }, { name: 'AbortError' }); } // ============================================================================= diff --git a/test/parallel/test-stream-iter-share-async.js b/test/parallel/test-stream-iter-share-async.js index 86b0eb9b273a34..076fe0a4037aa0 100644 --- a/test/parallel/test-stream-iter-share-async.js +++ b/test/parallel/test-stream-iter-share-async.js @@ -134,16 +134,66 @@ async function testShareCancelWithReason() { async function testShareAbortSignal() { const ac = new AbortController(); - const shared = share(from('data'), { signal: ac.signal }); - const consumer = shared.pull(); + const reason = new Error('share aborted'); + const enc = new TextEncoder(); + async function* source() { + yield [enc.encode('a')]; + yield [enc.encode('b')]; + } + const shared = share(source(), { + highWaterMark: 1, + backpressure: 'block', + signal: ac.signal, + }); + const fast = shared.pull()[Symbol.asyncIterator](); + shared.pull(); + + await fast.next(); + const read = fast.next(); + const rejected = assert.rejects(read, (error) => error === reason); + ac.abort(reason); + + await rejected; +} + +async function testShareAbortSignalWhileSourcePullPending() { + const ac = new AbortController(); + const { + promise: resumePromise, + resolve: resume, + } = Promise.withResolvers(); + const { + promise: sourceStartedPromise, + resolve: sourceStarted, + } = Promise.withResolvers(); + + const source = { + __proto__: null, + [Symbol.asyncIterator]() { + return { + __proto__: null, + async next() { + sourceStarted(); + await resumePromise; + return { __proto__: null, done: true, value: undefined }; + }, + }; + }, + }; + + const shared = share(source, { signal: ac.signal }); + const iter1 = shared.pull()[Symbol.asyncIterator](); + const iter2 = shared.pull()[Symbol.asyncIterator](); + const read1 = iter1.next(); + const read2 = iter2.next(); + const rejected1 = assert.rejects(read1, { name: 'AbortError' }); + const rejected2 = assert.rejects(read2, { name: 'AbortError' }); + await sourceStartedPromise; ac.abort(); + resume(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await Promise.all([rejected1, rejected2]); } async function testShareAlreadyAborted() { @@ -153,11 +203,12 @@ async function testShareAlreadyAborted() { const shared = share(from('data'), { signal: ac.signal }); const consumer = shared.pull(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of consumer) { + assert.fail('Should not reach here'); + } + }, { name: 'AbortError' }); } // ============================================================================= @@ -273,6 +324,7 @@ Promise.all([ testShareCancelMidIteration(), testShareCancelWithReason(), testShareAbortSignal(), + testShareAbortSignalWhileSourcePullPending(), testShareAlreadyAborted(), testShareSourceError(), testShareLateJoiningConsumer(),