Skip to content

Commit

Permalink
make effect/stream creation lazy
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Sep 22, 2023
1 parent 571ea58 commit 2b58bfc
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/plenty-donkeys-hear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-rx/rx": patch
---

make effect/stream creation lazy
66 changes: 33 additions & 33 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,13 @@ export const state = <A>(

function makeEffect<E, A>(
ctx: Context,
effect: Effect.Effect<never, E, A>,
create: Rx.Read<Effect.Effect<never, E, A>>,
runCallback = Effect.runCallback
): Result.Result<E, A> {
const previous = ctx.self<Result.Result<E, A>>()

const cancel = runCallback(
effect,
create(ctx),
function(exit) {
if (!Exit.isInterrupted(exit)) {
ctx.setSelf(Result.fromExit(exit))
Expand All @@ -294,7 +294,7 @@ function makeEffect<E, A>(

function makeEffectRuntime<R, E, A, RE>(
ctx: Context,
effect: Effect.Effect<R, E, A>,
create: Rx.Read<Effect.Effect<R, E, A>>,
runtime: RxRuntime<RE, R>
): Result.Result<E, A> {
const previous = ctx.self<Result.Result<E, A>>()
Expand All @@ -307,24 +307,26 @@ function makeEffectRuntime<R, E, A, RE>(
return runtimeResult as any
}

return makeEffect(ctx, effect as any, Runtime.runCallback(runtimeResult.value))
return makeEffect(ctx, create as any, Runtime.runCallback(runtimeResult.value))
}

function makeScoped<R, E, A, RE>(
ctx: Context,
effect: Effect.Effect<R, E, A>,
create: Rx.Read<Effect.Effect<R, E, A>>,
options?: { readonly runtime?: RxRuntime<RE, R> }
) {
const scope = Effect.runSync(Scope.make())
ctx.addFinalizer(() => Effect.runFork(Scope.close(scope, Exit.unit)))
const scopedEffect = Effect.provideService(
effect,
Scope.Scope,
scope
)
): Result.Result<E, A> {
function createScoped(ctx: Context) {
const scope = Effect.runSync(Scope.make())
ctx.addFinalizer(() => Effect.runFork(Scope.close(scope, Exit.unit)))
return Effect.provideService(
create(ctx),
Scope.Scope,
scope
)
}
return options?.runtime
? makeEffectRuntime(ctx, scopedEffect, options.runtime)
: makeEffect(ctx, scopedEffect as Effect.Effect<never, E, A>)
? makeEffectRuntime(ctx, createScoped, options.runtime)
: makeEffect(ctx, createScoped as any)
}

/**
Expand All @@ -342,10 +344,9 @@ export const effect: {
options?: { readonly runtime?: RxRuntime<RE, R> }
) =>
readable<Result.Result<E, A>>(function(get) {
const effect = create(get)
return options?.runtime
? makeEffectRuntime(get, effect, options.runtime)
: makeEffect(get, effect as Effect.Effect<never, E, A>)
? makeEffectRuntime(get, create, options.runtime)
: makeEffect(get, create as any)
})

/**
Expand All @@ -363,7 +364,7 @@ export const scoped: {
options?: { readonly runtime?: RxRuntime<RE, R> }
) =>
readable<Result.Result<E, A>>(function(get) {
return makeScoped(get, create(get), options)
return makeScoped(get, create, options)
})

/**
Expand Down Expand Up @@ -400,8 +401,8 @@ export const effectFn: {
return Result.initial()
}
return options?.runtime
? makeEffectRuntime(get, effect, options.runtime)
: makeEffect(get, effect as Effect.Effect<never, E, A>)
? makeEffectRuntime(get, (_) => effect, options.runtime)
: makeEffect(get, (_) => effect as Effect.Effect<never, E, A>)
}, function(ctx, arg) {
ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg])
})
Expand Down Expand Up @@ -434,7 +435,7 @@ export const scopedFn: {
if (effect === undefined) {
return Result.initial()
}
return makeScoped(get, effect, options)
return makeScoped(get, (_) => effect, options)
}, function(ctx, arg) {
ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg])
})
Expand Down Expand Up @@ -475,14 +476,14 @@ export const runtime: {

function makeStream<E, A>(
ctx: Context,
stream: Stream.Stream<never, E, A>,
create: Rx.Read<Stream.Stream<never, E, A>>,
runCallback = Effect.runCallback
): Result.Result<E | NoSuchElementException, A> {
const previous = ctx.self<Result.Result<E | NoSuchElementException, A>>()

const cancel = runCallback(
Stream.runForEach(
stream,
create(ctx),
(a) => Effect.sync(() => ctx.setSelf(Result.waiting(Result.success(a))))
),
(exit) => {
Expand Down Expand Up @@ -512,7 +513,7 @@ function makeStream<E, A>(

function makeStreamRuntime<R, E, A, RE>(
ctx: Context,
stream: Stream.Stream<R, E, A>,
create: Rx.Read<Stream.Stream<R, E, A>>,
runtime: RxRuntime<RE, R>
): Result.Result<E | NoSuchElementException, A> {
const previous = ctx.self<Result.Result<E | NoSuchElementException, A>>()
Expand All @@ -525,7 +526,7 @@ function makeStreamRuntime<R, E, A, RE>(
return runtimeResult as any
}

return makeStream(ctx, stream as any, Runtime.runCallback(runtimeResult.value))
return makeStream(ctx, create as any, Runtime.runCallback(runtimeResult.value))
}

/**
Expand All @@ -545,10 +546,9 @@ export const stream: {
options?: { readonly runtime?: RxRuntime<RE, R> }
) =>
readable<Result.Result<E | NoSuchElementException, A>>(function(get) {
const stream = create(get)
return options?.runtime
? makeStreamRuntime(get, stream, options.runtime)
: makeStream(get, stream as Stream.Stream<never, E, A>)
? makeStreamRuntime(get, create, options.runtime)
: makeStream(get, create as any)
})

/**
Expand Down Expand Up @@ -579,8 +579,8 @@ export const streamFn: {
return Result.initial()
}
return options?.runtime
? makeStreamRuntime(get, stream, options.runtime)
: makeStream(get, stream as Stream.Stream<never, E, A>)
? makeStreamRuntime(get, (_) => stream, options.runtime)
: makeStream(get, (_) => stream as Stream.Stream<never, E, A>)
}, function(ctx, arg) {
ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg])
})
Expand Down Expand Up @@ -668,8 +668,8 @@ export const streamPull: {
)
)
return options?.runtime
? makeEffectRuntime(get, pull, options.runtime)
: makeEffect(get, pull as any)
? makeEffectRuntime(get, (_) => pull, options.runtime)
: makeEffect(get, (_) => pull as any)
}, function(ctx, _) {
ctx.refreshSelf()
}, function(refresh) {
Expand Down

0 comments on commit 2b58bfc

Please sign in to comment.