diff --git a/.changeset/shy-terms-dream.md b/.changeset/shy-terms-dream.md
new file mode 100644
index 0000000..7c0a8f3
--- /dev/null
+++ b/.changeset/shy-terms-dream.md
@@ -0,0 +1,7 @@
+---
+"@effect-rx/rx": minor
+"@effect-rx/rx-react": minor
+"@effect-rx/rx-vue": minor
+---
+
+if an Rx has no finilizers, use lazy calculation
diff --git a/.changeset/ten-ghosts-scream.md b/.changeset/ten-ghosts-scream.md
new file mode 100644
index 0000000..743cfaf
--- /dev/null
+++ b/.changeset/ten-ghosts-scream.md
@@ -0,0 +1,7 @@
+---
+"@effect-rx/rx": minor
+"@effect-rx/rx-react": minor
+"@effect-rx/rx-vue": minor
+---
+
+replace keepAlive with autoDispose
diff --git a/packages/rx/src/Registry.ts b/packages/rx/src/Registry.ts
index 942efcc..f239256 100644
--- a/packages/rx/src/Registry.ts
+++ b/packages/rx/src/Registry.ts
@@ -27,8 +27,6 @@ export interface Registry {
readonly refresh: Rx.Rx.RefreshRxSync
readonly set: Rx.Rx.Set
readonly subscribe: Rx.Rx.Subscribe
- readonly reset: () => void
- readonly dispose: () => void
}
/**
diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts
index 5d6af2e..40fd724 100644
--- a/packages/rx/src/Rx.ts
+++ b/packages/rx/src/Rx.ts
@@ -14,6 +14,7 @@ import * as Inspectable from "effect/Inspectable"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import { type Pipeable, pipeArguments } from "effect/Pipeable"
+import { hasProperty } from "effect/Predicate"
import * as Runtime from "effect/Runtime"
import * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
@@ -40,11 +41,11 @@ export type TypeId = typeof TypeId
*/
export interface Rx extends Pipeable, Inspectable.Inspectable {
readonly [TypeId]: TypeId
- readonly keepAlive: boolean
+ readonly autoDispose: boolean
+ readonly idleTTL?: number
readonly read: Rx.Read
readonly refresh?: Rx.Refresh
readonly label?: readonly [name: string, stack: string]
- readonly idleTTL?: number
}
/**
@@ -241,7 +242,8 @@ const RxProto = {
toJSON(this: Rx) {
return {
_id: "Rx",
- keepAlive: this.keepAlive,
+ autoDispose: this.autoDispose,
+ idleTTL: this.idleTTL,
label: this.label
}
},
@@ -296,40 +298,44 @@ const RxRuntimeProto = {
Result.initial(true),
runtimeResult.value
)
- })
+ }).pipe(autoDispose())
return makeStreamPull(pullRx as any, options)
},
subscriptionRef(this: RxRuntime, arg: any) {
- return makeSubRef(readable((get) => {
- const previous = get.self>()
- const runtimeResult = get(this)
- if (runtimeResult._tag !== "Success") {
- return Result.replacePrevious(runtimeResult, previous)
- }
- return makeEffect(
- get,
- arg,
- Result.initial(true),
- runtimeResult.value
- )
- }))
+ return makeSubRef(autoDispose(
+ readable((get) => {
+ const previous = get.self>()
+ const runtimeResult = get(this)
+ if (runtimeResult._tag !== "Success") {
+ return Result.replacePrevious(runtimeResult, previous)
+ }
+ return makeEffect(
+ get,
+ arg,
+ Result.initial(true),
+ runtimeResult.value
+ )
+ })
+ ) as any)
},
subscribable(this: RxRuntime, arg: any) {
- return makeSubscribable(readable((get) => {
- const previous = get.self>()
- const runtimeResult = get(this)
- if (runtimeResult._tag !== "Success") {
- return Result.replacePrevious(runtimeResult, previous)
- }
- return makeEffect(
- get,
- arg,
- Result.initial(true),
- runtimeResult.value
- )
- }))
+ return makeSubscribable(autoDispose(
+ readable((get) => {
+ const previous = get.self>()
+ const runtimeResult = get(this)
+ if (runtimeResult._tag !== "Success") {
+ return Result.replacePrevious(runtimeResult, previous)
+ }
+ return makeEffect(
+ get,
+ arg,
+ Result.initial(true),
+ runtimeResult.value
+ )
+ })
+ ) as any)
}
}
@@ -353,7 +359,7 @@ export const readable = (
refresh?: Rx.Refresh
): Rx => {
const rx = Object.create(RxProto)
- rx.keepAlive = false
+ rx.autoDispose = false
rx.read = read
rx.refresh = refresh
return rx
@@ -369,7 +375,7 @@ export const writable = (
refresh?: Rx.Refresh
): Writable => {
const rx = Object.create(WritableProto)
- rx.keepAlive = false
+ rx.autoDispose = false
rx.read = read
rx.write = write
rx.refresh = refresh
@@ -411,6 +417,23 @@ export const make: {
return readable(readOrRx)
}
+/**
+ * @since 1.0.0
+ * @category combinators
+ */
+export const autoDispose: {
+ (idleTTL?: Duration.DurationInput | undefined): >(self: A) => A
+ >(self: A, idleTTL?: Duration.DurationInput | undefined): A
+} = dual(
+ (args) => hasProperty(args[0], TypeId),
+ >(self: A, idleTTL?: Duration.DurationInput | undefined): A =>
+ Object.assign(Object.create(Object.getPrototypeOf(self)), {
+ ...self,
+ autoDispose: true,
+ idleTTL: idleTTL !== undefined ? Duration.toMillis(idleTTL) : undefined
+ })
+)
+
// -----------------------------------------------------------------------------
// constructors - effect
// -----------------------------------------------------------------------------
@@ -584,13 +607,13 @@ export interface RxRuntime extends Rx, E
export const context: () => (
create: Layer.Layer | Rx.Read>
) => RxRuntime = () => {
- const memoMapRx = make(Layer.makeMemoMap)
+ const memoMapRx = make(Layer.makeMemoMap).pipe(autoDispose())
return (create: Layer.Layer | Rx.Read>): RxRuntime => {
const rx = Object.create(RxRuntimeProto)
- rx.keepAlive = false
+ rx.autoDispose = false
rx.refresh = undefined
- const layerRx = keepAlive(typeof create === "function" ? readable(create) : readable(() => create))
+ const layerRx = typeof create === "function" ? readable(create) : readable(() => create)
rx.layer = layerRx
rx.read = function read(get: Context) {
@@ -730,16 +753,18 @@ export const subscriptionRef: {
| Effect.Effect, any, never>
| Rx.Read, any, never>>
) =>
- makeSubRef(readable((get) => {
- let value: SubscriptionRef.SubscriptionRef | Effect.Effect, any, any>
- if (typeof ref === "function") {
- value = ref(get)
- } else {
- value = ref
- }
+ makeSubRef(
+ readable((get) => {
+ let value: SubscriptionRef.SubscriptionRef | Effect.Effect, any, any>
+ if (typeof ref === "function") {
+ value = ref(get)
+ } else {
+ value = ref
+ }
- return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
- }))
+ return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
+ }).pipe(autoDispose())
+ )
// -----------------------------------------------------------------------------
// constructors - subscribable
@@ -767,7 +792,7 @@ export const subscribable: {
| Effect.Effect, any, never>
| Rx.Read, any, never>>
) =>
- makeSubscribable(readable((get) => {
+ makeSubscribable(autoDispose(readable((get) => {
let value: Subscribable.Subscribable | Effect.Effect, any, any>
if (typeof ref === "function") {
value = ref(get)
@@ -776,7 +801,7 @@ export const subscribable: {
}
return Effect.isEffect(value) ? makeEffect(get, value, Result.initial(true)) : value
- }))
+ })))
// -----------------------------------------------------------------------------
// constructors - functions
@@ -797,7 +822,7 @@ export const fnSync: {
} = (f: Rx.ReadFn, options?: {
readonly initialValue?: A
}): Writable | A, RxResultFn.ArgToVoid> => {
- const argRx = state<[number, Arg]>([0, undefined as any])
+ const argRx = state<[number, Arg]>([0, undefined as any]).pipe(autoDispose())
const hasInitialValue = options?.initialValue !== undefined
return writable(function(get) {
const [counter, arg] = get(argRx)
@@ -860,7 +885,7 @@ function makeResultFn(
f: Rx.ReadFn | Stream.Stream>,
options?: { readonly initialValue?: A }
) {
- const argRx = state<[number, Arg]>([0, undefined as any])
+ const argRx = state<[number, Arg]>([0, undefined as any]).pipe(autoDispose())
const initialValue = options?.initialValue !== undefined
? Result.success(options.initialValue)
: Result.initial()
@@ -907,7 +932,7 @@ export const pull = (create: Rx.Read> | Stream.Stream<
makeRead(function(get) {
return makeStreamPullEffect(get, create, options)
})
- )
+ ).pipe(autoDispose())
return makeStreamPull(pullRx, options)
}
@@ -1056,7 +1081,7 @@ export const withFallback: {
}
return result
}
- return isWritable(self)
+ const rx = isWritable(self)
? writable(
withFallback,
self.write,
@@ -1070,18 +1095,9 @@ export const withFallback: {
refresh(self)
}
) as any
+ return self.autoDispose ? autoDispose(rx, self.idleTTL) : rx
})
-/**
- * @since 1.0.0
- * @category combinators
- */
-export const keepAlive = >(self: A): A =>
- Object.assign(Object.create(Object.getPrototypeOf(self)), {
- ...self,
- keepAlive: true
- })
-
/**
* @since 1.0.0
* @category combinators
@@ -1114,23 +1130,7 @@ export const withLabel: {
* @since 1.0.0
* @category combinators
*/
-export const setIdleTTL: {
- (duration: Duration.DurationInput): >(self: A) => A
- >(self: A, duration: Duration.DurationInput): A
-} = dual<
- (duration: Duration.DurationInput) => >(self: A) => A,
- >(self: A, duration: Duration.DurationInput) => A
->(2, (self, duration) =>
- Object.assign(Object.create(Object.getPrototypeOf(self)), {
- ...self,
- idleTTL: Duration.toMillis(duration)
- }))
-
-/**
- * @since 1.0.0
- * @category combinators
- */
-export const initialValue: {
+export const toInitialValue: {
(initialValue: A): (self: Rx) => readonly [Rx, A]
(self: Rx, initialValue: A): readonly [Rx, A]
} = dual<
@@ -1152,8 +1152,8 @@ export const transform: {
): [R] extends [Writable] ? Writable : Rx
} = dual(
2,
- ((self: Rx, f: (get: Context) => B): Rx =>
- isWritable(self)
+ ((self: Rx, f: (get: Context) => B): Rx => {
+ const rx = isWritable(self)
? writable(
f,
function(ctx, value) {
@@ -1168,7 +1168,9 @@ export const transform: {
self.refresh ?? function(refresh) {
refresh(self)
}
- )) as any
+ )
+ return self.autoDispose ? autoDispose(rx, self.idleTTL) : rx
+ }) as any
)
/**
diff --git a/packages/rx/src/internal/registry.ts b/packages/rx/src/internal/registry.ts
index a2160f8..067d650 100644
--- a/packages/rx/src/internal/registry.ts
+++ b/packages/rx/src/internal/registry.ts
@@ -33,7 +33,7 @@ class RegistryImpl implements Registry.Registry {
constructor(
initialValues?: Iterable, any]>,
readonly scheduleTask = queueMicrotask,
- readonly timeoutResolution = 5000
+ readonly timeoutResolution = 1000
) {
this[TypeId] = TypeId
if (initialValues !== undefined) {
@@ -43,7 +43,7 @@ class RegistryImpl implements Registry.Registry {
}
}
- private readonly nodes = new Map, Node>()
+ private readonly nodes = new WeakMap, Node>()
private readonly timeoutBuckets = new Map>, handle: number]>()
private readonly nodeTimeoutBucket = new Map, number>()
private disposed = false
@@ -89,7 +89,7 @@ class RegistryImpl implements Registry.Registry {
if (node === undefined) {
node = this.createNode(rx)
this.nodes.set(rx, node)
- } else if (!rx.keepAlive && rx.idleTTL) {
+ } else if (rx.autoDispose && rx.idleTTL) {
this.removeNodeTimeout(node)
}
return node
@@ -100,7 +100,7 @@ class RegistryImpl implements Registry.Registry {
throw new Error(`Cannot access Rx ${rx}: registry is disposed`)
}
- if (!rx.keepAlive) {
+ if (rx.autoDispose) {
this.scheduleRxRemoval(rx)
}
return new Node(this, rx)
@@ -186,32 +186,17 @@ class RegistryImpl implements Registry.Registry {
node.remove()
})
}
-
- reset(): void {
- this.timeoutBuckets.forEach(([, handle]) => clearTimeout(handle))
- this.timeoutBuckets.clear()
- this.nodeTimeoutBucket.clear()
-
- this.nodes.forEach((node) => node.remove())
- this.nodes.clear()
- }
-
- dispose(): void {
- this.disposed = true
- this.reset()
- }
}
const enum NodeFlags {
alive = 1 << 0,
initialized = 1 << 1,
- waitingForValue = 1 << 2
+ waitingForValue = 1 << 2,
+ notified = 1 << 3
}
const enum NodeState {
uninitialized = NodeFlags.alive | NodeFlags.waitingForValue,
- stale = NodeFlags.alive | NodeFlags.initialized | NodeFlags.waitingForValue,
- valid = NodeFlags.alive | NodeFlags.initialized,
removed = 0
}
@@ -233,10 +218,19 @@ class Node {
listeners: Array<() => void> = []
get canBeRemoved(): boolean {
- return !this.rx.keepAlive && this.listeners.length === 0 && this.children.length === 0 &&
+ return this.rx.autoDispose && this.listeners.length === 0 && this.children.length === 0 &&
this.state !== 0
}
+ get debugFlags() {
+ return {
+ alive: (this.state & NodeFlags.alive) !== 0,
+ initialized: (this.state & NodeFlags.initialized) !== 0,
+ waitingForValue: (this.state & NodeFlags.waitingForValue) !== 0,
+ notified: (this.state & NodeFlags.notified) !== 0
+ }
+ }
+
_value: A = undefined as any
value(): A {
if ((this.state & NodeFlags.waitingForValue) !== 0) {
@@ -270,7 +264,8 @@ class Node {
setValue(value: A): void {
if ((this.state & NodeFlags.initialized) === 0) {
- this.state = NodeState.valid
+ this.state |= NodeFlags.initialized
+ this.state &= ~NodeFlags.waitingForValue
this._value = value
if (batchState.phase !== BatchPhase.collect) {
@@ -280,10 +275,11 @@ class Node {
return
}
- this.state = NodeState.valid
+ this.state &= ~NodeFlags.waitingForValue
if (Equal.equals(this._value, value)) {
return
}
+ this.state &= ~NodeFlags.notified
this._value = value
this.invalidateChildren()
@@ -319,16 +315,24 @@ class Node {
}
invalidate(): void {
- if (this.state === NodeState.valid) {
- this.state = NodeState.stale
+ const hadFinalizers = this.lifetime !== undefined && this.lifetime.hasFinalizers
+ if ((this.state & NodeFlags.waitingForValue) === 0) {
+ this.state |= NodeFlags.waitingForValue
this.disposeLifetime()
}
if (batchState.phase === BatchPhase.collect) {
- batchState.stale.push(this)
+ batchState.stale.push([this, hadFinalizers])
this.invalidateChildren()
- } else {
+ if (!hadFinalizers) {
+ this.state &= ~NodeFlags.notified
+ }
+ } else if (hadFinalizers) {
this.value()
+ } else {
+ this.invalidateChildren()
+ this.state &= ~NodeFlags.notified
+ this.notify()
}
}
@@ -345,6 +349,10 @@ class Node {
}
notify(): void {
+ if ((this.state & NodeFlags.notified) !== 0) {
+ return
+ }
+ this.state |= NodeFlags.notified
for (let i = 0; i < this.listeners.length; i++) {
this.listeners[i]()
}
@@ -400,6 +408,7 @@ class Node {
interface Lifetime extends Rx.Context {
readonly node: Node
+ readonly hasFinalizers: boolean
finalizers: Array<() => void> | undefined
disposed: boolean
readonly dispose: () => void
@@ -408,6 +417,10 @@ interface Lifetime extends Rx.Context {
const disposedError = (rx: Rx.Rx): Error => new Error(`Cannot use context of disposed Rx: ${rx}`)
const LifetimeProto: Omit, "node" | "finalizers" | "disposed"> = {
+ get hasFinalizers() {
+ return (this as Lifetime).finalizers !== undefined && (this as Lifetime).finalizers!.length !== 0
+ },
+
addFinalizer(this: Lifetime, f: () => void): void {
if (this.disposed) {
throw disposedError(this.node.rx)
@@ -654,7 +667,7 @@ export const enum BatchPhase {
export const batchState = globalValue("@effect-rx/rx/Registry/batchState", () => ({
phase: BatchPhase.disabled,
depth: 0,
- stale: [] as Array>
+ stale: [] as Array<[Node, hadFinalizers: boolean]>
}))
/** @internal */
@@ -666,7 +679,8 @@ export function batch(f: () => void): void {
if (batchState.depth === 1) {
batchState.phase = BatchPhase.commit
for (let i = 0; i < batchState.stale.length; i++) {
- batchRebuildNode(batchState.stale[i])
+ const [node, hadFinalizers] = batchState.stale[i]
+ batchRebuildNode(node, hadFinalizers)
}
}
} finally {
@@ -678,20 +692,25 @@ export function batch(f: () => void): void {
}
}
-function batchRebuildNode(node: Node) {
- if (node.state === NodeState.valid) {
+function batchRebuildNode(node: Node, hadFinalizers: boolean) {
+ if ((node.state & NodeFlags.waitingForValue) === 0) {
return
}
for (let i = 0; i < node.parents.length; i++) {
const parent = node.parents[i]
- if (parent.state !== NodeState.valid) {
- batchRebuildNode(parent)
+ if ((parent.state & NodeFlags.waitingForValue) === 0) {
+ batchRebuildNode(parent, false)
}
}
// @ts-ignore
- if (node.state !== NodeState.valid) {
- node.value()
+ if ((node.state & NodeFlags.waitingForValue) !== 0) {
+ if (hadFinalizers) {
+ node.value()
+ } else {
+ node.invalidateChildren()
+ node.notify()
+ }
}
}
diff --git a/packages/rx/test/Rx.test.ts b/packages/rx/test/Rx.test.ts
index a9d3ca5..a831980 100644
--- a/packages/rx/test/Rx.test.ts
+++ b/packages/rx/test/Rx.test.ts
@@ -26,8 +26,8 @@ describe("Rx", () => {
expect(r.get(counter)).toEqual(1)
})
- it("keepAlive false", async () => {
- const counter = Rx.make(0)
+ it("autoDispose true", async () => {
+ const counter = Rx.make(0).pipe(Rx.autoDispose())
const r = Registry.make()
r.set(counter, 1)
expect(r.get(counter)).toEqual(1)
@@ -35,10 +35,8 @@ describe("Rx", () => {
expect(r.get(counter)).toEqual(0)
})
- it("keepAlive true", async () => {
- const counter = Rx.make(0).pipe(
- Rx.keepAlive
- )
+ it("autoDispose false", async () => {
+ const counter = Rx.make(0)
const r = Registry.make()
r.set(counter, 1)
expect(r.get(counter)).toEqual(1)
@@ -47,7 +45,7 @@ describe("Rx", () => {
})
it("subscribe", async () => {
- const counter = Rx.make(0)
+ const counter = Rx.make(0).pipe(Rx.autoDispose())
const r = Registry.make()
let count = 0
const cancel = r.subscribe(counter, (_) => {
@@ -72,9 +70,11 @@ describe("Rx", () => {
})
it("runtime replacement", async () => {
- const count = counterRuntime.rx(Effect.flatMap(Counter, (_) => _.get))
+ const count = counterRuntime.rx(Effect.flatMap(Counter, (_) => _.get)).pipe(
+ Rx.autoDispose
+ )
const r = Registry.make({
- initialValues: [Rx.initialValue(counterRuntime.layer, CounterTest)]
+ initialValues: [Rx.toInitialValue(counterRuntime.layer, CounterTest)]
})
const result = r.get(count)
assert(Result.isSuccess(result))
@@ -82,8 +82,10 @@ describe("Rx", () => {
})
it("runtime multiple", async () => {
- const buildCount = buildCounterRuntime.fn((_: void) => Effect.flatMap(BuildCounter, (_) => _.get))
- const count = counterRuntime.rx(Effect.flatMap(Counter, (_) => _.get))
+ const buildCount = buildCounterRuntime.fn((_: void) => Effect.flatMap(BuildCounter, (_) => _.get)).pipe(
+ Rx.autoDispose()
+ )
+ const count = counterRuntime.rx(Effect.flatMap(Counter, (_) => _.get)).pipe(Rx.autoDispose())
const timesTwo = multiplierRuntime.rx((get) =>
Effect.gen(function*() {
const counter = yield* Counter
@@ -92,7 +94,7 @@ describe("Rx", () => {
expect(yield* get.result(count)).toEqual(2)
return yield* multiplier.times(2)
})
- )
+ ).pipe(Rx.autoDispose())
const r = Registry.make()
const cancel = r.mount(buildCount)
@@ -132,7 +134,7 @@ describe("Rx", () => {
const count = Rx.make(
Effect.succeed(1).pipe(Effect.delay(100)),
{ initialValue: 0 }
- ).pipe(Rx.keepAlive)
+ )
const r = Registry.make()
let result = r.get(count)
assert(Result.isSuccess(result))
@@ -232,7 +234,7 @@ describe("Rx", () => {
)
)
)
- ).pipe(Rx.keepAlive)
+ )
const r = Registry.make()
let result = r.get(count)
assert(Result.isInitial(result))
@@ -255,7 +257,7 @@ describe("Rx", () => {
Stream.range(0, 2).pipe(
Stream.tap(() => Effect.sleep(50))
)
- )
+ ).pipe(Rx.autoDispose())
const r = Registry.make()
const unmount = r.mount(count)
let result = r.get(count)
@@ -319,7 +321,7 @@ describe("Rx", () => {
Stream.range(start, start + 1).pipe(
Stream.tap(() => Effect.sleep(50))
)
- )
+ ).pipe(Rx.autoDispose())
const r = Registry.make()
const unmount = r.mount(count)
let result = r.get(count)
@@ -375,7 +377,7 @@ describe("Rx", () => {
Stream.range(0, 1, 1).pipe(
Stream.tap(() => Effect.sleep(50))
)
- ).pipe(Rx.refreshable)
+ ).pipe(Rx.refreshable, Rx.autoDispose())
const r = Registry.make()
const unmount = r.mount(count)
@@ -432,7 +434,7 @@ describe("Rx", () => {
Stream.unwrap,
Stream.tap(() => Effect.sleep(50))
)
- ).pipe(Rx.refreshable)
+ ).pipe(Rx.refreshable, Rx.autoDispose())
const r = Registry.make()
const unmount = r.mount(count)
@@ -485,7 +487,7 @@ describe("Rx", () => {
const count = Rx.pull(() =>
Stream.range(1, 2, 1).pipe(
Stream.tap(() => Effect.sleep(50))
- ), { initialValue: [0] }).pipe(Rx.refreshable)
+ ), { initialValue: [0] }).pipe(Rx.refreshable, Rx.autoDispose())
const r = Registry.make()
const unmount = r.mount(count)
@@ -515,7 +517,7 @@ describe("Rx", () => {
r.set(count(1), 2)
assert.strictEqual(r.get(count(1)), 2)
- const countKeep = Rx.family((n: number) => Rx.make(n).pipe(Rx.keepAlive))
+ const countKeep = Rx.family((n: number) => Rx.make(n))
assert.strictEqual(countKeep(1), countKeep(1))
r.get(countKeep(1))
const hashKeep = Hash.hash(countKeep(1))
@@ -537,8 +539,8 @@ describe("Rx", () => {
it("batching", async () => {
const r = Registry.make()
- const state = Rx.make(1).pipe(Rx.keepAlive)
- const state2 = Rx.make("a").pipe(Rx.keepAlive)
+ const state = Rx.make(1)
+ const state2 = Rx.make("a")
let count = 0
const derived = Rx.readable((get) => {
count++
@@ -550,14 +552,15 @@ describe("Rx", () => {
r.set(state, 2)
r.set(state2, "b")
})
- expect(count).toEqual(2)
+ expect(count).toEqual(1)
expect(r.get(derived)).toEqual("2b")
+ expect(count).toEqual(2)
})
it("nested batch", async () => {
const r = Registry.make()
- const state = Rx.make(1).pipe(Rx.keepAlive)
- const state2 = Rx.make("a").pipe(Rx.keepAlive)
+ const state = Rx.make(1)
+ const state2 = Rx.make("a")
let count = 0
const derived = Rx.readable((get) => {
count++
@@ -571,14 +574,15 @@ describe("Rx", () => {
r.set(state2, "b")
})
})
- expect(count).toEqual(2)
+ expect(count).toEqual(1)
expect(r.get(derived)).toEqual("2b")
+ expect(count).toEqual(2)
})
it("read correct updated state in batch", async () => {
const r = Registry.make()
- const state = Rx.make(1).pipe(Rx.keepAlive)
- const state2 = Rx.make("a").pipe(Rx.keepAlive)
+ const state = Rx.make(1)
+ const state2 = Rx.make("a")
let count = 0
const derived = Rx.readable((get) => {
count++
@@ -591,15 +595,15 @@ describe("Rx", () => {
expect(r.get(derived)).toEqual("2a")
r.set(state2, "b")
})
- expect(count).toEqual(3)
+ expect(count).toEqual(2)
expect(r.get(derived)).toEqual("2b")
expect(count).toEqual(3)
})
it("notifies listeners after batch commit", async () => {
const r = Registry.make()
- const state = Rx.make(1).pipe(Rx.keepAlive)
- const state2 = Rx.make("a").pipe(Rx.keepAlive)
+ const state = Rx.make(1).pipe(Rx.autoDispose())
+ const state2 = Rx.make("a").pipe(Rx.autoDispose())
let count = 0
const derived = Rx.readable((get) => {
return get(state) + get(state2)
@@ -615,13 +619,14 @@ describe("Rx", () => {
})
expect(count).toEqual(1)
expect(r.get(derived)).toEqual("2b")
+ expect(count).toEqual(2)
})
it("initialValues", async () => {
- const state = Rx.make(0)
+ const state = Rx.make(0).pipe(Rx.autoDispose())
const r = Registry.make({
initialValues: [
- Rx.initialValue(state, 10)
+ Rx.toInitialValue(state, 10)
]
})
expect(r.get(state)).toEqual(10)
@@ -631,13 +636,13 @@ describe("Rx", () => {
it("idleTTL", async () => {
const state = Rx.make(0).pipe(
- Rx.setIdleTTL(2000)
+ Rx.autoDispose(2000)
)
const state2 = Rx.make(0).pipe(
- Rx.setIdleTTL(10000)
+ Rx.autoDispose(10000)
)
const state3 = Rx.make(0).pipe(
- Rx.setIdleTTL(3000)
+ Rx.autoDispose(3000)
)
const r = Registry.make()
r.set(state, 10)
@@ -665,7 +670,7 @@ describe("Rx", () => {
})
it("fn", async () => {
- const count = Rx.fnSync((n: number) => n).pipe(Rx.keepAlive)
+ const count = Rx.fnSync((n: number) => n)
const r = Registry.make()
assert.deepEqual(r.get(count), Option.none())
@@ -688,8 +693,7 @@ describe("Rx", () => {
Effect.delay(100)
)
).pipe(
- Rx.withFallback(Rx.make(() => Effect.succeed(0))),
- Rx.keepAlive
+ Rx.withFallback(Rx.make(() => Effect.succeed(0)))
)
const r = Registry.make()
assert.deepEqual(r.get(count), Result.waiting(Result.success(0)))
@@ -699,7 +703,9 @@ describe("Rx", () => {
})
it("failure with previousValue", async () => {
- const count = Rx.fn((i: number) => i === 1 ? Effect.fail("fail") : Effect.succeed(i))
+ const count = Rx.fn((i: number) => i === 1 ? Effect.fail("fail") : Effect.succeed(i)).pipe(
+ Rx.autoDispose()
+ )
const r = Registry.make()
let result = r.get(count)
@@ -707,6 +713,7 @@ describe("Rx", () => {
r.set(count, 0)
result = r.get(count)
+ console.log(result)
assert(Result.isSuccess(result))
assert.strictEqual(result.value, 0)
@@ -746,8 +753,8 @@ describe("Rx", () => {
it("get.streamResult", async () => {
const count = Rx.make(0)
- const multiplied = Rx.make((get) => get.stream(count).pipe(Stream.map((_) => _ * 2)))
- const plusOne = Rx.make((get) => get.streamResult(multiplied).pipe(Stream.map((_) => _ + 1)))
+ const multiplied = Rx.make((get) => get.stream(count).pipe(Stream.map((_) => _ * 2))).pipe(Rx.autoDispose())
+ const plusOne = Rx.make((get) => get.streamResult(multiplied).pipe(Stream.map((_) => _ + 1))).pipe(Rx.autoDispose())
const r = Registry.make()
const cancel = r.mount(plusOne)
@@ -788,7 +795,7 @@ describe("Rx", () => {
it("Subscribable/SubscriptionRef", async () => {
vitest.useRealTimers()
const ref = SubscriptionRef.make(123).pipe(Effect.runSync)
- const rx = Rx.subscribable(ref)
+ const rx = Rx.subscribable(ref).pipe(Rx.autoDispose())
const r = Registry.make()
assert.deepStrictEqual(r.get(rx), 123)
await Effect.runPromise(SubscriptionRef.update(ref, (a) => a + 1))
@@ -901,7 +908,7 @@ const MultiplierLive = Layer.effect(
Layer.provideMerge(CounterLive)
)
-const buildCounterRuntime = Rx.runtime(BuildCounterLive)
-const counterRuntime = Rx.runtime(CounterLive)
-const multiplierRuntime = Rx.runtime(MultiplierLive)
-const fiberRefRuntime = Rx.runtime(Layer.setRequestCaching(true))
+const buildCounterRuntime = Rx.runtime(BuildCounterLive).pipe(Rx.autoDispose())
+const counterRuntime = Rx.runtime(CounterLive).pipe(Rx.autoDispose())
+const multiplierRuntime = Rx.runtime(MultiplierLive).pipe(Rx.autoDispose())
+const fiberRefRuntime = Rx.runtime(Layer.setRequestCaching(true)).pipe(Rx.autoDispose())