Open
Description
Version
v24.13.0
Platform
Microsoft Windows NT 10.0.26200.0 x64 / Linux 6.6.87.2-microsoft-standard-WSL2 x86_64 x86_64
Subsystem
No response
What steps will reproduce the bug?
Receiver:
import http from "http"
import { Readable } from 'stream'

const server = http.createServer({
  highWaterMark: 1
}, async (req, res) => {
  console.log("Receiver connected")

  const writeStream = new WritableStream({
    start(_controller) {
      console.log('LOG (writeStream): Stream started')
    },
    write(value) {
      console.log('LOG (writeStream):', value.length, Date.now())
      // Simulate slow consumer
      return new Promise(res => setTimeout(res, 1000))
    },
    close() {
      console.log('LOG (writeStream): Stream closed')
    },
    abort(err) {
      console.error('LOG (writeStream): Stream aborted:', err);
    }
  }, {
    highWaterMark: 1
  })

  const readable = Readable.toWeb(
    req,
    { strategy: { highWaterMark: 1 } }
  ) as unknown as ReadableStream

  await readable
    .pipeTo(writeStream)
    .catch(console.error)

  console.log("Receiver done")
  return res.end("ok")
})

server.listen(8080, '0.0.0.0', () => console.log("Server listening on port 8080"))

Sender:
const CHUNK_SIZE = 1024 * 16
const TOTAL_CHUNKS = 1000000

console.log(`Uploading ${TOTAL_CHUNKS} chunks of ${CHUNK_SIZE} bytes`)

const inputStream = new ReadableStream({
  start() {
    this.i = 0
    console.log('LOG (inputStream): Stream started')
  },
  pull(controller) {
    if (this.i >= TOTAL_CHUNKS) {
      console.log('[Stream] Closing')
      return controller.close()
    }
    console.log('LOG (inputStream): pull() called - chunk', this.i)
    const chunk = new Uint8Array(CHUNK_SIZE)
    chunk.fill(this.i % 256)
    controller.enqueue(chunk)
    this.i++
  }
}, {
  highWaterMark: 1
})

const encoder = new TransformStream({
  async transform(chunk, controller) {
    controller.enqueue(chunk)
  }
}, {
  highWaterMark: 1
}, {
  highWaterMark: 1
})

const stream = inputStream.pipeThrough(encoder)

try {
  const response = await fetch('http://127.0.0.1:8080', {
    method: 'POST',
    body: stream,
    duplex: 'half'
  })
  console.log(await response.text())
} catch (err) {
  console.error(err)
}

How often does it reproduce? Is there a required condition?
- Run receiver.
- Run sender.
- Ctrl+C receiver.
What is the expected behavior? Why is that the expected behavior?
Once the receiver is gone, the request body stream should be canceled internally.
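For comparison, here is a minimal sketch of what cancellation looks like when only web streams are involved (no fetch(), no HTTP). The helper name demoCancelPropagation and the chunk size are made up for illustration. Canceling the readable end of a pipeThrough() chain should reach the source's cancel() callback, and that is the behavior I would expect fetch() to trigger when the connection drops.

```typescript
// Hypothetical helper (not part of the original repro): checks that canceling
// the consumer side of a pipeThrough() chain reaches the source's cancel().
async function demoCancelPropagation(): Promise<{ pulls: number; cancelled: boolean }> {
  let pulls = 0
  let cancelled = false

  const source = new ReadableStream<Uint8Array>({
    pull(controller) {
      pulls++
      controller.enqueue(new Uint8Array(16))
    },
    cancel() {
      // Called when the pipe shuts down because the downstream side failed.
      cancelled = true
    }
  }, { highWaterMark: 1 })

  // Identity transform, standing in for the "encoder" in the sender.
  const passthrough = new TransformStream<Uint8Array, Uint8Array>()
  const reader = source.pipeThrough(passthrough).getReader()

  // Read one chunk, then simulate the consumer going away.
  await reader.read()
  await reader.cancel(new Error("consumer went away"))

  // Give the internal pipe a moment to propagate the cancellation upstream.
  await new Promise<void>(resolve => setTimeout(resolve, 10))

  return { pulls, cancelled }
}
```

If cancelled comes back true here but pull() keeps firing in the fetch() repro, that would point at the fetch()/Undici layer rather than at the streams themselves.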
What do you see instead?
The stream keeps pushing chunks into thin air: pull() continues to be called on the sender after the receiver has been killed.
Additional information
Unrelated, but eventually Node will run out of memory and crash (especially with large chunk sizes):
<--- Last few GCs --->
[18480:000001B3252C7860] 360972 ms: Mark-sweep 2019.5 (2084.4) -> 2018.3 (2083.9) MB, 905.0 / 0.0 ms (average mu = 0.926, current mu = 0.642) task; scavenge might not succeed
[18480:000001B3252C7860] 362295 ms: Mark-sweep 2019.9 (2084.6) -> 2019.2 (2084.9) MB, 912.1 / 0.0 ms (average mu = 0.826, current mu = 0.311) allocation failure; GC in old space requested
<--- JS stacktrace --->
FATAL ERROR: Reached heap limit Allocation failed - JavaScript heap out of memory
1: 00007FF715FFD7AF node_api_throw_syntax_error+174159
2: 00007FF715F80D66 v8::base::CPU::num_virtual_address_bits+59926
3: 00007FF715F82A00 v8::base::CPU::num_virtual_address_bits+67248
4: 00007FF716A2D564 v8::Isolate::ReportExternalAllocationLimitReached+116
5: 00007FF716A188F2 v8::Isolate::Exit+674
6: 00007FF71689A6AC v8::internal::EmbedderStackStateScope::ExplicitScopeForTesting+124
7: 00007FF7168978CB v8::internal::Heap::CollectGarbage+3963
8: 00007FF7168ADB13 v8::internal::HeapAllocator::AllocateRawWithLightRetrySlowPath+2099
9: 00007FF7168AE3BD v8::internal::HeapAllocator::AllocateRawWithRetryOrFailSlowPath+93
10: 00007FF7168B6CEF v8::internal::Factory::AllocateRaw+783
11: 00007FF7168C9EF9 v8::internal::FactoryBase<v8::internal::Factory>::NewHeapNumber<0>+233
12: 00007FF7168CE408 v8::internal::FactoryBase<v8::internal::Factory>::NewRawOneByteString+72
13: 00007FF716675927 v8::internal::String::SlowFlatten+679
14: 00007FF716A370ED v8::String::Utf8Length+141
15: 00007FF715FA8E2D node::Buffer::Data+1789
16: 00007FF716A50652 v8::internal::SetupIsolateDelegate::SetupHeap+54946
17: 00007FF696CD0B09
fetch() (or the Undici client) seems to carry its own internal buffer. I would expect backpressure to take effect and pause the stream before heap memory reaches its limit. Instead, the buffer keeps filling until the process crashes, which is probably a bug.
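To illustrate the backpressure I have in mind, here is a rough sketch using plain web streams only (again no fetch()). The helper name demoBackpressure and its parameters are assumptions for this example. With highWaterMark: 1 on both ends and a slow sink, the producer should only ever be a small, constant number of chunks ahead of the consumer.

```typescript
// Hypothetical helper (not part of the original repro): measures how far the
// producer runs ahead of a slow consumer when both use highWaterMark: 1.
async function demoBackpressure(
  totalChunks: number,
  delayMs: number
): Promise<{ maxLead: number; written: number }> {
  let produced = 0
  let written = 0
  let maxLead = 0 // largest observed (produced - written)

  const source = new ReadableStream<Uint8Array>({
    pull(controller) {
      if (produced >= totalChunks) {
        controller.close()
        return
      }
      produced++
      maxLead = Math.max(maxLead, produced - written)
      controller.enqueue(new Uint8Array(1024))
    }
  }, { highWaterMark: 1 })

  const sink = new WritableStream<Uint8Array>({
    async write() {
      // Simulate a slow consumer, as in the receiver above.
      await new Promise<void>(resolve => setTimeout(resolve, delayMs))
      written++
    }
  }, { highWaterMark: 1 })

  await source.pipeTo(sink)
  return { maxLead, written }
}
```

Calling demoBackpressure(20, 5) should report every chunk as written, with maxLead staying at a handful of chunks regardless of totalChunks. In the fetch() repro, by contrast, the sender runs ahead without any apparent bound.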