Skip to content

Commit

Permalink
fix effect/runtime based subscribables
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Aug 9, 2024
1 parent 5782977 commit 6669baf
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 67 deletions.
5 changes: 5 additions & 0 deletions .changeset/happy-mails-shave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-rx/rx": patch
---

fix effect/runtime based subscribables
11 changes: 11 additions & 0 deletions packages/rx/src/Result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,17 @@ export const getOrElse: {
<A, E, B>(self: Result<A, E>, orElse: LazyArg<B>): A | B
} = dual(2, <A, E, B>(self: Result<A, E>, orElse: LazyArg<B>): A | B => self._tag === "Success" ? self.value : orElse())

/**
* @since 1.0.0
* @category accessors
*/
export const getOrThrow = <A, E>(self: Result<A, E>): A => {
if (self._tag === "Success") {
return self.value
}
throw new Error("Result.getOrThrow: called on a Failure")
}

/**
* @since 1.0.0
* @category accessors
Expand Down
140 changes: 73 additions & 67 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,36 +302,37 @@ const RxRuntimeProto = {
return makeStreamPull(pullRx as any, options)
},

subscriptionRef(this: RxRuntime<any, any>, arg: any) {
return makeSubRef(readable((get) => {
const previous = get.self<Result.Result<any, any>>()
const runtimeResult = get(this)
if (runtimeResult._tag !== "Success") {
return Result.replacePrevious(runtimeResult, previous)
subscriptionRef(this: RxRuntime<any, any>, ref: any) {
return makeSubRef(
readable((get) => {
const previous = get.self<Result.Result<any, any>>()
const runtimeResult = get(this)
if (runtimeResult._tag !== "Success") {
return Result.replacePrevious(runtimeResult, previous)
}
const value = typeof ref === "function" ? ref(get) : ref
return Effect.isEffect(value) ? makeEffect(get, value as any, Result.initial(true), runtimeResult.value) : value
}),
(get, ref) => {
const runtime = Result.getOrThrow(get(this))
return readSubscribable(get, ref, runtime)
}
return makeEffect(
get,
arg,
Result.initial(true),
runtimeResult.value
)
}))
)
},

subscribable(this: RxRuntime<any, any>, arg: any) {
return makeSubscribable(readable((get) => {
return readable((get) => {
const previous = get.self<Result.Result<any, any>>()
const runtimeResult = get(this)
if (runtimeResult._tag !== "Success") {
return Result.replacePrevious(runtimeResult, previous)
}
return makeEffect(
get,
arg,
Result.initial(true),
runtimeResult.value
)
}))
const value = typeof arg === "function" ? arg(get) : arg
const sub = Effect.isEffect(value)
? makeEffect(get, value as any, Result.initial(true), runtimeResult.value)
: value
return readSubscribable(get, sub, runtimeResult.value)
})
}
}

Expand Down Expand Up @@ -511,18 +512,21 @@ function makeEffect<A, E>(
Effect.runFork(Scope.close(scope, Exit.void))
})
const scopedEffect = Scope.extend(effect, scope)
let syncResult: Result.Result<A, E> | undefined
const cancel = runCallbackSync(runtime)(
scopedEffect,
function(exit) {
ctx.setSelfSync(Result.fromExitWithPrevious(exit, previous))
syncResult = Result.fromExitWithPrevious(exit, previous)
ctx.setSelfSync(syncResult)
},
uninterruptible
)
if (cancel !== undefined) {
ctx.addFinalizer(cancel)
}

if (previous._tag === "Some") {
if (syncResult !== undefined) {
return syncResult
} else if (previous._tag === "Some") {
return Result.waitingFrom(previous)
}
return Result.waiting(initialValue)
Expand Down Expand Up @@ -683,26 +687,33 @@ function makeStream<A, E>(
// -----------------------------------------------------------------------------

/** @internal */
const readSubscribable =
(subRx: Rx<Subscribable.Subscribable<any, any> | Result.Result<Subscribable.Subscribable<any, any>, any>>) =>
(get: Context) => {
const sub = get(subRx)
if (Subscribable.TypeId in sub) {
get.addFinalizer(
sub.changes.pipe(
Stream.runForEach((value) => get.setSelf(value)),
Effect.runCallback
)
const readSubscribable = (
get: Context,
sub:
| Subscribable.Subscribable<any, any>
| Result.Result<Subscribable.Subscribable<any, any>, any>,
runtime = Runtime.defaultRuntime
) => {
if (Subscribable.TypeId in sub) {
get.addFinalizer(
sub.changes.pipe(
Stream.runForEach((value) => get.setSelf(value)),
Effect.runCallback
)
return Effect.runSync(sub.get)
} else if (sub._tag !== "Success") {
return sub
}
return makeStream(get, sub.value.changes, Result.initial(true))
)
return Effect.runSync(sub.get)
} else if (sub._tag !== "Success") {
return sub
}
return makeStream(get, sub.value.changes, Result.initial(true), runtime)
}

const makeSubRef = (
refRx: Rx<SubscriptionRef.SubscriptionRef<any> | Result.Result<SubscriptionRef.SubscriptionRef<any>, any>>
refRx: Rx<SubscriptionRef.SubscriptionRef<any> | Result.Result<SubscriptionRef.SubscriptionRef<any>, any>>,
read: (
get: Context,
ref: SubscriptionRef.SubscriptionRef<any> | Result.Success<SubscriptionRef.SubscriptionRef<any>, any>
) => any
) => {
function write(ctx: WriteContext<SubscriptionRef.SubscriptionRef<any>>, value: any) {
const ref = ctx.get(refRx)
Expand All @@ -712,8 +723,15 @@ const makeSubRef = (
Effect.runSync(SubscriptionRef.set(ref.value, value))
}
}

return writable(readSubscribable(refRx), write)
return writable((get) => {
const ref = get(refRx)
if (SubscriptionRef.SubscriptionRefTypeId in ref) {
return read(get, ref)
} else if (Result.isSuccess(ref)) {
return read(get, ref)
}
return ref
}, write)
}

/**
Expand All @@ -734,25 +752,18 @@ export const subscriptionRef: {
| Effect.Effect<SubscriptionRef.SubscriptionRef<any>, any, never>
| Rx.Read<Effect.Effect<SubscriptionRef.SubscriptionRef<any>, any, never>>
) =>
makeSubRef(readable((get) => {
let value: SubscriptionRef.SubscriptionRef<any> | Effect.Effect<SubscriptionRef.SubscriptionRef<any>, any, any>
if (typeof ref === "function") {
value = ref(get)
} else {
value = ref
}

return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
}))
makeSubRef(
readable((get) => {
const value = typeof ref === "function" ? ref(get) : ref
return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
}),
readSubscribable
)

// -----------------------------------------------------------------------------
// constructors - subscribable
// -----------------------------------------------------------------------------

const makeSubscribable = (
subRx: Rx<Subscribable.Subscribable<any, any> | Result.Result<Subscribable.Subscribable<any, any>, any>>
) => readable(readSubscribable(subRx))

/**
* @since 1.0.0
* @category constructors
Expand All @@ -763,24 +774,19 @@ export const subscribable: {
effect:
| Effect.Effect<Subscribable.Subscribable<A, E1>, E, never>
| Rx.Read<Effect.Effect<Subscribable.Subscribable<A, E1>, E, never>>
): Rx<A>
): Rx<Result.Result<A, E | E1>>
} = (
ref:
| Subscribable.Subscribable<any, any>
| Rx.Read<Subscribable.Subscribable<any, any>>
| Effect.Effect<Subscribable.Subscribable<any, any>, any, never>
| Rx.Read<Effect.Effect<Subscribable.Subscribable<any, any>, any, never>>
) =>
makeSubscribable(readable((get) => {
let value: Subscribable.Subscribable<any, any> | Effect.Effect<Subscribable.Subscribable<any, any>, any, any>
if (typeof ref === "function") {
value = ref(get)
} else {
value = ref
}

return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
}))
readable((get) => {
const value = typeof ref === "function" ? ref(get) : ref
const sub = Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
return readSubscribable(get, sub)
})

// -----------------------------------------------------------------------------
// constructors - functions
Expand Down
10 changes: 10 additions & 0 deletions packages/rx/test/Rx.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,16 @@ describe("Rx", () => {
unmount()
})

it("Subscribable effect", async () => {
vitest.useRealTimers()
const sub = Subscribable.make({ get: Effect.succeed(123), changes: Stream.succeed(123) })
const rx = Rx.subscribable(Effect.succeed(sub))
const r = Registry.make()
const unmount = r.mount(rx)
assert.deepStrictEqual(r.get(rx), Result.success(123))
unmount()
})

it("Subscribable/SubscriptionRef", async () => {
vitest.useRealTimers()
const ref = SubscriptionRef.make(123).pipe(Effect.runSync)
Expand Down

0 comments on commit 6669baf

Please sign in to comment.