Skip to content

Commit

Permalink
added Rx.sub
Browse files Browse the repository at this point in the history
  • Loading branch information
jessekelly881 committed Apr 20, 2024
1 parent aa22ac2 commit ecebbe2
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/witty-buckets-kiss.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-rx/rx": patch
---

added Rx.sub for working with Subscribables
82 changes: 82 additions & 0 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { type Pipeable, pipeArguments } from "effect/Pipeable"
import * as Runtime from "effect/Runtime"
import * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
import * as Subscribable from "effect/Subscribable"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as internalRegistry from "./internal/registry.js"
import { runCallbackSync } from "./internal/runtime.js"
Expand Down Expand Up @@ -297,6 +298,22 @@ const RxRuntimeProto = {
runtimeResult.value
)
}))
},

sub(this: RxRuntime<any, any>, arg: any) {
return makeSub(readable((get) => {
const previous = get.self<Result.Result<any, any>>()
const runtimeResult = get(this)
if (runtimeResult._tag !== "Success") {
return Result.replacePrevious(runtimeResult, previous)
}
return makeEffect(
get,
arg,
Result.initial(true),
runtimeResult.value
)
}))
}
}

Expand Down Expand Up @@ -536,6 +553,12 @@ export interface RxRuntime<R, ER> extends Rx<Result.Result<Runtime.Runtime<R>, E
| Effect.Effect<SubscriptionRef.SubscriptionRef<A>, E, R>
| Rx.Read<Effect.Effect<SubscriptionRef.SubscriptionRef<A>, E, R>>
) => Writable<Result.Result<A, E>, A>

readonly sub: <A, E, E1 = never>(
create:
| Effect.Effect<Subscribable.Subscribable<A, E, R>, E1, R>
| Rx.Read<Effect.Effect<Subscribable.Subscribable<A, E, R>, E1, R>>
) => Rx<Result.Result<A, E | E1>>
}

/**
Expand Down Expand Up @@ -699,6 +722,65 @@ export const subRef: {
return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
}))

// -----------------------------------------------------------------------------
// constructors - subscribable
// -----------------------------------------------------------------------------

/**
* @since 1.0.0
* @category constructors
*/
export const makeSub = (
subRx: Rx<Subscribable.Subscribable<any, any> | Result.Result<Subscribable.Subscribable<any, any>, any>>
) => {
function read(get: Context) {
const sub = get(subRx)
if (Subscribable.TypeId in sub) {
get.addFinalizer(
sub.changes.pipe(
Stream.runForEach((value) => get.setSelf(value)),
Effect.runCallback
)
)
return Effect.runSync(sub.get)
} else if (sub._tag !== "Success") {
return sub
}
return makeStream(get, sub.value.changes, Result.initial(true))
}

return readable(read)
}

/**
* @since 1.0.0
* @category constructors
*/
export const sub: {
<A, E>(ref: Subscribable.Subscribable<A, E> | Rx.Read<Subscribable.Subscribable<A, E>>): Rx<A>
<A, E, E1>(
effect:
| Effect.Effect<Subscribable.Subscribable<A, E1>, E, never>
| Rx.Read<Effect.Effect<Subscribable.Subscribable<A, E1>, E, never>>
): Rx<A>
} = (
ref:
| Subscribable.Subscribable<any, any>
| Rx.Read<Subscribable.Subscribable<any, any>>
| Effect.Effect<Subscribable.Subscribable<any, any>, any, never>
| Rx.Read<Effect.Effect<Subscribable.Subscribable<any, any>, any, never>>
) =>
makeSub(readable((get) => {
let value: Subscribable.Subscribable<any, any> | Effect.Effect<Subscribable.Subscribable<any, any>, any, any>
if (typeof ref === "function") {
value = ref(get)
} else {
value = ref
}

return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
}))

// -----------------------------------------------------------------------------
// constructors - functions
// -----------------------------------------------------------------------------
Expand Down

0 comments on commit ecebbe2

Please sign in to comment.