From e5bc8617c044415a1f015c0fb4ebcc7a1d764cfe Mon Sep 17 00:00:00 2001 From: Tim Date: Fri, 24 May 2024 15:55:05 +1200 Subject: [PATCH] replace keepAlive with autoDispose --- .changeset/shy-terms-dream.md | 7 ++ .changeset/ten-ghosts-scream.md | 7 ++ packages/rx/src/Registry.ts | 2 - packages/rx/src/Rx.ts | 162 ++++++++++++++------------- packages/rx/src/internal/registry.ts | 91 +++++++++------ packages/rx/test/Rx.test.ts | 101 +++++++++-------- 6 files changed, 205 insertions(+), 165 deletions(-) create mode 100644 .changeset/shy-terms-dream.md create mode 100644 .changeset/ten-ghosts-scream.md diff --git a/.changeset/shy-terms-dream.md b/.changeset/shy-terms-dream.md new file mode 100644 index 0000000..7c0a8f3 --- /dev/null +++ b/.changeset/shy-terms-dream.md @@ -0,0 +1,7 @@ +--- +"@effect-rx/rx": minor +"@effect-rx/rx-react": minor +"@effect-rx/rx-vue": minor +--- + +if an Rx has no finilizers, use lazy calculation diff --git a/.changeset/ten-ghosts-scream.md b/.changeset/ten-ghosts-scream.md new file mode 100644 index 0000000..743cfaf --- /dev/null +++ b/.changeset/ten-ghosts-scream.md @@ -0,0 +1,7 @@ +--- +"@effect-rx/rx": minor +"@effect-rx/rx-react": minor +"@effect-rx/rx-vue": minor +--- + +replace keepAlive with autoDispose diff --git a/packages/rx/src/Registry.ts b/packages/rx/src/Registry.ts index 942efcc..f239256 100644 --- a/packages/rx/src/Registry.ts +++ b/packages/rx/src/Registry.ts @@ -27,8 +27,6 @@ export interface Registry { readonly refresh: Rx.Rx.RefreshRxSync readonly set: Rx.Rx.Set readonly subscribe: Rx.Rx.Subscribe - readonly reset: () => void - readonly dispose: () => void } /** diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts index 5d6af2e..40fd724 100644 --- a/packages/rx/src/Rx.ts +++ b/packages/rx/src/Rx.ts @@ -14,6 +14,7 @@ import * as Inspectable from "effect/Inspectable" import * as Layer from "effect/Layer" import * as Option from "effect/Option" import { type Pipeable, pipeArguments } from "effect/Pipeable" +import { hasProperty } from "effect/Predicate" import * as Runtime from "effect/Runtime" import * as Scope from "effect/Scope" import * as Stream from "effect/Stream" @@ -40,11 +41,11 @@ export type TypeId = typeof TypeId */ export interface Rx extends Pipeable, Inspectable.Inspectable { readonly [TypeId]: TypeId - readonly keepAlive: boolean + readonly autoDispose: boolean + readonly idleTTL?: number readonly read: Rx.Read readonly refresh?: Rx.Refresh readonly label?: readonly [name: string, stack: string] - readonly idleTTL?: number } /** @@ -241,7 +242,8 @@ const RxProto = { toJSON(this: Rx) { return { _id: "Rx", - keepAlive: this.keepAlive, + autoDispose: this.autoDispose, + idleTTL: this.idleTTL, label: this.label } }, @@ -296,40 +298,44 @@ const RxRuntimeProto = { Result.initial(true), runtimeResult.value ) - }) + }).pipe(autoDispose()) return makeStreamPull(pullRx as any, options) }, subscriptionRef(this: RxRuntime, arg: any) { - return makeSubRef(readable((get) => { - const previous = get.self>() - const runtimeResult = get(this) - if (runtimeResult._tag !== "Success") { - return Result.replacePrevious(runtimeResult, previous) - } - return makeEffect( - get, - arg, - Result.initial(true), - runtimeResult.value - ) - })) + return makeSubRef(autoDispose( + readable((get) => { + const previous = get.self>() + const runtimeResult = get(this) + if (runtimeResult._tag !== "Success") { + return Result.replacePrevious(runtimeResult, previous) + } + return makeEffect( + get, + arg, + Result.initial(true), + runtimeResult.value + ) + }) + ) as any) }, subscribable(this: RxRuntime, arg: any) { - return makeSubscribable(readable((get) => { - const previous = get.self>() - const runtimeResult = get(this) - if (runtimeResult._tag !== "Success") { - return Result.replacePrevious(runtimeResult, previous) - } - return makeEffect( - get, - arg, - Result.initial(true), - runtimeResult.value - ) - })) + return makeSubscribable(autoDispose( + readable((get) => { + const previous = get.self>() + const runtimeResult = get(this) + if (runtimeResult._tag !== "Success") { + return Result.replacePrevious(runtimeResult, previous) + } + return makeEffect( + get, + arg, + Result.initial(true), + runtimeResult.value + ) + }) + ) as any) } } @@ -353,7 +359,7 @@ export const readable = ( refresh?: Rx.Refresh ): Rx => { const rx = Object.create(RxProto) - rx.keepAlive = false + rx.autoDispose = false rx.read = read rx.refresh = refresh return rx @@ -369,7 +375,7 @@ export const writable = ( refresh?: Rx.Refresh ): Writable => { const rx = Object.create(WritableProto) - rx.keepAlive = false + rx.autoDispose = false rx.read = read rx.write = write rx.refresh = refresh @@ -411,6 +417,23 @@ export const make: { return readable(readOrRx) } +/** + * @since 1.0.0 + * @category combinators + */ +export const autoDispose: { + (idleTTL?: Duration.DurationInput | undefined): >(self: A) => A + >(self: A, idleTTL?: Duration.DurationInput | undefined): A +} = dual( + (args) => hasProperty(args[0], TypeId), + >(self: A, idleTTL?: Duration.DurationInput | undefined): A => + Object.assign(Object.create(Object.getPrototypeOf(self)), { + ...self, + autoDispose: true, + idleTTL: idleTTL !== undefined ? Duration.toMillis(idleTTL) : undefined + }) +) + // ----------------------------------------------------------------------------- // constructors - effect // ----------------------------------------------------------------------------- @@ -584,13 +607,13 @@ export interface RxRuntime extends Rx, E export const context: () => ( create: Layer.Layer | Rx.Read> ) => RxRuntime = () => { - const memoMapRx = make(Layer.makeMemoMap) + const memoMapRx = make(Layer.makeMemoMap).pipe(autoDispose()) return (create: Layer.Layer | Rx.Read>): RxRuntime => { const rx = Object.create(RxRuntimeProto) - rx.keepAlive = false + rx.autoDispose = false rx.refresh = undefined - const layerRx = keepAlive(typeof create === "function" ? readable(create) : readable(() => create)) + const layerRx = typeof create === "function" ? readable(create) : readable(() => create) rx.layer = layerRx rx.read = function read(get: Context) { @@ -730,16 +753,18 @@ export const subscriptionRef: { | Effect.Effect, any, never> | Rx.Read, any, never>> ) => - makeSubRef(readable((get) => { - let value: SubscriptionRef.SubscriptionRef | Effect.Effect, any, any> - if (typeof ref === "function") { - value = ref(get) - } else { - value = ref - } + makeSubRef( + readable((get) => { + let value: SubscriptionRef.SubscriptionRef | Effect.Effect, any, any> + if (typeof ref === "function") { + value = ref(get) + } else { + value = ref + } - return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value - })) + return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value + }).pipe(autoDispose()) + ) // ----------------------------------------------------------------------------- // constructors - subscribable @@ -767,7 +792,7 @@ export const subscribable: { | Effect.Effect, any, never> | Rx.Read, any, never>> ) => - makeSubscribable(readable((get) => { + makeSubscribable(autoDispose(readable((get) => { let value: Subscribable.Subscribable | Effect.Effect, any, any> if (typeof ref === "function") { value = ref(get) @@ -776,7 +801,7 @@ export const subscribable: { } return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value - })) + }))) // ----------------------------------------------------------------------------- // constructors - functions @@ -797,7 +822,7 @@ export const fnSync: { } = (f: Rx.ReadFn, options?: { readonly initialValue?: A }): Writable | A, RxResultFn.ArgToVoid> => { - const argRx = state<[number, Arg]>([0, undefined as any]) + const argRx = state<[number, Arg]>([0, undefined as any]).pipe(autoDispose()) const hasInitialValue = options?.initialValue !== undefined return writable(function(get) { const [counter, arg] = get(argRx) @@ -860,7 +885,7 @@ function makeResultFn( f: Rx.ReadFn | Stream.Stream>, options?: { readonly initialValue?: A } ) { - const argRx = state<[number, Arg]>([0, undefined as any]) + const argRx = state<[number, Arg]>([0, undefined as any]).pipe(autoDispose()) const initialValue = options?.initialValue !== undefined ? Result.success(options.initialValue) : Result.initial() @@ -907,7 +932,7 @@ export const pull = (create: Rx.Read> | Stream.Stream< makeRead(function(get) { return makeStreamPullEffect(get, create, options) }) - ) + ).pipe(autoDispose()) return makeStreamPull(pullRx, options) } @@ -1056,7 +1081,7 @@ export const withFallback: { } return result } - return isWritable(self) + const rx = isWritable(self) ? writable( withFallback, self.write, @@ -1070,18 +1095,9 @@ export const withFallback: { refresh(self) } ) as any + return self.autoDispose ? autoDispose(rx, self.idleTTL) : rx }) -/** - * @since 1.0.0 - * @category combinators - */ -export const keepAlive = >(self: A): A => - Object.assign(Object.create(Object.getPrototypeOf(self)), { - ...self, - keepAlive: true - }) - /** * @since 1.0.0 * @category combinators @@ -1114,23 +1130,7 @@ export const withLabel: { * @since 1.0.0 * @category combinators */ -export const setIdleTTL: { - (duration: Duration.DurationInput): >(self: A) => A - >(self: A, duration: Duration.DurationInput): A -} = dual< - (duration: Duration.DurationInput) => >(self: A) => A, - >(self: A, duration: Duration.DurationInput) => A ->(2, (self, duration) => - Object.assign(Object.create(Object.getPrototypeOf(self)), { - ...self, - idleTTL: Duration.toMillis(duration) - })) - -/** - * @since 1.0.0 - * @category combinators - */ -export const initialValue: { +export const toInitialValue: { (initialValue: A): (self: Rx) => readonly [Rx, A] (self: Rx, initialValue: A): readonly [Rx, A] } = dual< @@ -1152,8 +1152,8 @@ export const transform: { ): [R] extends [Writable] ? Writable : Rx } = dual( 2, - ((self: Rx, f: (get: Context) => B): Rx => - isWritable(self) + ((self: Rx, f: (get: Context) => B): Rx => { + const rx = isWritable(self) ? writable( f, function(ctx, value) { @@ -1168,7 +1168,9 @@ export const transform: { self.refresh ?? function(refresh) { refresh(self) } - )) as any + ) + return self.autoDispose ? autoDispose(rx, self.idleTTL) : rx + }) as any ) /** diff --git a/packages/rx/src/internal/registry.ts b/packages/rx/src/internal/registry.ts index a2160f8..067d650 100644 --- a/packages/rx/src/internal/registry.ts +++ b/packages/rx/src/internal/registry.ts @@ -33,7 +33,7 @@ class RegistryImpl implements Registry.Registry { constructor( initialValues?: Iterable, any]>, readonly scheduleTask = queueMicrotask, - readonly timeoutResolution = 5000 + readonly timeoutResolution = 1000 ) { this[TypeId] = TypeId if (initialValues !== undefined) { @@ -43,7 +43,7 @@ class RegistryImpl implements Registry.Registry { } } - private readonly nodes = new Map, Node>() + private readonly nodes = new WeakMap, Node>() private readonly timeoutBuckets = new Map>, handle: number]>() private readonly nodeTimeoutBucket = new Map, number>() private disposed = false @@ -89,7 +89,7 @@ class RegistryImpl implements Registry.Registry { if (node === undefined) { node = this.createNode(rx) this.nodes.set(rx, node) - } else if (!rx.keepAlive && rx.idleTTL) { + } else if (rx.autoDispose && rx.idleTTL) { this.removeNodeTimeout(node) } return node @@ -100,7 +100,7 @@ class RegistryImpl implements Registry.Registry { throw new Error(`Cannot access Rx ${rx}: registry is disposed`) } - if (!rx.keepAlive) { + if (rx.autoDispose) { this.scheduleRxRemoval(rx) } return new Node(this, rx) @@ -186,32 +186,17 @@ class RegistryImpl implements Registry.Registry { node.remove() }) } - - reset(): void { - this.timeoutBuckets.forEach(([, handle]) => clearTimeout(handle)) - this.timeoutBuckets.clear() - this.nodeTimeoutBucket.clear() - - this.nodes.forEach((node) => node.remove()) - this.nodes.clear() - } - - dispose(): void { - this.disposed = true - this.reset() - } } const enum NodeFlags { alive = 1 << 0, initialized = 1 << 1, - waitingForValue = 1 << 2 + waitingForValue = 1 << 2, + notified = 1 << 3 } const enum NodeState { uninitialized = NodeFlags.alive | NodeFlags.waitingForValue, - stale = NodeFlags.alive | NodeFlags.initialized | NodeFlags.waitingForValue, - valid = NodeFlags.alive | NodeFlags.initialized, removed = 0 } @@ -233,10 +218,19 @@ class Node { listeners: Array<() => void> = [] get canBeRemoved(): boolean { - return !this.rx.keepAlive && this.listeners.length === 0 && this.children.length === 0 && + return this.rx.autoDispose && this.listeners.length === 0 && this.children.length === 0 && this.state !== 0 } + get debugFlags() { + return { + alive: (this.state & NodeFlags.alive) !== 0, + initialized: (this.state & NodeFlags.initialized) !== 0, + waitingForValue: (this.state & NodeFlags.waitingForValue) !== 0, + notified: (this.state & NodeFlags.notified) !== 0 + } + } + _value: A = undefined as any value(): A { if ((this.state & NodeFlags.waitingForValue) !== 0) { @@ -270,7 +264,8 @@ class Node { setValue(value: A): void { if ((this.state & NodeFlags.initialized) === 0) { - this.state = NodeState.valid + this.state |= NodeFlags.initialized + this.state &= ~NodeFlags.waitingForValue this._value = value if (batchState.phase !== BatchPhase.collect) { @@ -280,10 +275,11 @@ class Node { return } - this.state = NodeState.valid + this.state &= ~NodeFlags.waitingForValue if (Equal.equals(this._value, value)) { return } + this.state &= ~NodeFlags.notified this._value = value this.invalidateChildren() @@ -319,16 +315,24 @@ class Node { } invalidate(): void { - if (this.state === NodeState.valid) { - this.state = NodeState.stale + const hadFinalizers = this.lifetime !== undefined && this.lifetime.hasFinalizers + if ((this.state & NodeFlags.waitingForValue) === 0) { + this.state |= NodeFlags.waitingForValue this.disposeLifetime() } if (batchState.phase === BatchPhase.collect) { - batchState.stale.push(this) + batchState.stale.push([this, hadFinalizers]) this.invalidateChildren() - } else { + if (!hadFinalizers) { + this.state &= ~NodeFlags.notified + } + } else if (hadFinalizers) { this.value() + } else { + this.invalidateChildren() + this.state &= ~NodeFlags.notified + this.notify() } } @@ -345,6 +349,10 @@ class Node { } notify(): void { + if ((this.state & NodeFlags.notified) !== 0) { + return + } + this.state |= NodeFlags.notified for (let i = 0; i < this.listeners.length; i++) { this.listeners[i]() } @@ -400,6 +408,7 @@ class Node { interface Lifetime extends Rx.Context { readonly node: Node + readonly hasFinalizers: boolean finalizers: Array<() => void> | undefined disposed: boolean readonly dispose: () => void @@ -408,6 +417,10 @@ interface Lifetime extends Rx.Context { const disposedError = (rx: Rx.Rx): Error => new Error(`Cannot use context of disposed Rx: ${rx}`) const LifetimeProto: Omit, "node" | "finalizers" | "disposed"> = { + get hasFinalizers() { + return (this as Lifetime).finalizers !== undefined && (this as Lifetime).finalizers!.length !== 0 + }, + addFinalizer(this: Lifetime, f: () => void): void { if (this.disposed) { throw disposedError(this.node.rx) @@ -654,7 +667,7 @@ export const enum BatchPhase { export const batchState = globalValue("@effect-rx/rx/Registry/batchState", () => ({ phase: BatchPhase.disabled, depth: 0, - stale: [] as Array> + stale: [] as Array<[Node, hadFinalizers: boolean]> })) /** @internal */ @@ -666,7 +679,8 @@ export function batch(f: () => void): void { if (batchState.depth === 1) { batchState.phase = BatchPhase.commit for (let i = 0; i < batchState.stale.length; i++) { - batchRebuildNode(batchState.stale[i]) + const [node, hadFinalizers] = batchState.stale[i] + batchRebuildNode(node, hadFinalizers) } } } finally { @@ -678,20 +692,25 @@ export function batch(f: () => void): void { } } -function batchRebuildNode(node: Node) { - if (node.state === NodeState.valid) { +function batchRebuildNode(node: Node, hadFinalizers: boolean) { + if ((node.state & NodeFlags.waitingForValue) === 0) { return } for (let i = 0; i < node.parents.length; i++) { const parent = node.parents[i] - if (parent.state !== NodeState.valid) { - batchRebuildNode(parent) + if ((parent.state & NodeFlags.waitingForValue) === 0) { + batchRebuildNode(parent, false) } } // @ts-ignore - if (node.state !== NodeState.valid) { - node.value() + if ((node.state & NodeFlags.waitingForValue) !== 0) { + if (hadFinalizers) { + node.value() + } else { + node.invalidateChildren() + node.notify() + } } } diff --git a/packages/rx/test/Rx.test.ts b/packages/rx/test/Rx.test.ts index a9d3ca5..a831980 100644 --- a/packages/rx/test/Rx.test.ts +++ b/packages/rx/test/Rx.test.ts @@ -26,8 +26,8 @@ describe("Rx", () => { expect(r.get(counter)).toEqual(1) }) - it("keepAlive false", async () => { - const counter = Rx.make(0) + it("autoDispose true", async () => { + const counter = Rx.make(0).pipe(Rx.autoDispose()) const r = Registry.make() r.set(counter, 1) expect(r.get(counter)).toEqual(1) @@ -35,10 +35,8 @@ describe("Rx", () => { expect(r.get(counter)).toEqual(0) }) - it("keepAlive true", async () => { - const counter = Rx.make(0).pipe( - Rx.keepAlive - ) + it("autoDispose false", async () => { + const counter = Rx.make(0) const r = Registry.make() r.set(counter, 1) expect(r.get(counter)).toEqual(1) @@ -47,7 +45,7 @@ describe("Rx", () => { }) it("subscribe", async () => { - const counter = Rx.make(0) + const counter = Rx.make(0).pipe(Rx.autoDispose()) const r = Registry.make() let count = 0 const cancel = r.subscribe(counter, (_) => { @@ -72,9 +70,11 @@ describe("Rx", () => { }) it("runtime replacement", async () => { - const count = counterRuntime.rx(Effect.flatMap(Counter, (_) => _.get)) + const count = counterRuntime.rx(Effect.flatMap(Counter, (_) => _.get)).pipe( + Rx.autoDispose + ) const r = Registry.make({ - initialValues: [Rx.initialValue(counterRuntime.layer, CounterTest)] + initialValues: [Rx.toInitialValue(counterRuntime.layer, CounterTest)] }) const result = r.get(count) assert(Result.isSuccess(result)) @@ -82,8 +82,10 @@ describe("Rx", () => { }) it("runtime multiple", async () => { - const buildCount = buildCounterRuntime.fn((_: void) => Effect.flatMap(BuildCounter, (_) => _.get)) - const count = counterRuntime.rx(Effect.flatMap(Counter, (_) => _.get)) + const buildCount = buildCounterRuntime.fn((_: void) => Effect.flatMap(BuildCounter, (_) => _.get)).pipe( + Rx.autoDispose() + ) + const count = counterRuntime.rx(Effect.flatMap(Counter, (_) => _.get)).pipe(Rx.autoDispose()) const timesTwo = multiplierRuntime.rx((get) => Effect.gen(function*() { const counter = yield* Counter @@ -92,7 +94,7 @@ describe("Rx", () => { expect(yield* get.result(count)).toEqual(2) return yield* multiplier.times(2) }) - ) + ).pipe(Rx.autoDispose()) const r = Registry.make() const cancel = r.mount(buildCount) @@ -132,7 +134,7 @@ describe("Rx", () => { const count = Rx.make( Effect.succeed(1).pipe(Effect.delay(100)), { initialValue: 0 } - ).pipe(Rx.keepAlive) + ) const r = Registry.make() let result = r.get(count) assert(Result.isSuccess(result)) @@ -232,7 +234,7 @@ describe("Rx", () => { ) ) ) - ).pipe(Rx.keepAlive) + ) const r = Registry.make() let result = r.get(count) assert(Result.isInitial(result)) @@ -255,7 +257,7 @@ describe("Rx", () => { Stream.range(0, 2).pipe( Stream.tap(() => Effect.sleep(50)) ) - ) + ).pipe(Rx.autoDispose()) const r = Registry.make() const unmount = r.mount(count) let result = r.get(count) @@ -319,7 +321,7 @@ describe("Rx", () => { Stream.range(start, start + 1).pipe( Stream.tap(() => Effect.sleep(50)) ) - ) + ).pipe(Rx.autoDispose()) const r = Registry.make() const unmount = r.mount(count) let result = r.get(count) @@ -375,7 +377,7 @@ describe("Rx", () => { Stream.range(0, 1, 1).pipe( Stream.tap(() => Effect.sleep(50)) ) - ).pipe(Rx.refreshable) + ).pipe(Rx.refreshable, Rx.autoDispose()) const r = Registry.make() const unmount = r.mount(count) @@ -432,7 +434,7 @@ describe("Rx", () => { Stream.unwrap, Stream.tap(() => Effect.sleep(50)) ) - ).pipe(Rx.refreshable) + ).pipe(Rx.refreshable, Rx.autoDispose()) const r = Registry.make() const unmount = r.mount(count) @@ -485,7 +487,7 @@ describe("Rx", () => { const count = Rx.pull(() => Stream.range(1, 2, 1).pipe( Stream.tap(() => Effect.sleep(50)) - ), { initialValue: [0] }).pipe(Rx.refreshable) + ), { initialValue: [0] }).pipe(Rx.refreshable, Rx.autoDispose()) const r = Registry.make() const unmount = r.mount(count) @@ -515,7 +517,7 @@ describe("Rx", () => { r.set(count(1), 2) assert.strictEqual(r.get(count(1)), 2) - const countKeep = Rx.family((n: number) => Rx.make(n).pipe(Rx.keepAlive)) + const countKeep = Rx.family((n: number) => Rx.make(n)) assert.strictEqual(countKeep(1), countKeep(1)) r.get(countKeep(1)) const hashKeep = Hash.hash(countKeep(1)) @@ -537,8 +539,8 @@ describe("Rx", () => { it("batching", async () => { const r = Registry.make() - const state = Rx.make(1).pipe(Rx.keepAlive) - const state2 = Rx.make("a").pipe(Rx.keepAlive) + const state = Rx.make(1) + const state2 = Rx.make("a") let count = 0 const derived = Rx.readable((get) => { count++ @@ -550,14 +552,15 @@ describe("Rx", () => { r.set(state, 2) r.set(state2, "b") }) - expect(count).toEqual(2) + expect(count).toEqual(1) expect(r.get(derived)).toEqual("2b") + expect(count).toEqual(2) }) it("nested batch", async () => { const r = Registry.make() - const state = Rx.make(1).pipe(Rx.keepAlive) - const state2 = Rx.make("a").pipe(Rx.keepAlive) + const state = Rx.make(1) + const state2 = Rx.make("a") let count = 0 const derived = Rx.readable((get) => { count++ @@ -571,14 +574,15 @@ describe("Rx", () => { r.set(state2, "b") }) }) - expect(count).toEqual(2) + expect(count).toEqual(1) expect(r.get(derived)).toEqual("2b") + expect(count).toEqual(2) }) it("read correct updated state in batch", async () => { const r = Registry.make() - const state = Rx.make(1).pipe(Rx.keepAlive) - const state2 = Rx.make("a").pipe(Rx.keepAlive) + const state = Rx.make(1) + const state2 = Rx.make("a") let count = 0 const derived = Rx.readable((get) => { count++ @@ -591,15 +595,15 @@ describe("Rx", () => { expect(r.get(derived)).toEqual("2a") r.set(state2, "b") }) - expect(count).toEqual(3) + expect(count).toEqual(2) expect(r.get(derived)).toEqual("2b") expect(count).toEqual(3) }) it("notifies listeners after batch commit", async () => { const r = Registry.make() - const state = Rx.make(1).pipe(Rx.keepAlive) - const state2 = Rx.make("a").pipe(Rx.keepAlive) + const state = Rx.make(1).pipe(Rx.autoDispose()) + const state2 = Rx.make("a").pipe(Rx.autoDispose()) let count = 0 const derived = Rx.readable((get) => { return get(state) + get(state2) @@ -615,13 +619,14 @@ describe("Rx", () => { }) expect(count).toEqual(1) expect(r.get(derived)).toEqual("2b") + expect(count).toEqual(2) }) it("initialValues", async () => { - const state = Rx.make(0) + const state = Rx.make(0).pipe(Rx.autoDispose()) const r = Registry.make({ initialValues: [ - Rx.initialValue(state, 10) + Rx.toInitialValue(state, 10) ] }) expect(r.get(state)).toEqual(10) @@ -631,13 +636,13 @@ describe("Rx", () => { it("idleTTL", async () => { const state = Rx.make(0).pipe( - Rx.setIdleTTL(2000) + Rx.autoDispose(2000) ) const state2 = Rx.make(0).pipe( - Rx.setIdleTTL(10000) + Rx.autoDispose(10000) ) const state3 = Rx.make(0).pipe( - Rx.setIdleTTL(3000) + Rx.autoDispose(3000) ) const r = Registry.make() r.set(state, 10) @@ -665,7 +670,7 @@ describe("Rx", () => { }) it("fn", async () => { - const count = Rx.fnSync((n: number) => n).pipe(Rx.keepAlive) + const count = Rx.fnSync((n: number) => n) const r = Registry.make() assert.deepEqual(r.get(count), Option.none()) @@ -688,8 +693,7 @@ describe("Rx", () => { Effect.delay(100) ) ).pipe( - Rx.withFallback(Rx.make(() => Effect.succeed(0))), - Rx.keepAlive + Rx.withFallback(Rx.make(() => Effect.succeed(0))) ) const r = Registry.make() assert.deepEqual(r.get(count), Result.waiting(Result.success(0))) @@ -699,7 +703,9 @@ describe("Rx", () => { }) it("failure with previousValue", async () => { - const count = Rx.fn((i: number) => i === 1 ? Effect.fail("fail") : Effect.succeed(i)) + const count = Rx.fn((i: number) => i === 1 ? Effect.fail("fail") : Effect.succeed(i)).pipe( + Rx.autoDispose() + ) const r = Registry.make() let result = r.get(count) @@ -707,6 +713,7 @@ describe("Rx", () => { r.set(count, 0) result = r.get(count) + console.log(result) assert(Result.isSuccess(result)) assert.strictEqual(result.value, 0) @@ -746,8 +753,8 @@ describe("Rx", () => { it("get.streamResult", async () => { const count = Rx.make(0) - const multiplied = Rx.make((get) => get.stream(count).pipe(Stream.map((_) => _ * 2))) - const plusOne = Rx.make((get) => get.streamResult(multiplied).pipe(Stream.map((_) => _ + 1))) + const multiplied = Rx.make((get) => get.stream(count).pipe(Stream.map((_) => _ * 2))).pipe(Rx.autoDispose()) + const plusOne = Rx.make((get) => get.streamResult(multiplied).pipe(Stream.map((_) => _ + 1))).pipe(Rx.autoDispose()) const r = Registry.make() const cancel = r.mount(plusOne) @@ -788,7 +795,7 @@ describe("Rx", () => { it("Subscribable/SubscriptionRef", async () => { vitest.useRealTimers() const ref = SubscriptionRef.make(123).pipe(Effect.runSync) - const rx = Rx.subscribable(ref) + const rx = Rx.subscribable(ref).pipe(Rx.autoDispose()) const r = Registry.make() assert.deepStrictEqual(r.get(rx), 123) await Effect.runPromise(SubscriptionRef.update(ref, (a) => a + 1)) @@ -901,7 +908,7 @@ const MultiplierLive = Layer.effect( Layer.provideMerge(CounterLive) ) -const buildCounterRuntime = Rx.runtime(BuildCounterLive) -const counterRuntime = Rx.runtime(CounterLive) -const multiplierRuntime = Rx.runtime(MultiplierLive) -const fiberRefRuntime = Rx.runtime(Layer.setRequestCaching(true)) +const buildCounterRuntime = Rx.runtime(BuildCounterLive).pipe(Rx.autoDispose()) +const counterRuntime = Rx.runtime(CounterLive).pipe(Rx.autoDispose()) +const multiplierRuntime = Rx.runtime(MultiplierLive).pipe(Rx.autoDispose()) +const fiberRefRuntime = Rx.runtime(Layer.setRequestCaching(true)).pipe(Rx.autoDispose())