From bb9c0f24258c6073439d5b817e137a61ae75ca41 Mon Sep 17 00:00:00 2001 From: Tim Date: Fri, 13 Oct 2023 11:59:36 +1300 Subject: [PATCH] add idleTTL for non-keepAlive Rx's --- .changeset/slow-crabs-wash.md | 5 ++ packages/rx/src/Rx.ts | 34 ++++++++++- packages/rx/src/internal/registry.ts | 89 +++++++++++++++++++++++++--- packages/rx/test/Rx.test.ts | 59 +++++++++++++++--- scripts/clean.mjs | 1 + tsconfig.base.json | 4 +- 6 files changed, 171 insertions(+), 21 deletions(-) create mode 100644 .changeset/slow-crabs-wash.md diff --git a/.changeset/slow-crabs-wash.md b/.changeset/slow-crabs-wash.md new file mode 100644 index 0000000..3f5dcab --- /dev/null +++ b/.changeset/slow-crabs-wash.md @@ -0,0 +1,5 @@ +--- +"@effect-rx/rx": patch +--- + +add idleTTL for non-keepAlive Rx's diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts index ff41ef6..7e088da 100644 --- a/packages/rx/src/Rx.ts +++ b/packages/rx/src/Rx.ts @@ -3,6 +3,7 @@ */ import { NoSuchElementException } from "effect/Cause" import * as Chunk from "effect/Chunk" +import * as Duration from "effect/Duration" import * as Effect from "effect/Effect" import * as Exit from "effect/Exit" import { dual, pipe } from "effect/Function" @@ -40,6 +41,7 @@ export interface Rx extends Pipeable, Inspectable.Inspectable { readonly read: Rx.Read readonly refresh?: Rx.Refresh readonly label?: readonly [name: string, stack: string] + readonly idleTTL?: number } /** @@ -484,16 +486,19 @@ export interface RxRuntime extends Rx> export const runtime: { (layer: Layer.Layer, options?: { readonly autoDispose?: boolean + readonly idleTTL?: Duration.DurationInput }): RxRuntime (layer: Layer.Layer, options?: { readonly autoDispose?: boolean + readonly idleTTL?: Duration.DurationInput readonly runtime: RxRuntime }): RxRuntime } = (layer: Layer.Layer, options?: { readonly autoDispose?: boolean + readonly idleTTL?: Duration.DurationInput readonly runtime?: RxRuntime }): RxRuntime => { - const rx = options?.runtime + let rx = options?.runtime ? scoped(() => Effect.flatMap( Layer.build(layer), @@ -501,7 +506,14 @@ export const runtime: { ), { runtime: options.runtime }) : scoped(() => Layer.toRuntime(layer) as Effect.Effect>) - return options?.autoDispose ? rx : keepAlive(rx) + if (options?.idleTTL !== undefined) { + rx = setIdleTTL(rx, options.idleTTL) + } + if (options?.autoDispose !== true) { + rx = keepAlive(rx) + } + + return rx } function makeStream( @@ -766,7 +778,7 @@ export const refreshable = >( */ export const withLabel: { (name: string): >(self: A) => A - >(self: A, name: string): >(self: A) => A + >(self: A, name: string): A } = dual< (name: string) => >(self: A) => A, >(self: A, name: string) => A @@ -776,6 +788,22 @@ export const withLabel: { label: [name, new Error().stack?.split("\n")[5] ?? ""] })) +/** + * @since 1.0.0 + * @category combinators + */ +export const setIdleTTL: { + (duration: Duration.DurationInput): >(self: A) => A + >(self: A, duration: Duration.DurationInput): A +} = dual< + (duration: Duration.DurationInput) => >(self: A) => A, + >(self: A, duration: Duration.DurationInput) => A +>(2, (self, duration) => + Object.assign(Object.create(Object.getPrototypeOf(self)), { + ...self, + idleTTL: Duration.toMillis(duration) + })) + /** * @since 1.0.0 * @category combinators diff --git a/packages/rx/src/internal/registry.ts b/packages/rx/src/internal/registry.ts index 9d3e3d1..9e6f353 100644 --- a/packages/rx/src/internal/registry.ts +++ b/packages/rx/src/internal/registry.ts @@ -21,7 +21,10 @@ export const make = (options?: { class RegistryImpl implements Registry.Registry { readonly [TypeId]: Registry.TypeId - constructor(initialValues?: Iterable, any]>) { + constructor( + initialValues?: Iterable, any]>, + readonly timeoutResolution = 5000 + ) { this[TypeId] = TypeId if (initialValues !== undefined) { for (const [rx, value] of initialValues) { @@ -31,6 +34,9 @@ class RegistryImpl implements Registry.Registry { } private readonly nodes = new Map, Node>() + private readonly timeoutBuckets = new Map>, handle: NodeJS.Timeout]>() + private readonly nodeTimeoutBucket = new Map, number>() + private disposed = false get(rx: Rx.Rx): A { return this.ensureNode(rx).value() @@ -73,11 +79,17 @@ class RegistryImpl implements Registry.Registry { if (node === undefined) { node = this.createNode(rx) this.nodes.set(rx, node) + } else if (!rx.keepAlive && rx.idleTTL) { + this.removeNodeTimeout(node) } return node } createNode(rx: Rx.Rx): Node { + if (this.disposed) { + throw new Error(`Cannot access Rx ${rx}: registry is disposed`) + } + if (!rx.keepAlive) { this.scheduleRxRemoval(rx) } @@ -106,14 +118,74 @@ class RegistryImpl implements Registry.Registry { } removeNode(node: Node): void { - const parents = node.parents - this.nodes.delete(node.rx) - node.remove() - for (let i = 0; i < parents.length; i++) { - if (parents[i].canBeRemoved) { - this.removeNode(parents[i]) - } + if (node.rx.idleTTL) { + this.setNodeTimeout(node) + } else { + this.nodes.delete(node.rx) + node.remove() + } + } + + setNodeTimeout(node: Node): void { + if (this.nodeTimeoutBucket.has(node)) { + return + } + + const ttl = Math.ceil(node.rx.idleTTL! / this.timeoutResolution) * this.timeoutResolution + const timestamp = Date.now() + ttl + const bucket = timestamp - (timestamp % this.timeoutResolution) + this.timeoutResolution + + let entry = this.timeoutBuckets.get(bucket) + if (entry === undefined) { + entry = [ + new Set>(), + setTimeout(() => this.sweepBucket(bucket), bucket - Date.now()) + ] + this.timeoutBuckets.set(bucket, entry) } + entry[0].add(node) + this.nodeTimeoutBucket.set(node, bucket) + } + + removeNodeTimeout(node: Node): void { + const bucket = this.nodeTimeoutBucket.get(node) + if (bucket === undefined) { + return + } + this.nodeTimeoutBucket.delete(node) + this.scheduleNodeRemoval(node) + + const [nodes, handle] = this.timeoutBuckets.get(bucket)! + nodes.delete(node) + if (nodes.size === 0) { + clearTimeout(handle) + this.timeoutBuckets.delete(bucket) + } + } + + sweepBucket(bucket: number): void { + const nodes = this.timeoutBuckets.get(bucket)![0] + this.timeoutBuckets.delete(bucket) + + nodes.forEach((node) => { + if (!node.canBeRemoved) { + return + } + this.nodeTimeoutBucket.delete(node) + this.nodes.delete(node.rx) + node.remove() + }) + } + + dispose(): void { + this.disposed = true + + this.timeoutBuckets.forEach(([, handle]) => clearTimeout(handle)) + this.timeoutBuckets.clear() + this.nodeTimeoutBucket.clear() + + this.nodes.forEach((node) => node.remove()) + this.nodes.clear() } } @@ -279,6 +351,7 @@ class Node { remove() { this.state = NodeState.removed + this.listeners = [] if (this.lifetime === undefined) { return diff --git a/packages/rx/test/Rx.test.ts b/packages/rx/test/Rx.test.ts index 9f261d5..cf7988a 100644 --- a/packages/rx/test/Rx.test.ts +++ b/packages/rx/test/Rx.test.ts @@ -9,6 +9,13 @@ import * as Option from "effect/Option" import * as Stream from "effect/Stream" describe("Rx", () => { + beforeEach(() => { + vitest.useFakeTimers() + }) + afterEach(() => { + vitest.useRealTimers() + }) + it("get/set", () => { const counter = Rx.state(0) const r = Registry.make() @@ -236,7 +243,7 @@ describe("Rx", () => { let result = r.get(count) assert.strictEqual(result._tag, "Initial") - await new Promise((resolve) => setTimeout(resolve, 55)) + await vitest.advanceTimersByTimeAsync(50) result = r.get(count) assert.strictEqual(result._tag, "Initial") @@ -245,12 +252,12 @@ describe("Rx", () => { assert(Result.isWaiting(result)) assert.strictEqual(result.previous._tag, "Initial") - await new Promise((resolve) => setTimeout(resolve, 55)) + await vitest.advanceTimersByTimeAsync(50) result = r.get(count) assert(Result.isWaiting(result)) assert.deepEqual(Result.value(result), Option.some(1)) - await new Promise((resolve) => setTimeout(resolve, 50)) + await vitest.advanceTimersByTimeAsync(50) result = r.get(count) assert(Result.isSuccess(result)) assert.deepEqual(Result.value(result), Option.some(2)) @@ -260,12 +267,12 @@ describe("Rx", () => { assert(Result.isWaiting(result)) assert.deepEqual(Result.value(result), Option.some(2)) - await new Promise((resolve) => setTimeout(resolve, 55)) + await vitest.advanceTimersByTimeAsync(50) result = r.get(count) assert(Result.isWaiting(result)) assert.deepEqual(Result.value(result), Option.some(5)) - await new Promise((resolve) => setTimeout(resolve, 50)) + await vitest.advanceTimersByTimeAsync(50) result = r.get(count) assert(Result.isSuccess(result)) assert.deepEqual(Result.value(result), Option.some(6)) @@ -289,7 +296,7 @@ describe("Rx", () => { assert(Result.isWaiting(result)) assert(Option.isNone(Result.value(result))) - await new Promise((resolve) => setTimeout(resolve, 55)) + await vitest.advanceTimersByTimeAsync(50) result = r.get(count) assert(Result.isSuccess(result)) assert.deepEqual(result.value, { done: false, items: [0] }) @@ -299,7 +306,7 @@ describe("Rx", () => { assert(Result.isWaiting(result)) assert.deepEqual(Result.value(result), Option.some({ done: false, items: [0] })) - await new Promise((resolve) => setTimeout(resolve, 55)) + await vitest.advanceTimersByTimeAsync(50) result = r.get(count) assert(Result.isSuccess(result)) assert.deepEqual(result.value, { done: false, items: [0, 1] }) @@ -314,7 +321,7 @@ describe("Rx", () => { assert(Result.isWaiting(result)) assert.deepEqual(Result.value(result), Option.some({ done: true, items: [0, 1] })) - await new Promise((resolve) => setTimeout(resolve, 55)) + await vitest.advanceTimersByTimeAsync(50) result = r.get(count) assert(Result.isSuccess(result)) assert.deepEqual(result.value, { done: false, items: [0] }) @@ -448,6 +455,42 @@ describe("Rx", () => { await new Promise((resolve) => resolve(null)) expect(r.get(state)).toEqual(0) }) + + it("idleTTL", async () => { + const state = Rx.state(0).pipe( + Rx.setIdleTTL(2000) + ) + const state2 = Rx.state(0).pipe( + Rx.setIdleTTL(10000) + ) + const state3 = Rx.state(0).pipe( + Rx.setIdleTTL(3000) + ) + const r = Registry.make() + r.set(state, 10) + r.set(state2, 10) + r.set(state3, 10) + expect(r.get(state)).toEqual(10) + expect(r.get(state2)).toEqual(10) + expect(r.get(state3)).toEqual(10) + await new Promise((resolve) => resolve(null)) + expect(r.get(state)).toEqual(10) + expect(r.get(state2)).toEqual(10) + expect(r.get(state3)).toEqual(10) + + await new Promise((resolve) => resolve(null)) + console.log(r) + await vitest.advanceTimersByTimeAsync(10000) + expect(r.get(state)).toEqual(0) + expect(r.get(state2)).toEqual(10) + expect(r.get(state3)).toEqual(0) + + await new Promise((resolve) => resolve(null)) + await vitest.advanceTimersByTimeAsync(20000) + expect(r.get(state)).toEqual(0) + expect(r.get(state2)).toEqual(0) + expect(r.get(state3)).toEqual(0) + }) }) interface Counter { diff --git a/scripts/clean.mjs b/scripts/clean.mjs index 5f964d9..d5630d9 100644 --- a/scripts/clean.mjs +++ b/scripts/clean.mjs @@ -9,6 +9,7 @@ import * as Glob from "glob"; ".ultra.cache.json", "build", "coverage", + "src/tsconfig.json", ...(pkg === "." ? [] : ["docs"]), ...files, ] diff --git a/tsconfig.base.json b/tsconfig.base.json index 25fdd3f..cfc3adc 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -35,10 +35,10 @@ "removeComments": false, "baseUrl": ".", "paths": { - "@effect-rx/rx/test/*": ["./packages/rx/test/*"], + "@effect-rx/rx/test/*": ["./packages/rx/test/*.ts"], "@effect-rx/rx/*": ["./packages/rx/src/*.ts"], "@effect-rx/rx": ["./packages/rx/src/index.ts"], - "@effect-rx/rx-react/test/*": ["./packages/rx-react/test/*"], + "@effect-rx/rx-react/test/*": ["./packages/rx-react/test/*.ts"], "@effect-rx/rx-react/*": ["./packages/rx-react/src/*.ts"], "@effect-rx/rx-react": ["./packages/rx-react/src/index.ts"] },