Skip to content

Commit

Permalink
add stream apis to context
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jan 9, 2024
1 parent 2161818 commit db67bde
Show file tree
Hide file tree
Showing 9 changed files with 551 additions and 196 deletions.
6 changes: 6 additions & 0 deletions .changeset/big-countries-wave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@effect-rx/rx-react": patch
"@effect-rx/rx": patch
---

add stream apis to context
10 changes: 5 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
},
"devDependencies": {
"@babel/cli": "^7.23.4",
"@babel/core": "^7.23.6",
"@babel/core": "^7.23.7",
"@babel/plugin-transform-export-namespace-from": "^7.23.4",
"@babel/plugin-transform-modules-commonjs": "^7.23.3",
"@changesets/changelog-github": "^0.5.0",
Expand All @@ -28,9 +28,9 @@
"@effect/docgen": "^0.3.8",
"@effect/eslint-plugin": "^0.1.2",
"@effect/language-service": "^0.1.0",
"@typescript-eslint/eslint-plugin": "^6.15.0",
"@typescript-eslint/parser": "^6.15.0",
"@vitest/coverage-v8": "^1.1.0",
"@typescript-eslint/eslint-plugin": "^6.18.1",
"@typescript-eslint/parser": "^6.18.1",
"@vitest/coverage-v8": "^1.1.3",
"babel-plugin-annotate-pure-calls": "^0.4.0",
"eslint": "^8.56.0",
"eslint-import-resolver-typescript": "^3.6.1",
Expand All @@ -45,6 +45,6 @@
"prettier": "^3.1.1",
"tsx": "^4.7.0",
"typescript": "^5.3.3",
"vitest": "^1.1.0"
"vitest": "^1.1.3"
}
}
4 changes: 2 additions & 2 deletions packages/rx-react/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
"license": "MIT",
"sideEffects": false,
"devDependencies": {
"@types/react": "^18.2.45",
"effect": "^2.0.0",
"@types/react": "^18.2.47",
"effect": "^2.0.2",
"react": "^18.2.0"
},
"peerDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/rx/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"license": "MIT",
"sideEffects": false,
"devDependencies": {
"effect": "^2.0.0"
"effect": "^2.0.2"
},
"peerDependencies": {
"effect": "^2.0.0"
Expand Down
7 changes: 7 additions & 0 deletions packages/rx/src/Result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ export const waitingFrom = <E, A>(previous: Option.Option<Result<E, A>>): Result
*/
export const isInitial = <E, A>(result: Result<E, A>): result is Initial<E, A> => result._tag === "Initial"

/**
* @since 1.0.0
* @category refinements
*/
export const isNotInitial = <E, A>(result: Result<E, A>): result is Success<E, A> | Failure<E, A> =>
result._tag !== "Initial"

/**
* @since 1.0.0
* @category constructors
Expand Down
8 changes: 8 additions & 0 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ export interface Context {
readonly setSelf: <A>(a: A) => Effect.Effect<never, never, void>
readonly setSync: <R, W>(rx: Writable<R, W>, value: W) => void
readonly set: <R, W>(rx: Writable<R, W>, value: W) => Effect.Effect<never, never, void>
readonly stream: <A>(rx: Rx<A>, options?: {
readonly withoutInitialValue?: boolean
readonly bufferSize?: number
}) => Stream.Stream<never, never, A>
readonly streamResult: <E, A>(rx: Rx<Result.Result<E, A>>, options?: {
readonly withoutInitialValue?: boolean
readonly bufferSize?: number
}) => Stream.Stream<never, E, A>
readonly subscribe: <A>(rx: Rx<A>, f: (_: A) => void, options?: {
readonly immediate?: boolean
}) => void
Expand Down
44 changes: 44 additions & 0 deletions packages/rx/src/internal/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ import type { NoSuchElementException } from "effect/Cause"
import * as Effect from "effect/Effect"
import * as Equal from "effect/Equal"
import type { Exit } from "effect/Exit"
import { pipe } from "effect/Function"
import { globalValue } from "effect/GlobalValue"
import * as Option from "effect/Option"
import * as Queue from "effect/Queue"
import * as Stream from "effect/Stream"
import type * as Registry from "../Registry.js"
import * as Result from "../Result.js"
import type * as Rx from "../Rx.js"
Expand Down Expand Up @@ -517,6 +520,47 @@ const LifetimeProto: Omit<Lifetime<any>, "node" | "finalizers" | "disposed"> = {
})
},

stream<A>(this: Lifetime<any>, rx: Rx.Rx<A>, options?: {
readonly withoutInitialValue?: boolean
readonly bufferSize?: number
}) {
if (this.disposed) {
throw disposedError(this.node.rx)
}

return pipe(
Effect.acquireRelease(
Queue.bounded<A>(options?.bufferSize ?? 16),
Queue.shutdown
),
Effect.tap((queue) =>
Effect.acquireRelease(
Effect.sync(() => {
return this.node.registry.subscribe(rx, (_) => {
Queue.unsafeOffer(queue, _)
}, { immediate: options?.withoutInitialValue !== true })
}),
(cancel) => Effect.sync(cancel)
)
),
Effect.map((queue) => Stream.fromQueue(queue)),
Stream.unwrapScoped
)
},

streamResult<E, A>(this: Lifetime<any>, rx: Rx.Rx<Result.Result<E, A>>, options?: {
readonly withoutInitialValue?: boolean
readonly bufferSize?: number
}) {
return pipe(
this.stream(rx, options),
Stream.filter(Result.isNotInitial),
Stream.flatMap((result) =>
result._tag === "Success" ? Stream.succeed(result.value) : Stream.failCause(result.cause)
)
)
},

dispose(this: Lifetime<any>): void {
this.disposed = true
if (this.finalizers === undefined) {
Expand Down
36 changes: 36 additions & 0 deletions packages/rx/test/Rx.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,42 @@ describe("Rx", () => {
const r = Registry.make()
assert.strictEqual(r.get(bool), true)
})

it("get.stream", async () => {
const count = Rx.make(0)
const multiplied = Rx.make((get) => get.stream(count).pipe(Stream.map((_) => _ * 2)))

const r = Registry.make()
const cancel = r.mount(multiplied)

assert.strictEqual(r.get(count), 0)
assert.deepStrictEqual(r.get(multiplied), Result.success(0, true))

r.set(count, 1)
await new Promise((resolve) => resolve(null))
assert.deepStrictEqual(r.get(multiplied), Result.success(2, true))

cancel()
})

it("get.streamResult", async () => {
const count = Rx.make(0)
const multiplied = Rx.make((get) => get.stream(count).pipe(Stream.map((_) => _ * 2)))
const plusOne = Rx.make((get) => get.streamResult(multiplied).pipe(Stream.map((_) => _ + 1)))

const r = Registry.make()
const cancel = r.mount(plusOne)

assert.strictEqual(r.get(count), 0)
assert.deepStrictEqual(r.get(plusOne), Result.success(1, true))

r.set(count, 1)
await new Promise((resolve) => resolve(null))
await new Promise((resolve) => resolve(null))
assert.deepStrictEqual(r.get(plusOne), Result.success(3, true))

cancel()
})
})

interface BuildCounter {
Expand Down
Loading

0 comments on commit db67bde

Please sign in to comment.