Skip to content

Commit

Permalink
add idleTTL for non-keepAlive Rx's
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Oct 12, 2023
1 parent 5727398 commit bb9c0f2
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 21 deletions.
5 changes: 5 additions & 0 deletions .changeset/slow-crabs-wash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-rx/rx": patch
---

add idleTTL for non-keepAlive Rx's
34 changes: 31 additions & 3 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
import { NoSuchElementException } from "effect/Cause"
import * as Chunk from "effect/Chunk"
import * as Duration from "effect/Duration"
import * as Effect from "effect/Effect"
import * as Exit from "effect/Exit"
import { dual, pipe } from "effect/Function"
Expand Down Expand Up @@ -40,6 +41,7 @@ export interface Rx<A> extends Pipeable, Inspectable.Inspectable {
readonly read: Rx.Read<A>
readonly refresh?: Rx.Refresh
readonly label?: readonly [name: string, stack: string]
readonly idleTTL?: number
}

/**
Expand Down Expand Up @@ -484,24 +486,34 @@ export interface RxRuntime<E, A> extends Rx<Result.Result<E, Runtime.Runtime<A>>
export const runtime: {
<E, A>(layer: Layer.Layer<never, E, A>, options?: {
readonly autoDispose?: boolean
readonly idleTTL?: Duration.DurationInput
}): RxRuntime<E, A>
<R, E, A, RR extends R, RE>(layer: Layer.Layer<R, E, A>, options?: {
readonly autoDispose?: boolean
readonly idleTTL?: Duration.DurationInput
readonly runtime: RxRuntime<RE, RR>
}): RxRuntime<E, A | RR>
} = <R, E, A, RE>(layer: Layer.Layer<R, E, A>, options?: {
readonly autoDispose?: boolean
readonly idleTTL?: Duration.DurationInput
readonly runtime?: RxRuntime<RE, R>
}): RxRuntime<E | RE, A> => {
const rx = options?.runtime
let rx = options?.runtime
? scoped(() =>
Effect.flatMap(
Layer.build(layer),
(context) => Effect.provide(Effect.runtime<A>(), context)
), { runtime: options.runtime })
: scoped(() => Layer.toRuntime(layer) as Effect.Effect<Scope.Scope, E, Runtime.Runtime<A>>)

return options?.autoDispose ? rx : keepAlive(rx)
if (options?.idleTTL !== undefined) {
rx = setIdleTTL(rx, options.idleTTL)
}
if (options?.autoDispose !== true) {
rx = keepAlive(rx)
}

return rx
}

function makeStream<E, A>(
Expand Down Expand Up @@ -766,7 +778,7 @@ export const refreshable = <T extends Rx<any>>(
*/
export const withLabel: {
(name: string): <A extends Rx<any>>(self: A) => A
<A extends Rx<any>>(self: A, name: string): <A extends Rx<any>>(self: A) => A
<A extends Rx<any>>(self: A, name: string): A
} = dual<
(name: string) => <A extends Rx<any>>(self: A) => A,
<A extends Rx<any>>(self: A, name: string) => A
Expand All @@ -776,6 +788,22 @@ export const withLabel: {
label: [name, new Error().stack?.split("\n")[5] ?? ""]
}))

/**
* @since 1.0.0
* @category combinators
*/
export const setIdleTTL: {
(duration: Duration.DurationInput): <A extends Rx<any>>(self: A) => A
<A extends Rx<any>>(self: A, duration: Duration.DurationInput): A
} = dual<
(duration: Duration.DurationInput) => <A extends Rx<any>>(self: A) => A,
<A extends Rx<any>>(self: A, duration: Duration.DurationInput) => A
>(2, (self, duration) =>
Object.assign(Object.create(Object.getPrototypeOf(self)), {
...self,
idleTTL: Duration.toMillis(duration)
}))

/**
* @since 1.0.0
* @category combinators
Expand Down
89 changes: 81 additions & 8 deletions packages/rx/src/internal/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ export const make = (options?: {

class RegistryImpl implements Registry.Registry {
readonly [TypeId]: Registry.TypeId
constructor(initialValues?: Iterable<readonly [Rx.Rx<any>, any]>) {
constructor(
initialValues?: Iterable<readonly [Rx.Rx<any>, any]>,
readonly timeoutResolution = 5000
) {
this[TypeId] = TypeId
if (initialValues !== undefined) {
for (const [rx, value] of initialValues) {
Expand All @@ -31,6 +34,9 @@ class RegistryImpl implements Registry.Registry {
}

private readonly nodes = new Map<Rx.Rx<any>, Node<any>>()
private readonly timeoutBuckets = new Map<number, readonly [nodes: Set<Node<any>>, handle: NodeJS.Timeout]>()
private readonly nodeTimeoutBucket = new Map<Node<any>, number>()
private disposed = false

get<A>(rx: Rx.Rx<A>): A {
return this.ensureNode(rx).value()
Expand Down Expand Up @@ -73,11 +79,17 @@ class RegistryImpl implements Registry.Registry {
if (node === undefined) {
node = this.createNode(rx)
this.nodes.set(rx, node)
} else if (!rx.keepAlive && rx.idleTTL) {
this.removeNodeTimeout(node)
}
return node
}

createNode<A>(rx: Rx.Rx<A>): Node<A> {
if (this.disposed) {
throw new Error(`Cannot access Rx ${rx}: registry is disposed`)
}

if (!rx.keepAlive) {
this.scheduleRxRemoval(rx)
}
Expand Down Expand Up @@ -106,14 +118,74 @@ class RegistryImpl implements Registry.Registry {
}

removeNode(node: Node<any>): void {
const parents = node.parents
this.nodes.delete(node.rx)
node.remove()
for (let i = 0; i < parents.length; i++) {
if (parents[i].canBeRemoved) {
this.removeNode(parents[i])
}
if (node.rx.idleTTL) {
this.setNodeTimeout(node)
} else {
this.nodes.delete(node.rx)
node.remove()
}
}

setNodeTimeout(node: Node<any>): void {
if (this.nodeTimeoutBucket.has(node)) {
return
}

const ttl = Math.ceil(node.rx.idleTTL! / this.timeoutResolution) * this.timeoutResolution
const timestamp = Date.now() + ttl
const bucket = timestamp - (timestamp % this.timeoutResolution) + this.timeoutResolution

let entry = this.timeoutBuckets.get(bucket)
if (entry === undefined) {
entry = [
new Set<Node<any>>(),
setTimeout(() => this.sweepBucket(bucket), bucket - Date.now())
]
this.timeoutBuckets.set(bucket, entry)
}
entry[0].add(node)
this.nodeTimeoutBucket.set(node, bucket)
}

removeNodeTimeout(node: Node<any>): void {
const bucket = this.nodeTimeoutBucket.get(node)
if (bucket === undefined) {
return
}
this.nodeTimeoutBucket.delete(node)
this.scheduleNodeRemoval(node)

const [nodes, handle] = this.timeoutBuckets.get(bucket)!
nodes.delete(node)
if (nodes.size === 0) {
clearTimeout(handle)
this.timeoutBuckets.delete(bucket)
}
}

sweepBucket(bucket: number): void {
const nodes = this.timeoutBuckets.get(bucket)![0]
this.timeoutBuckets.delete(bucket)

nodes.forEach((node) => {
if (!node.canBeRemoved) {
return
}
this.nodeTimeoutBucket.delete(node)
this.nodes.delete(node.rx)
node.remove()
})
}

dispose(): void {
this.disposed = true

this.timeoutBuckets.forEach(([, handle]) => clearTimeout(handle))
this.timeoutBuckets.clear()
this.nodeTimeoutBucket.clear()

this.nodes.forEach((node) => node.remove())
this.nodes.clear()
}
}

Expand Down Expand Up @@ -279,6 +351,7 @@ class Node<A> {

remove() {
this.state = NodeState.removed
this.listeners = []

if (this.lifetime === undefined) {
return
Expand Down
59 changes: 51 additions & 8 deletions packages/rx/test/Rx.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import * as Option from "effect/Option"
import * as Stream from "effect/Stream"

describe("Rx", () => {
beforeEach(() => {
vitest.useFakeTimers()
})
afterEach(() => {
vitest.useRealTimers()
})

it("get/set", () => {
const counter = Rx.state(0)
const r = Registry.make()
Expand Down Expand Up @@ -236,7 +243,7 @@ describe("Rx", () => {
let result = r.get(count)
assert.strictEqual(result._tag, "Initial")

await new Promise((resolve) => setTimeout(resolve, 55))
await vitest.advanceTimersByTimeAsync(50)
result = r.get(count)
assert.strictEqual(result._tag, "Initial")

Expand All @@ -245,12 +252,12 @@ describe("Rx", () => {
assert(Result.isWaiting(result))
assert.strictEqual(result.previous._tag, "Initial")

await new Promise((resolve) => setTimeout(resolve, 55))
await vitest.advanceTimersByTimeAsync(50)
result = r.get(count)
assert(Result.isWaiting(result))
assert.deepEqual(Result.value(result), Option.some(1))

await new Promise((resolve) => setTimeout(resolve, 50))
await vitest.advanceTimersByTimeAsync(50)
result = r.get(count)
assert(Result.isSuccess(result))
assert.deepEqual(Result.value(result), Option.some(2))
Expand All @@ -260,12 +267,12 @@ describe("Rx", () => {
assert(Result.isWaiting(result))
assert.deepEqual(Result.value(result), Option.some(2))

await new Promise((resolve) => setTimeout(resolve, 55))
await vitest.advanceTimersByTimeAsync(50)
result = r.get(count)
assert(Result.isWaiting(result))
assert.deepEqual(Result.value(result), Option.some(5))

await new Promise((resolve) => setTimeout(resolve, 50))
await vitest.advanceTimersByTimeAsync(50)
result = r.get(count)
assert(Result.isSuccess(result))
assert.deepEqual(Result.value(result), Option.some(6))
Expand All @@ -289,7 +296,7 @@ describe("Rx", () => {
assert(Result.isWaiting(result))
assert(Option.isNone(Result.value(result)))

await new Promise((resolve) => setTimeout(resolve, 55))
await vitest.advanceTimersByTimeAsync(50)
result = r.get(count)
assert(Result.isSuccess(result))
assert.deepEqual(result.value, { done: false, items: [0] })
Expand All @@ -299,7 +306,7 @@ describe("Rx", () => {
assert(Result.isWaiting(result))
assert.deepEqual(Result.value(result), Option.some({ done: false, items: [0] }))

await new Promise((resolve) => setTimeout(resolve, 55))
await vitest.advanceTimersByTimeAsync(50)
result = r.get(count)
assert(Result.isSuccess(result))
assert.deepEqual(result.value, { done: false, items: [0, 1] })
Expand All @@ -314,7 +321,7 @@ describe("Rx", () => {
assert(Result.isWaiting(result))
assert.deepEqual(Result.value(result), Option.some({ done: true, items: [0, 1] }))

await new Promise((resolve) => setTimeout(resolve, 55))
await vitest.advanceTimersByTimeAsync(50)
result = r.get(count)
assert(Result.isSuccess(result))
assert.deepEqual(result.value, { done: false, items: [0] })
Expand Down Expand Up @@ -448,6 +455,42 @@ describe("Rx", () => {
await new Promise((resolve) => resolve(null))
expect(r.get(state)).toEqual(0)
})

it("idleTTL", async () => {
const state = Rx.state(0).pipe(
Rx.setIdleTTL(2000)
)
const state2 = Rx.state(0).pipe(
Rx.setIdleTTL(10000)
)
const state3 = Rx.state(0).pipe(
Rx.setIdleTTL(3000)
)
const r = Registry.make()
r.set(state, 10)
r.set(state2, 10)
r.set(state3, 10)
expect(r.get(state)).toEqual(10)
expect(r.get(state2)).toEqual(10)
expect(r.get(state3)).toEqual(10)
await new Promise((resolve) => resolve(null))
expect(r.get(state)).toEqual(10)
expect(r.get(state2)).toEqual(10)
expect(r.get(state3)).toEqual(10)

await new Promise((resolve) => resolve(null))
console.log(r)
await vitest.advanceTimersByTimeAsync(10000)
expect(r.get(state)).toEqual(0)
expect(r.get(state2)).toEqual(10)
expect(r.get(state3)).toEqual(0)

await new Promise((resolve) => resolve(null))
await vitest.advanceTimersByTimeAsync(20000)
expect(r.get(state)).toEqual(0)
expect(r.get(state2)).toEqual(0)
expect(r.get(state3)).toEqual(0)
})
})

interface Counter {
Expand Down
1 change: 1 addition & 0 deletions scripts/clean.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import * as Glob from "glob";
".ultra.cache.json",
"build",
"coverage",
"src/tsconfig.json",
...(pkg === "." ? [] : ["docs"]),
...files,
]
Expand Down
4 changes: 2 additions & 2 deletions tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
"removeComments": false,
"baseUrl": ".",
"paths": {
"@effect-rx/rx/test/*": ["./packages/rx/test/*"],
"@effect-rx/rx/test/*": ["./packages/rx/test/*.ts"],
"@effect-rx/rx/*": ["./packages/rx/src/*.ts"],
"@effect-rx/rx": ["./packages/rx/src/index.ts"],
"@effect-rx/rx-react/test/*": ["./packages/rx-react/test/*"],
"@effect-rx/rx-react/test/*": ["./packages/rx-react/test/*.ts"],
"@effect-rx/rx-react/*": ["./packages/rx-react/src/*.ts"],
"@effect-rx/rx-react": ["./packages/rx-react/src/index.ts"]
},
Expand Down

0 comments on commit bb9c0f2

Please sign in to comment.