Skip to content

Commit

Permalink
Merge pull request #15 from tim-smart/batching
Browse files Browse the repository at this point in the history
Rx.batch api
  • Loading branch information
tim-smart authored Sep 21, 2023
2 parents deed0fd + 04339f2 commit a48884d
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .changeset/metal-forks-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-rx/rx": patch
---

Rx.batch api
14 changes: 14 additions & 0 deletions docs/rx/Rx.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ Added in v1.0.0
- [TypeId (type alias)](#typeid-type-alias)
- [WritableTypeId](#writabletypeid)
- [WritableTypeId (type alias)](#writabletypeid-type-alias)
- [utils](#utils)
- [batch](#batch)

---

Expand Down Expand Up @@ -535,3 +537,15 @@ export type WritableTypeId = typeof WritableTypeId
```
Added in v1.0.0
# utils
## batch
**Signature**
```ts
export declare const batch: (f: () => void) => void
```
Added in v1.0.0
6 changes: 6 additions & 0 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/**
* @since 1.0.0
*/
import * as internalRegistry from "@effect-rx/rx/internal/registry"
import * as Result from "@effect-rx/rx/Result"
import * as Chunk from "@effect/data/Chunk"
import { dual, pipe } from "@effect/data/Function"
Expand Down Expand Up @@ -700,3 +701,8 @@ export const withLabel = dual<
...self,
label: [name, new Error().stack?.split("\n")[5] ?? ""]
}))

/**
* @since 1.0.0
*/
export const batch: (f: () => void) => void = internalRegistry.batch
77 changes: 73 additions & 4 deletions packages/rx/src/internal/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type * as Registry from "@effect-rx/rx/Registry"
import * as Result from "@effect-rx/rx/Result"
import type * as Rx from "@effect-rx/rx/Rx"
import * as Equal from "@effect/data/Equal"
import { globalValue } from "@effect/data/GlobalValue"
import * as Option from "@effect/data/Option"
import type { NoSuchElementException } from "@effect/io/Cause"
import type { Exit } from "@effect/io/Exit"
Expand Down Expand Up @@ -193,7 +194,11 @@ class Node<A> {
if ((this.state & NodeFlags.initialized) === 0) {
this.state = NodeState.valid
this._value = value
this.notify()

if (batchState.phase !== BatchPhase.collect) {
this.notify()
}

return
}

Expand All @@ -204,7 +209,10 @@ class Node<A> {

this._value = value
this.invalidateChildren()
this.notify()

if (batchState.phase !== BatchPhase.collect) {
this.notify()
}
}

addParent(parent: Node<any>): void {
Expand Down Expand Up @@ -238,8 +246,12 @@ class Node<A> {
this.disposeLifetime()
}

// rebuild
this.value()
if (batchState.phase === BatchPhase.collect) {
batchState.stale.push(this)
this.invalidateChildren()
} else {
this.value()
}
}

invalidateChildren(): void {
Expand Down Expand Up @@ -373,3 +385,60 @@ class Lifetime<A> implements Rx.Context {
}
}
}

// -----------------------------------------------------------------------------
// batching
// -----------------------------------------------------------------------------

/** @internal */
export const enum BatchPhase {
disabled,
collect,
commit
}

/** @internal */
export const batchState = globalValue("@effect-rx/rx/Registry/batchState", () => ({
phase: BatchPhase.disabled,
depth: 0,
stale: [] as Array<Node<any>>
}))

/** @internal */
export function batch(f: () => void): void {
batchState.phase = BatchPhase.collect
batchState.depth++
try {
f()
if (batchState.depth === 1) {
batchState.phase = BatchPhase.commit
for (let i = 0; i < batchState.stale.length; i++) {
batchRebuildNode(batchState.stale[i])
}
}
} finally {
batchState.depth--
if (batchState.depth === 0) {
batchState.phase = BatchPhase.disabled
batchState.stale = []
}
}
}

function batchRebuildNode(node: Node<any>) {
if (node.state === NodeState.valid) {
return
}

for (let i = 0; i < node.parents.length; i++) {
const parent = node.parents[i]
if (parent.state !== NodeState.valid) {
batchRebuildNode(parent)
}
}

// @ts-ignore
if (node.state !== NodeState.valid) {
node.value()
}
}
82 changes: 82 additions & 0 deletions packages/rx/test/Rx.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,88 @@ describe("Rx", () => {
Rx.state(0).pipe(Rx.withLabel("counter")).label![1]
).toMatch(/Rx.test.ts:\d+:\d+/)
})

it("batching", async () => {
const r = Registry.make()
const state = Rx.state(1).pipe(Rx.keepAlive)
const state2 = Rx.state("a").pipe(Rx.keepAlive)
let count = 0
const derived = Rx.readable((get) => {
count++
return get(state) + get(state2)
})
expect(r.get(derived)).toEqual("1a")
expect(count).toEqual(1)
Rx.batch(() => {
r.set(state, 2)
r.set(state2, "b")
})
expect(count).toEqual(2)
expect(r.get(derived)).toEqual("2b")
})

it("nested batch", async () => {
const r = Registry.make()
const state = Rx.state(1).pipe(Rx.keepAlive)
const state2 = Rx.state("a").pipe(Rx.keepAlive)
let count = 0
const derived = Rx.readable((get) => {
count++
return get(state) + get(state2)
})
expect(r.get(derived)).toEqual("1a")
expect(count).toEqual(1)
Rx.batch(() => {
r.set(state, 2)
Rx.batch(() => {
r.set(state2, "b")
})
})
expect(count).toEqual(2)
expect(r.get(derived)).toEqual("2b")
})

it("read correct updated state in batch", async () => {
const r = Registry.make()
const state = Rx.state(1).pipe(Rx.keepAlive)
const state2 = Rx.state("a").pipe(Rx.keepAlive)
let count = 0
const derived = Rx.readable((get) => {
count++
return get(state) + get(state2)
})
expect(r.get(derived)).toEqual("1a")
expect(count).toEqual(1)
Rx.batch(() => {
r.set(state, 2)
expect(r.get(derived)).toEqual("2a")
r.set(state2, "b")
})
expect(count).toEqual(3)
expect(r.get(derived)).toEqual("2b")
expect(count).toEqual(3)
})

it("notifies listeners after batch commit", async () => {
const r = Registry.make()
const state = Rx.state(1).pipe(Rx.keepAlive)
const state2 = Rx.state("a").pipe(Rx.keepAlive)
let count = 0
const derived = Rx.readable((get) => {
return get(state) + get(state2)
})
r.subscribe(derived, () => {
count++
})
Rx.batch(() => {
r.get(derived)
r.set(state, 2)
r.get(derived)
r.set(state2, "b")
})
expect(count).toEqual(1)
expect(r.get(derived)).toEqual("2b")
})
})

interface Counter {
Expand Down

0 comments on commit a48884d

Please sign in to comment.