Skip to content

Commit

Permalink
fix *Fn not running for constant return values
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Sep 25, 2023
1 parent 690ffd5 commit 4e173d5
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 56 deletions.
5 changes: 5 additions & 0 deletions .changeset/new-crabs-attack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-rx/rx": patch
---

fix \*Fn not running for constant return values
11 changes: 11 additions & 0 deletions docs/rx/Rx.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Added in v1.0.0
- [effect](#effect)
- [effectFn](#effectfn)
- [family](#family)
- [fn](#fn)
- [readable](#readable)
- [runtime](#runtime)
- [scoped](#scoped)
Expand Down Expand Up @@ -212,6 +213,16 @@ export declare const family: <Arg, T extends Rx<any>>(f: (arg: Arg) => T) => (ar
Added in v1.0.0
## fn
**Signature**
```ts
export declare const fn: <Arg, A>(f: Rx.ReadFn<Arg, A>) => Writable<Option.Option<A>, Arg>
```
Added in v1.0.0
## readable
**Signature**
Expand Down
117 changes: 61 additions & 56 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* @since 1.0.0
*/
import * as internalRegistry from "@effect-rx/rx/internal/registry"
import { runCallbackSyncDefault } from "@effect-rx/rx/internal/runtime"
import { runCallbackSync, runCallbackSyncDefault } from "@effect-rx/rx/internal/runtime"
import * as Result from "@effect-rx/rx/Result"
import * as Chunk from "@effect/data/Chunk"
import { dual, pipe } from "@effect/data/Function"
Expand All @@ -14,7 +14,7 @@ import { NoSuchElementException } from "@effect/io/Cause"
import * as Effect from "@effect/io/Effect"
import * as Exit from "@effect/io/Exit"
import * as Layer from "@effect/io/Layer"
import * as Runtime from "@effect/io/Runtime"
import type * as Runtime from "@effect/io/Runtime"
import * as Scope from "@effect/io/Scope"
import * as Stream from "@effect/stream/Stream"

Expand Down Expand Up @@ -322,7 +322,7 @@ function makeEffectRuntime<R, E, A, RE>(
return runtimeResult as any
}

return makeEffect(ctx, create as any, Runtime.runCallback(runtimeResult.value))
return makeEffect(ctx, create as any, runCallbackSync(runtimeResult.value))
}

function makeScoped<R, E, A, RE>(
Expand All @@ -344,6 +344,44 @@ function makeScoped<R, E, A, RE>(
: makeEffect(ctx, createScoped as any)
}

/**
* @since 1.0.0
* @category constructors
*/
export const fn = <Arg, A>(f: Rx.ReadFn<Arg, A>): Writable<Option.Option<A>, Arg> => {
const argRx = state<[number, Arg]>([0, undefined as any])
return writable(function(get) {
const [counter, arg] = get(argRx)
if (counter === 0) {
return Option.none()
}
return Option.some(f(arg, get))
}, function(ctx, arg) {
ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg])
})
}

function makeFn<Arg, A, B>(
f: Rx.ReadFn<Arg, A>,
transform: (get: Context, _: Option.Option<Rx.Read<A>>) => B
) {
const argRx = state<[number, Arg]>([0, undefined as any])
return writable<B, Arg>(function(get) {
const [counter, arg] = get(argRx)
if (counter === 0) {
return transform(get, Option.none())
}
return transform(
get,
Option.some(function(ctx) {
return f(arg, ctx)
})
)
}, function(ctx, arg) {
ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg])
})
}

/**
* @since 1.0.0
* @category constructors
Expand Down Expand Up @@ -401,27 +439,15 @@ export const effectFn: {
} = <Arg, R, E, A, RE>(
f: Rx.ReadFn<Arg, Effect.Effect<R, E, A>>,
options?: { readonly runtime?: RxRuntime<RE, R> }
) => {
const argRx = state<[number, Arg]>([0, undefined as any])
const effectRx = readable<Effect.Effect<R, E, A> | undefined>(function(get) {
const [counter, arg] = get(argRx)
if (counter === 0) {
return undefined
}
return f(arg, get)
})
return writable<Result.Result<E, A>, Arg>(function(get) {
const effect = get(effectRx)
if (effect === undefined) {
) =>
makeFn(f, function(get, create) {
if (create._tag === "None") {
return Result.initial()
}
return options?.runtime
? makeEffectRuntime(get, (_) => effect, options.runtime)
: makeEffect(get, (_) => effect as Effect.Effect<never, E, A>)
}, function(ctx, arg) {
ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg])
? makeEffectRuntime(get, create.value, options.runtime)
: makeEffect(get, create.value as any)
})
}

/**
* @since 1.0.0
Expand All @@ -436,25 +462,13 @@ export const scopedFn: {
} = <Arg, R, E, A, RE>(
f: Rx.ReadFn<Arg, Effect.Effect<R, E, A>>,
options?: { readonly runtime?: RxRuntime<RE, R> }
) => {
const argRx = state<[number, Arg]>([0, undefined as any])
const effectRx = readable<Effect.Effect<R, E, A> | undefined>(function(get) {
const [counter, arg] = get(argRx)
if (counter === 0) {
return undefined
}
return f(arg, get)
})
return writable<Result.Result<E, A>, Arg>(function(get) {
const effect = get(effectRx)
if (effect === undefined) {
) =>
makeFn(f, function(get, create) {
if (create._tag === "None") {
return Result.initial()
}
return makeScoped(get, (_) => effect, options)
}, function(ctx, arg) {
ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg])
return makeScoped(get, create.value, options)
})
}

/**
* @since 1.0.0
Expand Down Expand Up @@ -492,7 +506,7 @@ export const runtime: {
function makeStream<E, A>(
ctx: Context,
create: Rx.Read<Stream.Stream<never, E, A>>,
runCallback = Effect.runCallback
runCallback = runCallbackSyncDefault
): Result.Result<E | NoSuchElementException, A> {
const previous = ctx.self<Result.Result<E | NoSuchElementException, A>>()

Expand All @@ -518,7 +532,9 @@ function makeStream<E, A>(
}
}
)
ctx.addFinalizer(cancel)
if (cancel !== undefined) {
ctx.addFinalizer(cancel)
}

if (previous._tag === "Some") {
return Result.waitingFrom(previous)
Expand All @@ -541,7 +557,7 @@ function makeStreamRuntime<R, E, A, RE>(
return runtimeResult as any
}

return makeStream(ctx, create as any, Runtime.runCallback(runtimeResult.value))
return makeStream(ctx, create as any, runCallbackSync(runtimeResult.value))
}

/**
Expand Down Expand Up @@ -579,27 +595,16 @@ export const streamFn: {
} = <Arg, R, E, A, RE>(
f: Rx.ReadFn<Arg, Stream.Stream<R, E, A>>,
options?: { readonly runtime?: RxRuntime<RE, R> }
) => {
const argRx = state<[number, Arg]>([0, undefined as any])
const streamRx = readable<Stream.Stream<R, E, A> | undefined>(function(get) {
const [counter, arg] = get(argRx)
if (counter === 0) {
return undefined
}
return f(arg, get)
})
return writable<Result.Result<E | NoSuchElementException, A>, Arg>(function(get) {
const stream = get(streamRx)
if (stream === undefined) {
) =>
makeFn(f, function(get, create) {
if (create._tag === "None") {
return Result.initial()
}
return options?.runtime
? makeStreamRuntime(get, (_) => stream, options.runtime)
: makeStream(get, (_) => stream as Stream.Stream<never, E, A>)
}, function(ctx, arg) {
ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg])
? makeStreamRuntime(get, create.value, options.runtime)
: makeStream(get, create.value as any)
})
}

/**
* @since 1.0.0
* @category models
Expand Down

0 comments on commit 4e173d5

Please sign in to comment.