diff --git a/.changeset/happy-mails-shave.md b/.changeset/happy-mails-shave.md
new file mode 100644
index 0000000..edfbea3
--- /dev/null
+++ b/.changeset/happy-mails-shave.md
@@ -0,0 +1,5 @@
+---
+"@effect-rx/rx": patch
+---
+
+fix effect/runtime based subscribables
diff --git a/packages/rx/src/Result.ts b/packages/rx/src/Result.ts
index dd96b14..383e28f 100644
--- a/packages/rx/src/Result.ts
+++ b/packages/rx/src/Result.ts
@@ -276,6 +276,17 @@ export const getOrElse: {
(self: Result, orElse: LazyArg): A | B
} = dual(2, (self: Result, orElse: LazyArg): A | B => self._tag === "Success" ? self.value : orElse())
+/**
+ * @since 1.0.0
+ * @category accessors
+ */
+export const getOrThrow = (self: Result): A => {
+ if (self._tag === "Success") {
+ return self.value
+ }
+ throw new Error("Result.getOrThrow: called on a Failure")
+}
+
/**
* @since 1.0.0
* @category accessors
diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts
index 739dc75..ead3c75 100644
--- a/packages/rx/src/Rx.ts
+++ b/packages/rx/src/Rx.ts
@@ -302,36 +302,37 @@ const RxRuntimeProto = {
return makeStreamPull(pullRx as any, options)
},
- subscriptionRef(this: RxRuntime, arg: any) {
- return makeSubRef(readable((get) => {
- const previous = get.self>()
- const runtimeResult = get(this)
- if (runtimeResult._tag !== "Success") {
- return Result.replacePrevious(runtimeResult, previous)
+ subscriptionRef(this: RxRuntime, ref: any) {
+ return makeSubRef(
+ readable((get) => {
+ const previous = get.self>()
+ const runtimeResult = get(this)
+ if (runtimeResult._tag !== "Success") {
+ return Result.replacePrevious(runtimeResult, previous)
+ }
+ const value = typeof ref === "function" ? ref(get) : ref
+ return Effect.isEffect(value) ? makeEffect(get, value as any, Result.initial(true), runtimeResult.value) : value
+ }),
+ (get, ref) => {
+ const runtime = Result.getOrThrow(get(this))
+ return readSubscribable(get, ref, runtime)
}
- return makeEffect(
- get,
- arg,
- Result.initial(true),
- runtimeResult.value
- )
- }))
+ )
},
subscribable(this: RxRuntime, arg: any) {
- return makeSubscribable(readable((get) => {
+ return readable((get) => {
const previous = get.self>()
const runtimeResult = get(this)
if (runtimeResult._tag !== "Success") {
return Result.replacePrevious(runtimeResult, previous)
}
- return makeEffect(
- get,
- arg,
- Result.initial(true),
- runtimeResult.value
- )
- }))
+ const value = typeof arg === "function" ? arg(get) : arg
+ const sub = Effect.isEffect(value)
+ ? makeEffect(get, value as any, Result.initial(true), runtimeResult.value)
+ : value
+ return readSubscribable(get, sub, runtimeResult.value)
+ })
}
}
@@ -511,18 +512,21 @@ function makeEffect(
Effect.runFork(Scope.close(scope, Exit.void))
})
const scopedEffect = Scope.extend(effect, scope)
+ let syncResult: Result.Result | undefined
const cancel = runCallbackSync(runtime)(
scopedEffect,
function(exit) {
- ctx.setSelfSync(Result.fromExitWithPrevious(exit, previous))
+ syncResult = Result.fromExitWithPrevious(exit, previous)
+ ctx.setSelfSync(syncResult)
},
uninterruptible
)
if (cancel !== undefined) {
ctx.addFinalizer(cancel)
}
-
- if (previous._tag === "Some") {
+ if (syncResult !== undefined) {
+ return syncResult
+ } else if (previous._tag === "Some") {
return Result.waitingFrom(previous)
}
return Result.waiting(initialValue)
@@ -683,26 +687,33 @@ function makeStream(
// -----------------------------------------------------------------------------
/** @internal */
-const readSubscribable =
- (subRx: Rx | Result.Result, any>>) =>
- (get: Context) => {
- const sub = get(subRx)
- if (Subscribable.TypeId in sub) {
- get.addFinalizer(
- sub.changes.pipe(
- Stream.runForEach((value) => get.setSelf(value)),
- Effect.runCallback
- )
+const readSubscribable = (
+ get: Context,
+ sub:
+ | Subscribable.Subscribable
+ | Result.Result, any>,
+ runtime = Runtime.defaultRuntime
+) => {
+ if (Subscribable.TypeId in sub) {
+ get.addFinalizer(
+ sub.changes.pipe(
+ Stream.runForEach((value) => get.setSelf(value)),
+ Effect.runCallback
)
- return Effect.runSync(sub.get)
- } else if (sub._tag !== "Success") {
- return sub
- }
- return makeStream(get, sub.value.changes, Result.initial(true))
+ )
+ return Effect.runSync(sub.get)
+ } else if (sub._tag !== "Success") {
+ return sub
}
+ return makeStream(get, sub.value.changes, Result.initial(true), runtime)
+}
const makeSubRef = (
- refRx: Rx | Result.Result, any>>
+ refRx: Rx | Result.Result, any>>,
+ read: (
+ get: Context,
+ ref: SubscriptionRef.SubscriptionRef | Result.Success, any>
+ ) => any
) => {
function write(ctx: WriteContext>, value: any) {
const ref = ctx.get(refRx)
@@ -712,8 +723,15 @@ const makeSubRef = (
Effect.runSync(SubscriptionRef.set(ref.value, value))
}
}
-
- return writable(readSubscribable(refRx), write)
+ return writable((get) => {
+ const ref = get(refRx)
+ if (SubscriptionRef.SubscriptionRefTypeId in ref) {
+ return read(get, ref)
+ } else if (Result.isSuccess(ref)) {
+ return read(get, ref)
+ }
+ return ref
+ }, write)
}
/**
@@ -734,25 +752,18 @@ export const subscriptionRef: {
| Effect.Effect, any, never>
| Rx.Read, any, never>>
) =>
- makeSubRef(readable((get) => {
- let value: SubscriptionRef.SubscriptionRef | Effect.Effect, any, any>
- if (typeof ref === "function") {
- value = ref(get)
- } else {
- value = ref
- }
-
- return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
- }))
+ makeSubRef(
+ readable((get) => {
+ const value = typeof ref === "function" ? ref(get) : ref
+ return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
+ }),
+ readSubscribable
+ )
// -----------------------------------------------------------------------------
// constructors - subscribable
// -----------------------------------------------------------------------------
-const makeSubscribable = (
- subRx: Rx | Result.Result, any>>
-) => readable(readSubscribable(subRx))
-
/**
* @since 1.0.0
* @category constructors
@@ -763,7 +774,7 @@ export const subscribable: {
effect:
| Effect.Effect, E, never>
| Rx.Read, E, never>>
- ): Rx
+ ): Rx>
} = (
ref:
| Subscribable.Subscribable
@@ -771,16 +782,11 @@ export const subscribable: {
| Effect.Effect, any, never>
| Rx.Read, any, never>>
) =>
- makeSubscribable(readable((get) => {
- let value: Subscribable.Subscribable | Effect.Effect, any, any>
- if (typeof ref === "function") {
- value = ref(get)
- } else {
- value = ref
- }
-
- return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
- }))
+ readable((get) => {
+ const value = typeof ref === "function" ? ref(get) : ref
+ const sub = Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
+ return readSubscribable(get, sub)
+ })
// -----------------------------------------------------------------------------
// constructors - functions
diff --git a/packages/rx/test/Rx.test.ts b/packages/rx/test/Rx.test.ts
index 671265e..01793da 100644
--- a/packages/rx/test/Rx.test.ts
+++ b/packages/rx/test/Rx.test.ts
@@ -783,6 +783,16 @@ describe("Rx", () => {
unmount()
})
+ it("Subscribable effect", async () => {
+ vitest.useRealTimers()
+ const sub = Subscribable.make({ get: Effect.succeed(123), changes: Stream.succeed(123) })
+ const rx = Rx.subscribable(Effect.succeed(sub))
+ const r = Registry.make()
+ const unmount = r.mount(rx)
+ assert.deepStrictEqual(r.get(rx), Result.success(123))
+ unmount()
+ })
+
it("Subscribable/SubscriptionRef", async () => {
vitest.useRealTimers()
const ref = SubscriptionRef.make(123).pipe(Effect.runSync)