From 752157fe6f952d0be150abd2a45a6022eedfbc3d Mon Sep 17 00:00:00 2001 From: Paul Esson Date: Mon, 7 Oct 2024 20:19:59 +1100 Subject: [PATCH 1/2] Fix memory memory management for flatMap prevent the flatMap operator from accumulating in an internal array while concurrency is maxed out --- src/asynciterable/operators/_flatten.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/asynciterable/operators/_flatten.ts b/src/asynciterable/operators/_flatten.ts index b2513d9c..0d04e03f 100644 --- a/src/asynciterable/operators/_flatten.ts +++ b/src/asynciterable/operators/_flatten.ts @@ -93,10 +93,12 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera } if (active < concurrent) { pullNextOuter(value as TSource); + results[0] = outer.next(); } else { + // remove the outer iterator from the race, we're full + results[0] = NEVER_PROMISE; outerValues.push(value as TSource); } - results[0] = outer.next(); break; } case Type.INNER: { @@ -119,6 +121,8 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera } case Type.INNER: { --active; + // add the outer iterator to the race + results[0] = outer.next(); // return the current slot to the pool innerIndices.push(index); // synchronously drain the `outerValues` buffer From 8f90bc7d19e0ff53559ab6996619652f5017f292 Mon Sep 17 00:00:00 2001 From: Paul Esson Date: Tue, 8 Oct 2024 17:17:18 +1100 Subject: [PATCH 2/2] Amend resolve bugs in flatMap pr * loosing values due to using .next while still waiting for outer iterator * adding completed iterator to race resulting in exception --- src/asynciterable/operators/_flatten.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/asynciterable/operators/_flatten.ts b/src/asynciterable/operators/_flatten.ts index 0d04e03f..e808a466 100644 --- a/src/asynciterable/operators/_flatten.ts +++ b/src/asynciterable/operators/_flatten.ts @@ -121,8 +121,10 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera } case Type.INNER: { --active; - // add the outer iterator to the race - results[0] = outer.next(); + // add the outer iterator to the race, if its been removed and we are not yet done with it + if (results[0] === NEVER_PROMISE && !outerComplete) { + results[0] = outer.next(); + } // return the current slot to the pool innerIndices.push(index); // synchronously drain the `outerValues` buffer