Skip to content

Commit

Permalink
added Rx.sub(Subscribable) (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
jessekelly881 authored May 7, 2024
1 parent 61053ec commit 8a46d85
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 23 deletions.
5 changes: 5 additions & 0 deletions .changeset/witty-buckets-kiss.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-rx/rx": minor
---

added Rx.subscribable for working with Subscribables
65 changes: 61 additions & 4 deletions docs/rx/Rx.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ Added in v1.0.0
- [make](#make)
- [pull](#pull)
- [readable](#readable)
- [subRef](#subref)
- [subscribable](#subscribable)
- [subscriptionRef](#subscriptionref)
- [writable](#writable)
- [context](#context-1)
- [Context (interface)](#context-interface)
Expand All @@ -45,6 +46,9 @@ Added in v1.0.0
- [Get (type alias)](#get-type-alias)
- [GetResult (type alias)](#getresult-type-alias)
- [Infer (type alias)](#infer-type-alias)
- [InferFailure (type alias)](#inferfailure-type-alias)
- [InferPullSuccess (type alias)](#inferpullsuccess-type-alias)
- [InferSuccess (type alias)](#infersuccess-type-alias)
- [Mount (type alias)](#mount-type-alias)
- [Read (type alias)](#read-type-alias)
- [ReadFn (type alias)](#readfn-type-alias)
Expand Down Expand Up @@ -323,12 +327,29 @@ export declare const readable: <A>(read: Rx.Read<A>, refresh?: Rx.Refresh) => Rx
Added in v1.0.0
## subRef
## subscribable
**Signature**
```ts
export declare const subRef: {
export declare const subscribable: {
<A, E>(ref: Subscribable.Subscribable<A, E, never> | Rx.Read<Subscribable.Subscribable<A, E, never>>): Rx<A>
<A, E, E1>(
effect:
| Effect.Effect<Subscribable.Subscribable<A, E1, never>, E, never>
| Rx.Read<Effect.Effect<Subscribable.Subscribable<A, E1, never>, E, never>>
): Rx<A>
}
```
Added in v1.0.0
## subscriptionRef
**Signature**
```ts
export declare const subscriptionRef: {
<A>(ref: SubscriptionRef.SubscriptionRef<A> | Rx.Read<SubscriptionRef.SubscriptionRef<A>>): Writable<A, A>
<A, E>(
effect:
Expand Down Expand Up @@ -507,6 +528,36 @@ export type Infer<T extends Rx<any>> = T extends Rx<infer A> ? A : never
Added in v1.0.0
### InferFailure (type alias)
**Signature**
```ts
export type InferFailure<T extends Rx<any>> = T extends Rx<Result.Result<infer _, infer E>> ? E : never
```
Added in v1.0.0
### InferPullSuccess (type alias)
**Signature**
```ts
export type InferPullSuccess<T extends Rx<any>> = T extends Rx<PullResult<infer A, infer _>> ? A : never
```
Added in v1.0.0
### InferSuccess (type alias)
**Signature**
```ts
export type InferSuccess<T extends Rx<any>> = T extends Rx<Result.Result<infer A, infer _>> ? A : never
```
Added in v1.0.0
### Mount (type alias)
**Signature**
Expand Down Expand Up @@ -681,11 +732,17 @@ export interface RxRuntime<R, ER> extends Rx<Result.Result<Runtime.Runtime<R>, E
}
) => Writable<PullResult<A, E | ER>, void>

readonly subRef: <A, E>(
readonly subscriptionRef: <A, E>(
create:
| Effect.Effect<SubscriptionRef.SubscriptionRef<A>, E, R>
| Rx.Read<Effect.Effect<SubscriptionRef.SubscriptionRef<A>, E, R>>
) => Writable<Result.Result<A, E>, A>

readonly subscribable: <A, E, E1 = never>(
create:
| Effect.Effect<Subscribable.Subscribable<A, E, R>, E1, R>
| Rx.Read<Effect.Effect<Subscribable.Subscribable<A, E, R>, E1, R>>
) => Rx<Result.Result<A, E | E1>>
}
```

Expand Down
93 changes: 78 additions & 15 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { type Pipeable, pipeArguments } from "effect/Pipeable"
import * as Runtime from "effect/Runtime"
import * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
import * as Subscribable from "effect/Subscribable"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as internalRegistry from "./internal/registry.js"
import { runCallbackSync } from "./internal/runtime.js"
Expand Down Expand Up @@ -299,7 +300,7 @@ const RxRuntimeProto = {
return makeStreamPull(pullRx as any, options)
},

subRef(this: RxRuntime<any, any>, arg: any) {
subscriptionRef(this: RxRuntime<any, any>, arg: any) {
return makeSubRef(readable((get) => {
const previous = get.self<Result.Result<any, any>>()
const runtimeResult = get(this)
Expand All @@ -313,6 +314,22 @@ const RxRuntimeProto = {
runtimeResult.value
)
}))
},

subscribable(this: RxRuntime<any, any>, arg: any) {
return makeSubscribable(readable((get) => {
const previous = get.self<Result.Result<any, any>>()
const runtimeResult = get(this)
if (runtimeResult._tag !== "Success") {
return Result.replacePrevious(runtimeResult, previous)
}
return makeEffect(
get,
arg,
Result.initial(true),
runtimeResult.value
)
}))
}
}

Expand Down Expand Up @@ -547,11 +564,17 @@ export interface RxRuntime<R, ER> extends Rx<Result.Result<Runtime.Runtime<R>, E
readonly initialValue?: ReadonlyArray<A>
}) => Writable<PullResult<A, E | ER>, void>

readonly subRef: <A, E>(
readonly subscriptionRef: <A, E>(
create:
| Effect.Effect<SubscriptionRef.SubscriptionRef<A>, E, R>
| Rx.Read<Effect.Effect<SubscriptionRef.SubscriptionRef<A>, E, R>>
) => Writable<Result.Result<A, E>, A>

readonly subscribable: <A, E, E1 = never>(
create:
| Effect.Effect<Subscribable.Subscribable<A, E, R>, E1, R>
| Rx.Read<Effect.Effect<Subscribable.Subscribable<A, E, R>, E1, R>>
) => Rx<Result.Result<A, E | E1>>
}

/**
Expand Down Expand Up @@ -655,25 +678,28 @@ function makeStream<A, E>(
// constructors - subscription ref
// -----------------------------------------------------------------------------

const makeSubRef = (
refRx: Rx<SubscriptionRef.SubscriptionRef<any> | Result.Result<SubscriptionRef.SubscriptionRef<any>, any>>
) => {
function read(get: Context) {
const ref = get(refRx)
if (SubscriptionRef.SubscriptionRefTypeId in ref) {
/** @internal */
const readSubscribable =
(subRx: Rx<Subscribable.Subscribable<any, any> | Result.Result<Subscribable.Subscribable<any, any>, any>>) =>
(get: Context) => {
const sub = get(subRx)
if (Subscribable.TypeId in sub) {
get.addFinalizer(
ref.changes.pipe(
sub.changes.pipe(
Stream.runForEach((value) => get.setSelf(value)),
Effect.runCallback
)
)
return Effect.runSync(SubscriptionRef.get(ref))
} else if (ref._tag !== "Success") {
return ref
return Effect.runSync(sub.get)
} else if (sub._tag !== "Success") {
return sub
}
return makeStream(get, ref.value.changes, Result.initial(true))
return makeStream(get, sub.value.changes, Result.initial(true))
}

const makeSubRef = (
refRx: Rx<SubscriptionRef.SubscriptionRef<any> | Result.Result<SubscriptionRef.SubscriptionRef<any>, any>>
) => {
function write(ctx: WriteContext<SubscriptionRef.SubscriptionRef<any>>, value: any) {
const ref = ctx.get(refRx)
if (SubscriptionRef.SubscriptionRefTypeId in ref) {
Expand All @@ -683,14 +709,14 @@ const makeSubRef = (
}
}

return writable(read, write)
return writable(readSubscribable(refRx), write)
}

/**
* @since 1.0.0
* @category constructors
*/
export const subRef: {
export const subscriptionRef: {
<A>(ref: SubscriptionRef.SubscriptionRef<A> | Rx.Read<SubscriptionRef.SubscriptionRef<A>>): Writable<A, A>
<A, E>(
effect:
Expand All @@ -715,6 +741,43 @@ export const subRef: {
return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
}))

// -----------------------------------------------------------------------------
// constructors - subscribable
// -----------------------------------------------------------------------------

const makeSubscribable = (
subRx: Rx<Subscribable.Subscribable<any, any> | Result.Result<Subscribable.Subscribable<any, any>, any>>
) => readable(readSubscribable(subRx))

/**
* @since 1.0.0
* @category constructors
*/
export const subscribable: {
<A, E>(ref: Subscribable.Subscribable<A, E> | Rx.Read<Subscribable.Subscribable<A, E>>): Rx<A>
<A, E, E1>(
effect:
| Effect.Effect<Subscribable.Subscribable<A, E1>, E, never>
| Rx.Read<Effect.Effect<Subscribable.Subscribable<A, E1>, E, never>>
): Rx<A>
} = (
ref:
| Subscribable.Subscribable<any, any>
| Rx.Read<Subscribable.Subscribable<any, any>>
| Effect.Effect<Subscribable.Subscribable<any, any>, any, never>
| Rx.Read<Effect.Effect<Subscribable.Subscribable<any, any>, any, never>>
) =>
makeSubscribable(readable((get) => {
let value: Subscribable.Subscribable<any, any> | Effect.Effect<Subscribable.Subscribable<any, any>, any, any>
if (typeof ref === "function") {
value = ref(get)
} else {
value = ref
}

return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
}))

// -----------------------------------------------------------------------------
// constructors - functions
// -----------------------------------------------------------------------------
Expand Down
28 changes: 24 additions & 4 deletions packages/rx/test/Rx.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as Registry from "@effect-rx/rx/Registry"
import * as Result from "@effect-rx/rx/Result"
import * as Rx from "@effect-rx/rx/Rx"
import { Cause, Either, FiberRef, SubscriptionRef } from "effect"
import { Cause, Either, FiberRef, Subscribable, SubscriptionRef } from "effect"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Hash from "effect/Hash"
Expand Down Expand Up @@ -775,10 +775,30 @@ describe("Rx", () => {
assert.deepStrictEqual(r.get(rx), Either.right(123))
})

it("Subscribable", async () => {
vitest.useRealTimers()
const sub = Subscribable.make({ get: Effect.succeed(123), changes: Stream.empty })
const rx = Rx.subscribable(sub)
const r = Registry.make()
const unmount = r.mount(rx)
assert.deepStrictEqual(r.get(rx), 123)
unmount()
})

it("Subscribable/SubscriptionRef", async () => {
vitest.useRealTimers()
const ref = SubscriptionRef.make(123).pipe(Effect.runSync)
const rx = Rx.subscribable(ref)
const r = Registry.make()
assert.deepStrictEqual(r.get(rx), 123)
await Effect.runPromise(SubscriptionRef.update(ref, (a) => a + 1))
assert.deepStrictEqual(r.get(rx), 124)
})

it("SubscriptionRef", async () => {
vitest.useRealTimers()
const ref = SubscriptionRef.make(0).pipe(Effect.runSync)
const rx = Rx.subRef(ref)
const rx = Rx.subscriptionRef(ref)
const r = Registry.make()
const unmount = r.mount(rx)
assert.deepStrictEqual(r.get(rx), 0)
Expand All @@ -789,7 +809,7 @@ describe("Rx", () => {
})

it("SubscriptionRef/effect", async () => {
const rx = Rx.subRef(SubscriptionRef.make(0))
const rx = Rx.subscriptionRef(SubscriptionRef.make(0))
const r = Registry.make()
const unmount = r.mount(rx)
assert.deepStrictEqual(r.get(rx), Result.success(0, true))
Expand All @@ -800,7 +820,7 @@ describe("Rx", () => {
})

it("SubscriptionRef/runtime", async () => {
const rx = counterRuntime.subRef(SubscriptionRef.make(0))
const rx = counterRuntime.subscriptionRef(SubscriptionRef.make(0))
const r = Registry.make()
const unmount = r.mount(rx)
assert.deepStrictEqual(r.get(rx), Result.success(0, true))
Expand Down

0 comments on commit 8a46d85

Please sign in to comment.