Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory memory management for flatMap #376

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/asynciterable/operators/_flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,12 @@ export class FlattenConcurrentAsyncIterable<TSource, TResult> extends AsyncItera
}
if (active < concurrent) {
pullNextOuter(value as TSource);
results[0] = outer.next();
} else {
// remove the outer iterator from the race, we're full
results[0] = NEVER_PROMISE;
outerValues.push(value as TSource);
}
results[0] = outer.next();
Comment on lines +96 to -99
Copy link
Member

@trxcllnt trxcllnt Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for calling this out.

This operator is technically working as designed, i.e. we are intentionally pulling and buffering values from the outer sequence as quickly as they're emitted, but only flattening concurrent number of inner sequences at a time (similar to the Observable implementation).

But I can see how the internal buffering is problematic, and I agree that we shouldn't need to buffer the outer sequence values.

Copy link
Author

@thepont thepont Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @trxcllnt, I manged to get the tests running locally, thanks for your help identifying the problems with my code.

When I started looking at the actual code I figured it must have been by design.

My intent was to fan out to do multiple API requests ( to AWS SQS) concurrently based on the result of of a dynamodb scan and the speed that I could processes the items is reduced significantly when I removed the concurrency and tried a concatMap ( although this technically works, and solves my memory issues, although a little slower for my application).

My initial motivation for moving from RX to IX was to fix this issue, figuring it would be difficult to prevent my memory issues unless I had a way of throttling the producer, I assume another approach that would work as the library currently stands is to use a throttle before the flatMap and attempt to tweak that time so the consumer can keep up, and the buffer never explodes.

On the tests, I managed to run the unit tests although the contribution guide mentions performance tests but I seem unable to locate these?

break;
}
case Type.INNER: {
Expand All @@ -119,6 +121,8 @@ export class FlattenConcurrentAsyncIterable<TSource, TResult> extends AsyncItera
}
case Type.INNER: {
--active;
// add the outer iterator to the race
results[0] = outer.next();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is safe, since here we are still potentially waiting on the latest promise from the outer iterator to resolve.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. I'll see if I can't figure out a better way of doing it without potentially loosing values.

Copy link
Author

@thepont thepont Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @trxcllnt thanks for pointing this out, 8f90bc7 resolves the tests locally for me, as you Identified some were failing because I was loosing items due to the above calling next when a previous item still remained unresolved and other tests were failing because next was called on the outer after it had been completed above.

// return the current slot to the pool
innerIndices.push(index);
// synchronously drain the `outerValues` buffer
Expand Down
Loading