Skip to content

Commit

Permalink
Rx.batch api
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Sep 21, 2023
1 parent deed0fd commit c5b58d9
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 6 deletions.
5 changes: 5 additions & 0 deletions .changeset/metal-forks-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-rx/rx": patch
---

Rx.batch api
14 changes: 14 additions & 0 deletions docs/rx/Rx.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ Added in v1.0.0
- [TypeId (type alias)](#typeid-type-alias)
- [WritableTypeId](#writabletypeid)
- [WritableTypeId (type alias)](#writabletypeid-type-alias)
- [utils](#utils)
- [batch](#batch)

---

Expand Down Expand Up @@ -535,3 +537,15 @@ export type WritableTypeId = typeof WritableTypeId
```
Added in v1.0.0
# utils
## batch
**Signature**
```ts
export declare const batch: (f: () => void) => void
```
Added in v1.0.0
6 changes: 6 additions & 0 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/**
* @since 1.0.0
*/
import * as internalRegistry from "@effect-rx/rx/internal/registry"
import * as Result from "@effect-rx/rx/Result"
import * as Chunk from "@effect/data/Chunk"
import { dual, pipe } from "@effect/data/Function"
Expand Down Expand Up @@ -700,3 +701,8 @@ export const withLabel = dual<
...self,
label: [name, new Error().stack?.split("\n")[5] ?? ""]
}))

/**
* @since 1.0.0
*/
export const batch: (f: () => void) => void = internalRegistry.batch
90 changes: 84 additions & 6 deletions packages/rx/src/internal/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type * as Registry from "@effect-rx/rx/Registry"
import * as Result from "@effect-rx/rx/Result"
import type * as Rx from "@effect-rx/rx/Rx"
import * as Equal from "@effect/data/Equal"
import { globalValue } from "@effect/data/GlobalValue"
import * as Option from "@effect/data/Option"
import type { NoSuchElementException } from "@effect/io/Cause"
import type { Exit } from "@effect/io/Exit"
Expand All @@ -12,6 +13,60 @@ function constListener(_: any) {}
/** @internal */
export const TypeId: Registry.TypeId = Symbol.for("@effect-rx/rx/Registry") as Registry.TypeId

/** @internal */
export const enum BatchPhase {
disabled,
collect,
rebuild,
notify
}

/** @internal */
export const batchState = globalValue("@effect-rx/rx/Registry/batchState", () => ({
phase: BatchPhase.disabled,
depth: 0,
stale: new Set<Node<any>>(),
valid: new Set<Node<any>>()
}))

/** @internal */
export function batch(f: () => void): void {
batchState.phase = BatchPhase.collect
batchState.depth++
try {
f()
if (batchState.depth === 1) {
batchState.phase = BatchPhase.rebuild
for (const node of batchState.stale) {
node.value()
}
batchState.phase = BatchPhase.notify
for (const node of batchState.valid) {
node.notify()
}
}
} finally {
batchState.depth--
if (batchState.depth === 0) {
batchState.phase = BatchPhase.disabled
batchState.stale.clear()
batchState.valid.clear()
}
}
}

function batchAddValid(node: Node<any>) {
batchState.valid.add(node)
batchState.stale.delete(node)
}

function batchAddStale(node: Node<any>) {
if (batchState.valid.has(node)) {
return
}
batchState.stale.add(node)
}

/** @internal */
export const make = (): Registry.Registry => new RegistryImpl()

Expand Down Expand Up @@ -193,7 +248,13 @@ class Node<A> {
if ((this.state & NodeFlags.initialized) === 0) {
this.state = NodeState.valid
this._value = value
this.notify()

if (batchState.phase === BatchPhase.disabled) {
this.notify()
} else if (batchState.phase !== BatchPhase.notify) {
batchAddValid(this)
}

return
}

Expand All @@ -204,7 +265,12 @@ class Node<A> {

this._value = value
this.invalidateChildren()
this.notify()

if (batchState.phase === BatchPhase.disabled) {
this.notify()
} else if (batchState.phase !== BatchPhase.notify) {
batchAddValid(this)
}
}

addParent(parent: Node<any>): void {
Expand Down Expand Up @@ -238,8 +304,12 @@ class Node<A> {
this.disposeLifetime()
}

// rebuild
this.value()
if (batchState.phase === BatchPhase.collect) {
batchAddStale(this)
this.invalidateChildren()
} else {
this.value()
}
}

invalidateChildren(): void {
Expand All @@ -249,8 +319,16 @@ class Node<A> {

const children = this.children
this.children = []
for (let i = 0; i < children.length; i++) {
children[i].invalidate()
if (batchState.phase === BatchPhase.rebuild) {
for (let i = 0; i < children.length; i++) {
if (batchState.stale.has(children[i]) === false) {
children[i].invalidate()
}
}
} else {
for (let i = 0; i < children.length; i++) {
children[i].invalidate()
}
}
}

Expand Down
82 changes: 82 additions & 0 deletions packages/rx/test/Rx.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,88 @@ describe("Rx", () => {
Rx.state(0).pipe(Rx.withLabel("counter")).label![1]
).toMatch(/Rx.test.ts:\d+:\d+/)
})

it("batching", async () => {
const r = Registry.make()
const state = Rx.state(1).pipe(Rx.keepAlive)
const state2 = Rx.state("a").pipe(Rx.keepAlive)
let count = 0
const derived = Rx.readable((get) => {
count++
return get(state) + get(state2)
})
expect(r.get(derived)).toEqual("1a")
expect(count).toEqual(1)
Rx.batch(() => {
r.set(state, 2)
r.set(state2, "b")
})
expect(count).toEqual(2)
expect(r.get(derived)).toEqual("2b")
})

it("nested batch", async () => {
const r = Registry.make()
const state = Rx.state(1).pipe(Rx.keepAlive)
const state2 = Rx.state("a").pipe(Rx.keepAlive)
let count = 0
const derived = Rx.readable((get) => {
count++
return get(state) + get(state2)
})
expect(r.get(derived)).toEqual("1a")
expect(count).toEqual(1)
Rx.batch(() => {
r.set(state, 2)
Rx.batch(() => {
r.set(state2, "b")
})
})
expect(count).toEqual(2)
expect(r.get(derived)).toEqual("2b")
})

it("read correct updated state in batch", async () => {
const r = Registry.make()
const state = Rx.state(1).pipe(Rx.keepAlive)
const state2 = Rx.state("a").pipe(Rx.keepAlive)
let count = 0
const derived = Rx.readable((get) => {
count++
return get(state) + get(state2)
})
expect(r.get(derived)).toEqual("1a")
expect(count).toEqual(1)
Rx.batch(() => {
r.set(state, 2)
expect(r.get(derived)).toEqual("2a")
r.set(state2, "b")
})
expect(count).toEqual(2)
expect(r.get(derived)).toEqual("2b")
expect(count).toEqual(3)
})

it("notifies listeners after batch commit", async () => {
const r = Registry.make()
const state = Rx.state(1).pipe(Rx.keepAlive)
const state2 = Rx.state("a").pipe(Rx.keepAlive)
let count = 0
const derived = Rx.readable((get) => {
return get(state) + get(state2)
})
r.subscribe(derived, () => {
count++
})
Rx.batch(() => {
r.get(derived)
r.set(state, 2)
r.get(derived)
r.set(state2, "b")
})
expect(count).toEqual(1)
expect(r.get(derived)).toEqual("2b")
})
})

interface Counter {
Expand Down

0 comments on commit c5b58d9

Please sign in to comment.