diff --git a/.changeset/plenty-donkeys-hear.md b/.changeset/plenty-donkeys-hear.md new file mode 100644 index 0000000..dc63de4 --- /dev/null +++ b/.changeset/plenty-donkeys-hear.md @@ -0,0 +1,5 @@ +--- +"@effect-rx/rx": patch +--- + +make effect/stream creation lazy diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts index 8f9e491..b6bd9c9 100644 --- a/packages/rx/src/Rx.ts +++ b/packages/rx/src/Rx.ts @@ -271,13 +271,13 @@ export const state = ( function makeEffect( ctx: Context, - effect: Effect.Effect, + create: Rx.Read>, runCallback = Effect.runCallback ): Result.Result { const previous = ctx.self>() const cancel = runCallback( - effect, + create(ctx), function(exit) { if (!Exit.isInterrupted(exit)) { ctx.setSelf(Result.fromExit(exit)) @@ -294,7 +294,7 @@ function makeEffect( function makeEffectRuntime( ctx: Context, - effect: Effect.Effect, + create: Rx.Read>, runtime: RxRuntime ): Result.Result { const previous = ctx.self>() @@ -307,24 +307,26 @@ function makeEffectRuntime( return runtimeResult as any } - return makeEffect(ctx, effect as any, Runtime.runCallback(runtimeResult.value)) + return makeEffect(ctx, create as any, Runtime.runCallback(runtimeResult.value)) } function makeScoped( ctx: Context, - effect: Effect.Effect, + create: Rx.Read>, options?: { readonly runtime?: RxRuntime } -) { - const scope = Effect.runSync(Scope.make()) - ctx.addFinalizer(() => Effect.runFork(Scope.close(scope, Exit.unit))) - const scopedEffect = Effect.provideService( - effect, - Scope.Scope, - scope - ) +): Result.Result { + function createScoped(ctx: Context) { + const scope = Effect.runSync(Scope.make()) + ctx.addFinalizer(() => Effect.runFork(Scope.close(scope, Exit.unit))) + return Effect.provideService( + create(ctx), + Scope.Scope, + scope + ) + } return options?.runtime - ? makeEffectRuntime(ctx, scopedEffect, options.runtime) - : makeEffect(ctx, scopedEffect as Effect.Effect) + ? makeEffectRuntime(ctx, createScoped, options.runtime) + : makeEffect(ctx, createScoped as any) } /** @@ -342,10 +344,9 @@ export const effect: { options?: { readonly runtime?: RxRuntime } ) => readable>(function(get) { - const effect = create(get) return options?.runtime - ? makeEffectRuntime(get, effect, options.runtime) - : makeEffect(get, effect as Effect.Effect) + ? makeEffectRuntime(get, create, options.runtime) + : makeEffect(get, create as any) }) /** @@ -363,7 +364,7 @@ export const scoped: { options?: { readonly runtime?: RxRuntime } ) => readable>(function(get) { - return makeScoped(get, create(get), options) + return makeScoped(get, create, options) }) /** @@ -400,8 +401,8 @@ export const effectFn: { return Result.initial() } return options?.runtime - ? makeEffectRuntime(get, effect, options.runtime) - : makeEffect(get, effect as Effect.Effect) + ? makeEffectRuntime(get, (_) => effect, options.runtime) + : makeEffect(get, (_) => effect as Effect.Effect) }, function(ctx, arg) { ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg]) }) @@ -434,7 +435,7 @@ export const scopedFn: { if (effect === undefined) { return Result.initial() } - return makeScoped(get, effect, options) + return makeScoped(get, (_) => effect, options) }, function(ctx, arg) { ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg]) }) @@ -475,14 +476,14 @@ export const runtime: { function makeStream( ctx: Context, - stream: Stream.Stream, + create: Rx.Read>, runCallback = Effect.runCallback ): Result.Result { const previous = ctx.self>() const cancel = runCallback( Stream.runForEach( - stream, + create(ctx), (a) => Effect.sync(() => ctx.setSelf(Result.waiting(Result.success(a)))) ), (exit) => { @@ -512,7 +513,7 @@ function makeStream( function makeStreamRuntime( ctx: Context, - stream: Stream.Stream, + create: Rx.Read>, runtime: RxRuntime ): Result.Result { const previous = ctx.self>() @@ -525,7 +526,7 @@ function makeStreamRuntime( return runtimeResult as any } - return makeStream(ctx, stream as any, Runtime.runCallback(runtimeResult.value)) + return makeStream(ctx, create as any, Runtime.runCallback(runtimeResult.value)) } /** @@ -545,10 +546,9 @@ export const stream: { options?: { readonly runtime?: RxRuntime } ) => readable>(function(get) { - const stream = create(get) return options?.runtime - ? makeStreamRuntime(get, stream, options.runtime) - : makeStream(get, stream as Stream.Stream) + ? makeStreamRuntime(get, create, options.runtime) + : makeStream(get, create as any) }) /** @@ -579,8 +579,8 @@ export const streamFn: { return Result.initial() } return options?.runtime - ? makeStreamRuntime(get, stream, options.runtime) - : makeStream(get, stream as Stream.Stream) + ? makeStreamRuntime(get, (_) => stream, options.runtime) + : makeStream(get, (_) => stream as Stream.Stream) }, function(ctx, arg) { ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg]) }) @@ -668,8 +668,8 @@ export const streamPull: { ) ) return options?.runtime - ? makeEffectRuntime(get, pull, options.runtime) - : makeEffect(get, pull as any) + ? makeEffectRuntime(get, (_) => pull, options.runtime) + : makeEffect(get, (_) => pull as any) }, function(ctx, _) { ctx.refreshSelf() }, function(refresh) {