Skip to content

Commit

Permalink
add subscribeGetter
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Sep 19, 2023
1 parent 23949d2 commit 9419ca3
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 29 deletions.
6 changes: 6 additions & 0 deletions .changeset/tasty-kids-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@effect-rx/rx-react": patch
"@effect-rx/rx": patch
---

add subscribeGetter
39 changes: 20 additions & 19 deletions packages/rx-react/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,26 @@ export * as Rx from "@effect-rx/rx/Rx"
*/
export const RegistryContext = React.createContext<Registry.Registry>(Registry.make())

class RxStore<A> {
constructor(
readonly registry: Registry.Registry,
readonly rx: Rx.Rx<A>
) {}
value = this.registry.get(this.rx)
init = false
subscribe = (f: () => void): () => void => {
if (this.init === true) {
this.value = this.registry.get(this.rx)
} else {
this.init = true
}
return this.registry.subscribe(this.rx, (a) => {
this.value = a
f()
})
interface RxStore<A> {
readonly rx: Rx.Rx<A>
readonly registry: Registry.Registry
readonly subscribe: (f: () => void) => () => void
readonly snapshot: () => A
}

function makeStore<A>(registry: Registry.Registry, rx: Rx.Rx<A>): RxStore<A> {
let getter = function() {
return registry.get(rx)
}
function subscribe(f: () => void): () => void {
const [get, unmount] = registry.subscribeGetter(rx, f)
getter = get
return unmount
}
function snapshot() {
return getter()
}
snapshot = (): A => this.value
return { rx, registry, subscribe, snapshot }
}

/**
Expand All @@ -44,7 +45,7 @@ export const useRxValue = <A>(rx: Rx.Rx<A>): A => {
const registry = React.useContext(RegistryContext)
const store = React.useRef<RxStore<A>>(undefined as any)
if (store.current?.rx !== rx || store.current?.registry !== registry) {
store.current = new RxStore(registry, rx)
store.current = makeStore(registry, rx)
}
return React.useSyncExternalStore(store.current.subscribe, store.current.snapshot)
}
Expand Down
1 change: 1 addition & 0 deletions packages/rx/src/Registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export interface Registry {
readonly refresh: Rx.Rx.Refresh
readonly set: Rx.Rx.Set
readonly subscribe: Rx.Rx.Subscribe
readonly subscribeGetter: Rx.Rx.SubscribeGetter
}

/**
Expand Down
11 changes: 1 addition & 10 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { NoSuchElementException } from "@effect/io/Cause"
import * as Effect from "@effect/io/Effect"
import * as Exit from "@effect/io/Exit"
import * as Layer from "@effect/io/Layer"
import type * as Queue_ from "@effect/io/Queue"
import * as Runtime from "@effect/io/Runtime"
import * as Scope from "@effect/io/Scope"
import * as Channel from "@effect/stream/Channel"
Expand Down Expand Up @@ -83,15 +82,7 @@ export declare namespace Rx {
* @since 1.0.0
* @category models
*/
export type SubscribeWithPrevious = <A>(rx: Rx<A>, f: (prev: Option.Option<A>, value: A) => void, options?: {
readonly immediate?: boolean
}) => () => void

/**
* @since 1.0.0
* @category models
*/
export type Queue = <A>(rx: Rx<A>) => Effect.Effect<Scope.Scope, never, Queue_.Dequeue<A>>
export type SubscribeGetter = <A>(rx: Rx<A>, f: () => void) => readonly [get: () => A, unmount: () => void]
}

/**
Expand Down
15 changes: 15 additions & 0 deletions packages/rx/src/internal/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ class RegistryImpl implements Registry.Registry {
}
}

subscribeGetter = <A>(rx: Rx.Rx<A>, f: () => void): readonly [get: () => A, unmount: () => void] => {
const node = this.ensureNode(rx)
function get() {
return node.value()
}
const remove = node.subscribe(f)
const unmount = () => {
remove()
if (node.canBeRemoved) {
this.scheduleNodeRemoval(node)
}
}
return [get, unmount]
}

mount<A>(rx: Rx.Rx<A>) {
return this.subscribe(rx, constListener, constImmediate)
}
Expand Down

0 comments on commit 9419ca3

Please sign in to comment.