diff --git a/.changeset/metal-forks-teach.md b/.changeset/metal-forks-teach.md
new file mode 100644
index 0000000..2642e5c
--- /dev/null
+++ b/.changeset/metal-forks-teach.md
@@ -0,0 +1,5 @@
+---
+"@effect-rx/rx": patch
+---
+
+Rx.batch api
diff --git a/docs/rx/Rx.ts.md b/docs/rx/Rx.ts.md
index 536f7db..d2addf7 100644
--- a/docs/rx/Rx.ts.md
+++ b/docs/rx/Rx.ts.md
@@ -56,6 +56,8 @@ Added in v1.0.0
- [TypeId (type alias)](#typeid-type-alias)
- [WritableTypeId](#writabletypeid)
- [WritableTypeId (type alias)](#writabletypeid-type-alias)
+- [utils](#utils)
+ - [batch](#batch)
---
@@ -535,3 +537,15 @@ export type WritableTypeId = typeof WritableTypeId
```
Added in v1.0.0
+
+# utils
+
+## batch
+
+**Signature**
+
+```ts
+export declare const batch: (f: () => void) => void
+```
+
+Added in v1.0.0
diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts
index 228ddba..7c20e4d 100644
--- a/packages/rx/src/Rx.ts
+++ b/packages/rx/src/Rx.ts
@@ -1,6 +1,7 @@
/**
* @since 1.0.0
*/
+import * as internalRegistry from "@effect-rx/rx/internal/registry"
import * as Result from "@effect-rx/rx/Result"
import * as Chunk from "@effect/data/Chunk"
import { dual, pipe } from "@effect/data/Function"
@@ -700,3 +701,8 @@ export const withLabel = dual<
...self,
label: [name, new Error().stack?.split("\n")[5] ?? ""]
}))
+
+/**
+ * @since 1.0.0
+ */
+export const batch: (f: () => void) => void = internalRegistry.batch
diff --git a/packages/rx/src/internal/registry.ts b/packages/rx/src/internal/registry.ts
index 17954a7..6fd2671 100644
--- a/packages/rx/src/internal/registry.ts
+++ b/packages/rx/src/internal/registry.ts
@@ -2,6 +2,7 @@ import type * as Registry from "@effect-rx/rx/Registry"
import * as Result from "@effect-rx/rx/Result"
import type * as Rx from "@effect-rx/rx/Rx"
import * as Equal from "@effect/data/Equal"
+import { globalValue } from "@effect/data/GlobalValue"
import * as Option from "@effect/data/Option"
import type { NoSuchElementException } from "@effect/io/Cause"
import type { Exit } from "@effect/io/Exit"
@@ -193,7 +194,11 @@ class Node {
if ((this.state & NodeFlags.initialized) === 0) {
this.state = NodeState.valid
this._value = value
- this.notify()
+
+ if (batchState.phase !== BatchPhase.collect) {
+ this.notify()
+ }
+
return
}
@@ -204,7 +209,10 @@ class Node {
this._value = value
this.invalidateChildren()
- this.notify()
+
+ if (batchState.phase !== BatchPhase.collect) {
+ this.notify()
+ }
}
addParent(parent: Node): void {
@@ -238,8 +246,12 @@ class Node {
this.disposeLifetime()
}
- // rebuild
- this.value()
+ if (batchState.phase === BatchPhase.collect) {
+ batchState.stale.push(this)
+ this.invalidateChildren()
+ } else {
+ this.value()
+ }
}
invalidateChildren(): void {
@@ -373,3 +385,60 @@ class Lifetime implements Rx.Context {
}
}
}
+
+// -----------------------------------------------------------------------------
+// batching
+// -----------------------------------------------------------------------------
+
+/** @internal */
+export const enum BatchPhase {
+ disabled,
+ collect,
+ commit
+}
+
+/** @internal */
+export const batchState = globalValue("@effect-rx/rx/Registry/batchState", () => ({
+ phase: BatchPhase.disabled,
+ depth: 0,
+ stale: [] as Array>
+}))
+
+/** @internal */
+export function batch(f: () => void): void {
+ batchState.phase = BatchPhase.collect
+ batchState.depth++
+ try {
+ f()
+ if (batchState.depth === 1) {
+ batchState.phase = BatchPhase.commit
+ for (let i = 0; i < batchState.stale.length; i++) {
+ batchRebuildNode(batchState.stale[i])
+ }
+ }
+ } finally {
+ batchState.depth--
+ if (batchState.depth === 0) {
+ batchState.phase = BatchPhase.disabled
+ batchState.stale = []
+ }
+ }
+}
+
+function batchRebuildNode(node: Node) {
+ if (node.state === NodeState.valid) {
+ return
+ }
+
+ for (let i = 0; i < node.parents.length; i++) {
+ const parent = node.parents[i]
+ if (parent.state !== NodeState.valid) {
+ batchRebuildNode(parent)
+ }
+ }
+
+ // @ts-ignore
+ if (node.state !== NodeState.valid) {
+ node.value()
+ }
+}
diff --git a/packages/rx/test/Rx.test.ts b/packages/rx/test/Rx.test.ts
index 88cf43f..d8362a4 100644
--- a/packages/rx/test/Rx.test.ts
+++ b/packages/rx/test/Rx.test.ts
@@ -304,6 +304,88 @@ describe("Rx", () => {
Rx.state(0).pipe(Rx.withLabel("counter")).label![1]
).toMatch(/Rx.test.ts:\d+:\d+/)
})
+
+ it("batching", async () => {
+ const r = Registry.make()
+ const state = Rx.state(1).pipe(Rx.keepAlive)
+ const state2 = Rx.state("a").pipe(Rx.keepAlive)
+ let count = 0
+ const derived = Rx.readable((get) => {
+ count++
+ return get(state) + get(state2)
+ })
+ expect(r.get(derived)).toEqual("1a")
+ expect(count).toEqual(1)
+ Rx.batch(() => {
+ r.set(state, 2)
+ r.set(state2, "b")
+ })
+ expect(count).toEqual(2)
+ expect(r.get(derived)).toEqual("2b")
+ })
+
+ it("nested batch", async () => {
+ const r = Registry.make()
+ const state = Rx.state(1).pipe(Rx.keepAlive)
+ const state2 = Rx.state("a").pipe(Rx.keepAlive)
+ let count = 0
+ const derived = Rx.readable((get) => {
+ count++
+ return get(state) + get(state2)
+ })
+ expect(r.get(derived)).toEqual("1a")
+ expect(count).toEqual(1)
+ Rx.batch(() => {
+ r.set(state, 2)
+ Rx.batch(() => {
+ r.set(state2, "b")
+ })
+ })
+ expect(count).toEqual(2)
+ expect(r.get(derived)).toEqual("2b")
+ })
+
+ it("read correct updated state in batch", async () => {
+ const r = Registry.make()
+ const state = Rx.state(1).pipe(Rx.keepAlive)
+ const state2 = Rx.state("a").pipe(Rx.keepAlive)
+ let count = 0
+ const derived = Rx.readable((get) => {
+ count++
+ return get(state) + get(state2)
+ })
+ expect(r.get(derived)).toEqual("1a")
+ expect(count).toEqual(1)
+ Rx.batch(() => {
+ r.set(state, 2)
+ expect(r.get(derived)).toEqual("2a")
+ r.set(state2, "b")
+ })
+ expect(count).toEqual(3)
+ expect(r.get(derived)).toEqual("2b")
+ expect(count).toEqual(3)
+ })
+
+ it("notifies listeners after batch commit", async () => {
+ const r = Registry.make()
+ const state = Rx.state(1).pipe(Rx.keepAlive)
+ const state2 = Rx.state("a").pipe(Rx.keepAlive)
+ let count = 0
+ const derived = Rx.readable((get) => {
+ return get(state) + get(state2)
+ })
+ r.subscribe(derived, () => {
+ count++
+ })
+ Rx.batch(() => {
+ r.get(derived)
+ r.set(state, 2)
+ r.get(derived)
+ r.set(state2, "b")
+ })
+ expect(count).toEqual(1)
+ expect(r.get(derived)).toEqual("2b")
+ })
})
interface Counter {