Skip to content

Commit 4b5a8d1

Browse files
authored
fix: cancel a readable stream if a writable stream is closed before a readable stream is closed. (#280)
1 parent a5e7d50 commit 4b5a8d1

File tree

2 files changed

+12
-8
lines changed

2 files changed

+12
-8
lines changed

src/utils.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@ export function writeFromReadableStreamDefaultReader(
1212
writable: Writable,
1313
currentReadPromise?: Promise<ReadableStreamReadResult<Uint8Array>> | undefined
1414
) {
15-
const handleError = () => {
16-
// ignore the error
15+
const cancel = (error?: unknown) => {
16+
reader.cancel(error).catch(() => {})
1717
}
1818

19-
writable.on('error', handleError)
19+
writable.on('close', cancel)
20+
writable.on('error', cancel)
2021
;(currentReadPromise ?? reader.read()).then(flow, handleStreamError)
2122

2223
return reader.closed.finally(() => {
23-
writable.off('error', handleError)
24+
writable.off('close', cancel)
25+
writable.off('error', cancel)
2426
})
2527

2628
// eslint-disable-next-line @typescript-eslint/no-explicit-any

test/utils.test.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,15 @@ describe('buildOutgoingHttpHeaders', () => {
7878
})
7979

8080
describe('writeFromReadableStream', () => {
81-
it('should handle client disconnection gracefully without canceling stream', async () => {
81+
it('should handle client disconnection', async () => {
8282
let enqueueCalled = false
8383
let cancelCalled = false
84+
let enqueueTimeout: NodeJS.Timeout | undefined
8485

8586
// Create test ReadableStream
8687
const stream = new ReadableStream({
8788
start(controller) {
88-
setTimeout(() => {
89+
enqueueTimeout = setTimeout(() => {
8990
try {
9091
controller.enqueue(new TextEncoder().encode('test'))
9192
enqueueCalled = true
@@ -97,6 +98,7 @@ describe('writeFromReadableStream', () => {
9798
},
9899
cancel() {
99100
cancelCalled = true
101+
clearTimeout(enqueueTimeout)
100102
},
101103
})
102104

@@ -110,8 +112,8 @@ describe('writeFromReadableStream', () => {
110112

111113
await writeFromReadableStream(stream, writable)
112114

113-
expect(enqueueCalled).toBe(true) // enqueue should succeed
114-
expect(cancelCalled).toBe(false) // cancel should not be called
115+
expect(enqueueCalled).toBe(false) // enqueue should not be called
116+
expect(cancelCalled).toBe(true) // cancel should be called
115117
})
116118
})
117119

0 commit comments

Comments
 (0)