From c5b58d957f863fe27214af57762221ccea3ffc4d Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 21 Sep 2023 10:25:44 +1200 Subject: [PATCH 1/2] Rx.batch api --- .changeset/metal-forks-teach.md | 5 ++ docs/rx/Rx.ts.md | 14 +++++ packages/rx/src/Rx.ts | 6 ++ packages/rx/src/internal/registry.ts | 90 ++++++++++++++++++++++++++-- packages/rx/test/Rx.test.ts | 82 +++++++++++++++++++++++++ 5 files changed, 191 insertions(+), 6 deletions(-) create mode 100644 .changeset/metal-forks-teach.md diff --git a/.changeset/metal-forks-teach.md b/.changeset/metal-forks-teach.md new file mode 100644 index 0000000..2642e5c --- /dev/null +++ b/.changeset/metal-forks-teach.md @@ -0,0 +1,5 @@ +--- +"@effect-rx/rx": patch +--- + +Rx.batch api diff --git a/docs/rx/Rx.ts.md b/docs/rx/Rx.ts.md index 536f7db..d2addf7 100644 --- a/docs/rx/Rx.ts.md +++ b/docs/rx/Rx.ts.md @@ -56,6 +56,8 @@ Added in v1.0.0 - [TypeId (type alias)](#typeid-type-alias) - [WritableTypeId](#writabletypeid) - [WritableTypeId (type alias)](#writabletypeid-type-alias) +- [utils](#utils) + - [batch](#batch) --- @@ -535,3 +537,15 @@ export type WritableTypeId = typeof WritableTypeId ``` Added in v1.0.0 + +# utils + +## batch + +**Signature** + +```ts +export declare const batch: (f: () => void) => void +``` + +Added in v1.0.0 diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts index 228ddba..7c20e4d 100644 --- a/packages/rx/src/Rx.ts +++ b/packages/rx/src/Rx.ts @@ -1,6 +1,7 @@ /** * @since 1.0.0 */ +import * as internalRegistry from "@effect-rx/rx/internal/registry" import * as Result from "@effect-rx/rx/Result" import * as Chunk from "@effect/data/Chunk" import { dual, pipe } from "@effect/data/Function" @@ -700,3 +701,8 @@ export const withLabel = dual< ...self, label: [name, new Error().stack?.split("\n")[5] ?? ""] })) + +/** + * @since 1.0.0 + */ +export const batch: (f: () => void) => void = internalRegistry.batch diff --git a/packages/rx/src/internal/registry.ts b/packages/rx/src/internal/registry.ts index 17954a7..95a979b 100644 --- a/packages/rx/src/internal/registry.ts +++ b/packages/rx/src/internal/registry.ts @@ -2,6 +2,7 @@ import type * as Registry from "@effect-rx/rx/Registry" import * as Result from "@effect-rx/rx/Result" import type * as Rx from "@effect-rx/rx/Rx" import * as Equal from "@effect/data/Equal" +import { globalValue } from "@effect/data/GlobalValue" import * as Option from "@effect/data/Option" import type { NoSuchElementException } from "@effect/io/Cause" import type { Exit } from "@effect/io/Exit" @@ -12,6 +13,60 @@ function constListener(_: any) {} /** @internal */ export const TypeId: Registry.TypeId = Symbol.for("@effect-rx/rx/Registry") as Registry.TypeId +/** @internal */ +export const enum BatchPhase { + disabled, + collect, + rebuild, + notify +} + +/** @internal */ +export const batchState = globalValue("@effect-rx/rx/Registry/batchState", () => ({ + phase: BatchPhase.disabled, + depth: 0, + stale: new Set>(), + valid: new Set>() +})) + +/** @internal */ +export function batch(f: () => void): void { + batchState.phase = BatchPhase.collect + batchState.depth++ + try { + f() + if (batchState.depth === 1) { + batchState.phase = BatchPhase.rebuild + for (const node of batchState.stale) { + node.value() + } + batchState.phase = BatchPhase.notify + for (const node of batchState.valid) { + node.notify() + } + } + } finally { + batchState.depth-- + if (batchState.depth === 0) { + batchState.phase = BatchPhase.disabled + batchState.stale.clear() + batchState.valid.clear() + } + } +} + +function batchAddValid(node: Node) { + batchState.valid.add(node) + batchState.stale.delete(node) +} + +function batchAddStale(node: Node) { + if (batchState.valid.has(node)) { + return + } + batchState.stale.add(node) +} + /** @internal */ export const make = (): Registry.Registry => new RegistryImpl() @@ -193,7 +248,13 @@ class Node { if ((this.state & NodeFlags.initialized) === 0) { this.state = NodeState.valid this._value = value - this.notify() + + if (batchState.phase === BatchPhase.disabled) { + this.notify() + } else if (batchState.phase !== BatchPhase.notify) { + batchAddValid(this) + } + return } @@ -204,7 +265,12 @@ class Node { this._value = value this.invalidateChildren() - this.notify() + + if (batchState.phase === BatchPhase.disabled) { + this.notify() + } else if (batchState.phase !== BatchPhase.notify) { + batchAddValid(this) + } } addParent(parent: Node): void { @@ -238,8 +304,12 @@ class Node { this.disposeLifetime() } - // rebuild - this.value() + if (batchState.phase === BatchPhase.collect) { + batchAddStale(this) + this.invalidateChildren() + } else { + this.value() + } } invalidateChildren(): void { @@ -249,8 +319,16 @@ class Node { const children = this.children this.children = [] - for (let i = 0; i < children.length; i++) { - children[i].invalidate() + if (batchState.phase === BatchPhase.rebuild) { + for (let i = 0; i < children.length; i++) { + if (batchState.stale.has(children[i]) === false) { + children[i].invalidate() + } + } + } else { + for (let i = 0; i < children.length; i++) { + children[i].invalidate() + } } } diff --git a/packages/rx/test/Rx.test.ts b/packages/rx/test/Rx.test.ts index 88cf43f..4eca4d6 100644 --- a/packages/rx/test/Rx.test.ts +++ b/packages/rx/test/Rx.test.ts @@ -304,6 +304,88 @@ describe("Rx", () => { Rx.state(0).pipe(Rx.withLabel("counter")).label![1] ).toMatch(/Rx.test.ts:\d+:\d+/) }) + + it("batching", async () => { + const r = Registry.make() + const state = Rx.state(1).pipe(Rx.keepAlive) + const state2 = Rx.state("a").pipe(Rx.keepAlive) + let count = 0 + const derived = Rx.readable((get) => { + count++ + return get(state) + get(state2) + }) + expect(r.get(derived)).toEqual("1a") + expect(count).toEqual(1) + Rx.batch(() => { + r.set(state, 2) + r.set(state2, "b") + }) + expect(count).toEqual(2) + expect(r.get(derived)).toEqual("2b") + }) + + it("nested batch", async () => { + const r = Registry.make() + const state = Rx.state(1).pipe(Rx.keepAlive) + const state2 = Rx.state("a").pipe(Rx.keepAlive) + let count = 0 + const derived = Rx.readable((get) => { + count++ + return get(state) + get(state2) + }) + expect(r.get(derived)).toEqual("1a") + expect(count).toEqual(1) + Rx.batch(() => { + r.set(state, 2) + Rx.batch(() => { + r.set(state2, "b") + }) + }) + expect(count).toEqual(2) + expect(r.get(derived)).toEqual("2b") + }) + + it("read correct updated state in batch", async () => { + const r = Registry.make() + const state = Rx.state(1).pipe(Rx.keepAlive) + const state2 = Rx.state("a").pipe(Rx.keepAlive) + let count = 0 + const derived = Rx.readable((get) => { + count++ + return get(state) + get(state2) + }) + expect(r.get(derived)).toEqual("1a") + expect(count).toEqual(1) + Rx.batch(() => { + r.set(state, 2) + expect(r.get(derived)).toEqual("2a") + r.set(state2, "b") + }) + expect(count).toEqual(2) + expect(r.get(derived)).toEqual("2b") + expect(count).toEqual(3) + }) + + it("notifies listeners after batch commit", async () => { + const r = Registry.make() + const state = Rx.state(1).pipe(Rx.keepAlive) + const state2 = Rx.state("a").pipe(Rx.keepAlive) + let count = 0 + const derived = Rx.readable((get) => { + return get(state) + get(state2) + }) + r.subscribe(derived, () => { + count++ + }) + Rx.batch(() => { + r.get(derived) + r.set(state, 2) + r.get(derived) + r.set(state2, "b") + }) + expect(count).toEqual(1) + expect(r.get(derived)).toEqual("2b") + }) }) interface Counter { From 04339f2c715f07f32fc6be8dfe36e0278432f003 Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 21 Sep 2023 13:12:38 +1200 Subject: [PATCH 2/2] batching wip --- packages/rx/src/internal/registry.ts | 133 +++++++++++++-------------- packages/rx/test/Rx.test.ts | 2 +- 2 files changed, 63 insertions(+), 72 deletions(-) diff --git a/packages/rx/src/internal/registry.ts b/packages/rx/src/internal/registry.ts index 95a979b..6fd2671 100644 --- a/packages/rx/src/internal/registry.ts +++ b/packages/rx/src/internal/registry.ts @@ -13,60 +13,6 @@ function constListener(_: any) {} /** @internal */ export const TypeId: Registry.TypeId = Symbol.for("@effect-rx/rx/Registry") as Registry.TypeId -/** @internal */ -export const enum BatchPhase { - disabled, - collect, - rebuild, - notify -} - -/** @internal */ -export const batchState = globalValue("@effect-rx/rx/Registry/batchState", () => ({ - phase: BatchPhase.disabled, - depth: 0, - stale: new Set>(), - valid: new Set>() -})) - -/** @internal */ -export function batch(f: () => void): void { - batchState.phase = BatchPhase.collect - batchState.depth++ - try { - f() - if (batchState.depth === 1) { - batchState.phase = BatchPhase.rebuild - for (const node of batchState.stale) { - node.value() - } - batchState.phase = BatchPhase.notify - for (const node of batchState.valid) { - node.notify() - } - } - } finally { - batchState.depth-- - if (batchState.depth === 0) { - batchState.phase = BatchPhase.disabled - batchState.stale.clear() - batchState.valid.clear() - } - } -} - -function batchAddValid(node: Node) { - batchState.valid.add(node) - batchState.stale.delete(node) -} - -function batchAddStale(node: Node) { - if (batchState.valid.has(node)) { - return - } - batchState.stale.add(node) -} - /** @internal */ export const make = (): Registry.Registry => new RegistryImpl() @@ -249,10 +195,8 @@ class Node { this.state = NodeState.valid this._value = value - if (batchState.phase === BatchPhase.disabled) { + if (batchState.phase !== BatchPhase.collect) { this.notify() - } else if (batchState.phase !== BatchPhase.notify) { - batchAddValid(this) } return @@ -266,10 +210,8 @@ class Node { this._value = value this.invalidateChildren() - if (batchState.phase === BatchPhase.disabled) { + if (batchState.phase !== BatchPhase.collect) { this.notify() - } else if (batchState.phase !== BatchPhase.notify) { - batchAddValid(this) } } @@ -305,7 +247,7 @@ class Node { } if (batchState.phase === BatchPhase.collect) { - batchAddStale(this) + batchState.stale.push(this) this.invalidateChildren() } else { this.value() @@ -319,16 +261,8 @@ class Node { const children = this.children this.children = [] - if (batchState.phase === BatchPhase.rebuild) { - for (let i = 0; i < children.length; i++) { - if (batchState.stale.has(children[i]) === false) { - children[i].invalidate() - } - } - } else { - for (let i = 0; i < children.length; i++) { - children[i].invalidate() - } + for (let i = 0; i < children.length; i++) { + children[i].invalidate() } } @@ -451,3 +385,60 @@ class Lifetime implements Rx.Context { } } } + +// ----------------------------------------------------------------------------- +// batching +// ----------------------------------------------------------------------------- + +/** @internal */ +export const enum BatchPhase { + disabled, + collect, + commit +} + +/** @internal */ +export const batchState = globalValue("@effect-rx/rx/Registry/batchState", () => ({ + phase: BatchPhase.disabled, + depth: 0, + stale: [] as Array> +})) + +/** @internal */ +export function batch(f: () => void): void { + batchState.phase = BatchPhase.collect + batchState.depth++ + try { + f() + if (batchState.depth === 1) { + batchState.phase = BatchPhase.commit + for (let i = 0; i < batchState.stale.length; i++) { + batchRebuildNode(batchState.stale[i]) + } + } + } finally { + batchState.depth-- + if (batchState.depth === 0) { + batchState.phase = BatchPhase.disabled + batchState.stale = [] + } + } +} + +function batchRebuildNode(node: Node) { + if (node.state === NodeState.valid) { + return + } + + for (let i = 0; i < node.parents.length; i++) { + const parent = node.parents[i] + if (parent.state !== NodeState.valid) { + batchRebuildNode(parent) + } + } + + // @ts-ignore + if (node.state !== NodeState.valid) { + node.value() + } +} diff --git a/packages/rx/test/Rx.test.ts b/packages/rx/test/Rx.test.ts index 4eca4d6..d8362a4 100644 --- a/packages/rx/test/Rx.test.ts +++ b/packages/rx/test/Rx.test.ts @@ -361,7 +361,7 @@ describe("Rx", () => { expect(r.get(derived)).toEqual("2a") r.set(state2, "b") }) - expect(count).toEqual(2) + expect(count).toEqual(3) expect(r.get(derived)).toEqual("2b") expect(count).toEqual(3) })