diff --git a/.changeset/hot-jokes-push.md b/.changeset/hot-jokes-push.md new file mode 100644 index 0000000..e2914e1 --- /dev/null +++ b/.changeset/hot-jokes-push.md @@ -0,0 +1,6 @@ +--- +"@effect-rx/rx": minor +"@effect-rx/rx-react": minor +--- + +consolidate Rx constructors diff --git a/docs/rx-react/index.ts.md b/docs/rx-react/index.ts.md index 26bceef..54f67fb 100644 --- a/docs/rx-react/index.ts.md +++ b/docs/rx-react/index.ts.md @@ -95,7 +95,7 @@ Added in v1.0.0 **Signature** ```ts -export declare const useRxRefresh: (rx: any) => () => void +export declare const useRxRefresh: (rx: Rx.Rx & Rx.Refreshable) => () => void ``` Added in v1.0.0 @@ -144,7 +144,7 @@ Added in v1.0.0 export declare const useRxSuspense: ( rx: Rx.Rx>, options?: { readonly suspendOnWaiting?: boolean } -) => any +) => Result.Success | Result.Failure ``` Added in v1.0.0 @@ -181,7 +181,7 @@ Re-exports all named exports from the "@effect-rx/rx/Registry" module as `Regist **Signature** ```ts -export * as Registry from '@effect-rx/rx/Registry' +export * as Registry from "@effect-rx/rx/Registry" ``` Added in v1.0.0 @@ -193,7 +193,7 @@ Re-exports all named exports from the "@effect-rx/rx/Result" module as `Result`. **Signature** ```ts -export * as Result from '@effect-rx/rx/Result' +export * as Result from "@effect-rx/rx/Result" ``` Added in v1.0.0 @@ -205,7 +205,7 @@ Re-exports all named exports from the "@effect-rx/rx/Rx" module as `Rx`. **Signature** ```ts -export * as Rx from '@effect-rx/rx/Rx' +export * as Rx from "@effect-rx/rx/Rx" ``` Added in v1.0.0 @@ -217,7 +217,7 @@ Re-exports all named exports from the "@effect-rx/rx/RxRef" module as `RxRef`. **Signature** ```ts -export * as RxRef from '@effect-rx/rx/RxRef' +export * as RxRef from "@effect-rx/rx/RxRef" ``` Added in v1.0.0 diff --git a/docs/rx/Result.ts.md b/docs/rx/Result.ts.md index 19cdb64..130310c 100644 --- a/docs/rx/Result.ts.md +++ b/docs/rx/Result.ts.md @@ -237,7 +237,7 @@ Added in v1.0.0 ```ts export interface Failure extends Result.Proto { - readonly _tag: 'Failure' + readonly _tag: "Failure" readonly cause: Cause.Cause readonly previousValue: Option.Option } @@ -251,7 +251,7 @@ Added in v1.0.0 ```ts export interface Initial extends Result.Proto { - readonly _tag: 'Initial' + readonly _tag: "Initial" } ``` @@ -315,10 +315,10 @@ Added in v1.0.0 export type With, E, A> = R extends Initial ? Initial : R extends Success - ? Success - : R extends Failure - ? Failure - : never + ? Success + : R extends Failure + ? Failure + : never ``` Added in v1.0.0 @@ -329,7 +329,7 @@ Added in v1.0.0 ```ts export interface Success extends Result.Proto { - readonly _tag: 'Success' + readonly _tag: "Success" readonly value: A } ``` diff --git a/docs/rx/Rx.ts.md b/docs/rx/Rx.ts.md index fe30f69..44fb8e9 100644 --- a/docs/rx/Rx.ts.md +++ b/docs/rx/Rx.ts.md @@ -24,23 +24,18 @@ Added in v1.0.0 - [withFallback](#withfallback) - [withLabel](#withlabel) - [constructors](#constructors) - - [effect](#effect) - - [effectFn](#effectfn) - [family](#family) - [fn](#fn) + - [fnSync](#fnsync) + - [make](#make) + - [pull](#pull) - [readable](#readable) - - [runtime](#runtime) - - [scoped](#scoped) - - [scopedFn](#scopedfn) - - [state](#state) - - [stream](#stream) - - [streamFn](#streamfn) - - [streamPull](#streampull) - [writable](#writable) - [context](#context) - [Context (interface)](#context-interface) - [WriteContext (interface)](#writecontext-interface) - [models](#models) + - [PullResult (type alias)](#pullresult-type-alias) - [Refreshable (interface)](#refreshable-interface) - [Rx (interface)](#rx-interface) - [Rx (namespace)](#rx-namespace) @@ -59,7 +54,6 @@ Added in v1.0.0 - [Write (type alias)](#write-type-alias) - [RxResultFn (interface)](#rxresultfn-interface) - [RxRuntime (interface)](#rxruntime-interface) - - [StreamPullResult (type alias)](#streampullresult-type-alias) - [Writable (interface)](#writable-interface) - [refinements](#refinements) - [isWritable](#iswritable) @@ -177,14 +171,17 @@ Added in v1.0.0 ```ts export declare const withFallback: { - (fallback: Rx>): >>( + ( + fallback: Rx> + ): >>( self: R ) => [R] extends [Writable] ? Writable>, A2 | Result.Result.InferA>>, RW> : Rx>, A2 | Result.Result.InferA>>> - >, E2, A2>(self: R, fallback: Rx>): [R] extends [ - Writable - ] + >, E2, A2>( + self: R, + fallback: Rx> + ): [R] extends [Writable] ? Writable>, A2 | Result.Result.InferA>>, RW> : Rx>, A2 | Result.Result.InferA>>> } @@ -207,60 +204,41 @@ Added in v1.0.0 # constructors -## effect +## family **Signature** ```ts -export declare const effect: { - ( - create: Rx.Read>, - options?: { readonly initialValue?: A | undefined; readonly runtime?: undefined } | undefined - ): Rx> - ( - create: Rx.Read>, - options: { readonly runtime: RxRuntime; readonly initialValue?: A | undefined } - ): Rx> -} +export declare const family: >(f: (arg: Arg) => T) => (arg: Arg) => T ``` Added in v1.0.0 -## effectFn +## fn **Signature** ```ts -export declare const effectFn: { +export declare const fn: { ( - fn: Rx.ReadFn>, - options?: { readonly initialValue?: A | undefined; readonly runtime?: undefined } | undefined + fn: Rx.ReadFn>, + options?: { readonly initialValue?: A | undefined } | undefined ): RxResultFn - ( - fn: Rx.ReadFn>, - options: { readonly runtime: RxRuntime; readonly initialValue?: A | undefined } - ): RxResultFn + ( + fn: Rx.ReadFn>, + options?: { readonly initialValue?: A | undefined } | undefined + ): RxResultFn } ``` Added in v1.0.0 -## family - -**Signature** - -```ts -export declare const family: >(f: (arg: Arg) => T) => (arg: Arg) => T -``` - -Added in v1.0.0 - -## fn +## fnSync **Signature** ```ts -export declare const fn: { +export declare const fnSync: { (f: Rx.ReadFn): Writable, Arg> (f: Rx.ReadFn, options: { readonly initialValue: A }): Writable } @@ -268,156 +246,58 @@ export declare const fn: { Added in v1.0.0 -## readable +## make **Signature** ```ts -export declare const readable: (read: Rx.Read, refresh?: Rx.Refresh) => Rx -``` - -Added in v1.0.0 - -## runtime - -**Signature** - -```ts -export declare const runtime: { +export declare const make: { ( - create: (get: Context) => Layer.Layer, - options?: { - readonly autoDispose?: boolean - readonly idleTTL?: Duration.DurationInput - readonly runtime?: undefined - } - ): RxRuntime - ( - create: (get: Context) => Layer.Layer, - options?: - | { - readonly autoDispose?: boolean | undefined - readonly idleTTL?: Duration.DurationInput | undefined - readonly runtime: RxRuntime - } - | undefined - ): RxRuntime -} -``` - -Added in v1.0.0 - -## scoped - -**Signature** - -```ts -export declare const scoped: { + effect: Effect.Effect, + options?: { readonly initialValue?: A | undefined } | undefined + ): Rx> ( create: Rx.Read>, - options?: { readonly initialValue?: A | undefined; readonly runtime?: undefined } | undefined + options?: { readonly initialValue?: A | undefined } | undefined + ): Rx> + ( + stream: Stream.Stream, + options?: { readonly initialValue?: A | undefined } | undefined ): Rx> - ( - create: Rx.Read>, - options: { readonly initialValue?: A | undefined; readonly runtime: RxRuntime } - ): Rx> -} -``` - -Added in v1.0.0 - -## scopedFn - -**Signature** - -```ts -export declare const scopedFn: { - ( - fn: Rx.ReadFn>, - options?: { readonly initialValue?: A | undefined; readonly runtime?: undefined } | undefined - ): RxResultFn - ( - fn: Rx.ReadFn>, - options: { readonly initialValue?: A | undefined; readonly runtime: RxRuntime } - ): RxResultFn -} -``` - -Added in v1.0.0 - -## state - -**Signature** - -```ts -export declare const state: (initialValue: A) => Writable -``` - -Added in v1.0.0 - -## stream - -**Signature** - -```ts -export declare const stream: { ( create: Rx.Read>, - options?: { readonly initialValue?: A | undefined; readonly runtime?: undefined } | undefined - ): Rx> - ( - create: Rx.Read>, - options: { readonly initialValue?: A | undefined; readonly runtime: RxRuntime } - ): Rx> + options?: { readonly initialValue?: A | undefined } | undefined + ): Rx> + (layer: Layer.Layer): RxRuntime + (create: Rx.Read>): RxRuntime + (create: Rx.Read): Rx + (initialValue: A): Writable } ``` Added in v1.0.0 -## streamFn +## pull **Signature** ```ts -export declare const streamFn: { - ( - fn: Rx.ReadFn>, - options?: { readonly initialValue?: A | undefined; readonly runtime?: undefined } | undefined - ): RxResultFn - ( - fn: Rx.ReadFn>, - options: { readonly runtime: RxRuntime; readonly initialValue?: A | undefined } - ): RxResultFn -} +export declare const pull: ( + create: Stream.Stream | Rx.Read>, + options?: + | { readonly disableAccumulation?: boolean | undefined; readonly initialValue?: readonly A[] | undefined } + | undefined +) => Writable, void> ``` Added in v1.0.0 -## streamPull +## readable **Signature** ```ts -export declare const streamPull: { - ( - create: Rx.Read>, - options?: - | { - readonly disableAccumulation?: boolean | undefined - readonly initialValue?: readonly A[] | undefined - readonly runtime?: undefined - } - | undefined - ): Writable, void> - ( - create: Rx.Read>, - options: { - readonly runtime: RxRuntime - readonly disableAccumulation?: boolean | undefined - readonly initialValue?: readonly A[] | undefined - } - ): Writable, void> -} +export declare const readable: (read: Rx.Read, refresh?: Rx.Refresh) => Rx ``` Added in v1.0.0 @@ -484,6 +364,22 @@ Added in v1.0.0 # models +## PullResult (type alias) + +**Signature** + +```ts +export type PullResult = Result.Result< + E | NoSuchElementException, + { + readonly done: boolean + readonly items: Array + } +> +``` + +Added in v1.0.0 + ## Refreshable (interface) **Signature** @@ -668,23 +564,59 @@ Added in v1.0.0 **Signature** ```ts -export interface RxRuntime extends Rx>> {} -``` - -Added in v1.0.0 - -## StreamPullResult (type alias) - -**Signature** +export interface RxRuntime extends Rx>> { + readonly rx: { + ( + effect: Effect.Effect, + options?: { + readonly initialValue?: A + } + ): Rx> + ( + create: Rx.Read>, + options?: { + readonly initialValue?: A + } + ): Rx> + ( + stream: Stream.Stream, + options?: { + readonly initialValue?: A + } + ): Rx> + ( + create: Rx.Read>, + options?: { + readonly initialValue?: A + } + ): Rx> + (layer: Layer.Layer): RxRuntime + (create: Rx.Read>): RxRuntime + } -```ts -export type StreamPullResult = Result.Result< - E | NoSuchElementException, - { - readonly done: boolean - readonly items: Array + readonly fn: { + ( + fn: Rx.ReadFn>, + options?: { + readonly initialValue?: A + } + ): RxResultFn + ( + fn: Rx.ReadFn>, + options?: { + readonly initialValue?: A + } + ): RxResultFn } -> + + readonly pull: ( + create: Rx.Read> | Stream.Stream, + options?: { + readonly disableAccumulation?: boolean + readonly initialValue?: ReadonlyArray + } + ) => Writable, void> +} ``` Added in v1.0.0 diff --git a/docs/rx/index.ts.md b/docs/rx/index.ts.md index 225c45f..4bf23e5 100644 --- a/docs/rx/index.ts.md +++ b/docs/rx/index.ts.md @@ -13,59 +13,59 @@ Added in v1.0.0

Table of contents

- [exports](#exports) - - [From "@effect-rx/rx/Registry"](#from-effect-rxrxregistry) - - [From "@effect-rx/rx/Result"](#from-effect-rxrxresult) - - [From "@effect-rx/rx/Rx"](#from-effect-rxrxrx) - - [From "@effect-rx/rx/RxRef"](#from-effect-rxrxrxref) + - [From "./Registry.js"](#from-registryjs) + - [From "./Result.js"](#from-resultjs) + - [From "./Rx.js"](#from-rxjs) + - [From "./RxRef.js"](#from-rxrefjs) --- # exports -## From "@effect-rx/rx/Registry" +## From "./Registry.js" -Re-exports all named exports from the "@effect-rx/rx/Registry" module as `Registry`. +Re-exports all named exports from the "./Registry.js" module as `Registry`. **Signature** ```ts -export * as Registry from '@effect-rx/rx/Registry' +export * as Registry from "./Registry.js" ``` Added in v1.0.0 -## From "@effect-rx/rx/Result" +## From "./Result.js" -Re-exports all named exports from the "@effect-rx/rx/Result" module as `Result`. +Re-exports all named exports from the "./Result.js" module as `Result`. **Signature** ```ts -export * as Result from '@effect-rx/rx/Result' +export * as Result from "./Result.js" ``` Added in v1.0.0 -## From "@effect-rx/rx/Rx" +## From "./Rx.js" -Re-exports all named exports from the "@effect-rx/rx/Rx" module as `Rx`. +Re-exports all named exports from the "./Rx.js" module as `Rx`. **Signature** ```ts -export * as Rx from '@effect-rx/rx/Rx' +export * as Rx from "./Rx.js" ``` Added in v1.0.0 -## From "@effect-rx/rx/RxRef" +## From "./RxRef.js" -Re-exports all named exports from the "@effect-rx/rx/RxRef" module as `RxRef`. +Re-exports all named exports from the "./RxRef.js" module as `RxRef`. **Signature** ```ts -export * as RxRef from '@effect-rx/rx/RxRef' +export * as RxRef from "./RxRef.js" ``` Added in v1.0.0 diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts index d789953..f165925 100644 --- a/packages/rx/src/Rx.ts +++ b/packages/rx/src/Rx.ts @@ -12,11 +12,11 @@ import * as Inspectable from "effect/Inspectable" import * as Layer from "effect/Layer" import * as Option from "effect/Option" import { type Pipeable, pipeArguments } from "effect/Pipeable" -import type * as Runtime from "effect/Runtime" +import * as Runtime from "effect/Runtime" import * as Scope from "effect/Scope" import * as Stream from "effect/Stream" import * as internalRegistry from "./internal/registry.js" -import { runCallbackSync, runCallbackSyncDefault } from "./internal/runtime.js" +import { runCallbackSync } from "./internal/runtime.js" import * as Result from "./Result.js" /** @@ -211,7 +211,6 @@ const RxProto = { pipe() { return pipeArguments(this, arguments) }, - toJSON(this: Rx) { return { _id: "Rx", @@ -224,6 +223,51 @@ const RxProto = { }, [Inspectable.NodeInspectSymbol](this: Rx) { return this.toJSON() + }, + + // runtime api + rx(this: RxRuntime, arg: any, options?: { readonly initialValue?: unknown }) { + const read = makeRead(arg, options) + return readable((get) => { + const previous = get.self>() + const runtimeResult = get(this) + if (runtimeResult._tag !== "Success") { + return Result.replacePrevious(runtimeResult, previous) + } + return read(runtimeResult.value)(get) + }) + }, + + fn(this: RxRuntime, arg: any, options?: { readonly initialValue?: unknown }) { + const [makeRead, write] = makeResultFn(arg, options) + return writable((get) => { + const previous = get.self>() + const runtimeResult = get(this) + if (runtimeResult._tag !== "Success") { + return Result.replacePrevious(runtimeResult, previous) + } + return makeRead(runtimeResult.value)(get) + }, write) + }, + + pull(this: RxRuntime, arg: any, options?: { + readonly disableAccumulation?: boolean + readonly initialValue?: ReadonlyArray + }) { + const pullRx = readable((get) => { + const previous = get.self>() + const runtimeResult = get(this) + if (runtimeResult._tag !== "Success") { + return Result.replacePrevious(runtimeResult, previous) + } + return makeEffect( + get, + makeStreamPullEffect(get, arg, options, runtimeResult.value), + Result.initial(true), + runtimeResult.value + ) + }) + return makeStreamPull(pullRx as any, options) } } as const @@ -274,27 +318,145 @@ function constSetSelf
(ctx: WriteContext, value: A) { ctx.setSelf(value) } +// ----------------------------------------------------------------------------- +// constructors +// ----------------------------------------------------------------------------- + /** * @since 1.0.0 * @category constructors */ -export const state = ( +export const make: { + (effect: Effect.Effect, options?: { + readonly initialValue?: A + }): Rx> + (create: Rx.Read>, options?: { + readonly initialValue?: A + }): Rx> + (stream: Stream.Stream, options?: { + readonly initialValue?: A + }): Rx> + (create: Rx.Read>, options?: { + readonly initialValue?: A + }): Rx> + (layer: Layer.Layer): RxRuntime + (create: Rx.Read>): RxRuntime + (create: Rx.Read): Rx + (initialValue: A): Writable +} = (arg: any, options?: { readonly initialValue?: unknown }) => { + const readOrRx = makeRead(arg, options)() + if (TypeId in readOrRx) { + return readOrRx as any + } + return readable(readOrRx) +} + +// ----------------------------------------------------------------------------- +// constructors - effect +// ----------------------------------------------------------------------------- + +const makeRead: { + (effect: Effect.Effect, options?: { + readonly initialValue?: A + }): (runtime?: Runtime.Runtime) => Rx.Read> + (create: Rx.Read>, options?: { + readonly initialValue?: A + }): (runtime?: Runtime.Runtime) => Rx.Read> + (stream: Stream.Stream, options?: { + readonly initialValue?: A + }): (runtime?: Runtime.Runtime) => Rx.Read> + (create: Rx.Read>, options?: { + readonly initialValue?: A + }): (runtime?: Runtime.Runtime) => Rx.Read> + ( + layer: Layer.Layer + ): (runtime?: Runtime.Runtime) => Rx.Read>> + ( + create: Rx.Read> + ): (runtime?: Runtime.Runtime) => Rx.Read>> + (create: Rx.Read): (runtime?: Runtime.Runtime) => Rx.Read + (initialValue: A): (runtime?: Runtime.Runtime) => Writable +} = ( + arg: + | Effect.Effect + | Rx.Read> + | Stream.Stream + | Rx.Read> + | Layer.Layer + | Rx.Read> + | Rx.Read + | A, + options?: { readonly initialValue?: unknown } +) => +(providedRuntime?: Runtime.Runtime) => { + if (typeof arg === "function") { + const create = arg as Rx.Read + return function(get: Context) { + const value = create(get) + if (Effect.EffectTypeId in value) { + return effect(get, value, options, providedRuntime) + } else if (Stream.StreamTypeId in value) { + return stream(get, value, options, providedRuntime) + } else if (Layer.LayerTypeId in value) { + return runtime(get, value, providedRuntime) + } + return value + } + } else if (typeof arg === "object" && arg !== null) { + if (Effect.EffectTypeId in arg) { + return function(get: Context) { + return effect(get, arg, options, providedRuntime) + } + } else if (Stream.StreamTypeId in arg) { + return function(get: Context) { + return stream(get, arg, options, providedRuntime) + } + } else if (Layer.LayerTypeId in arg) { + return function(get: Context) { + return runtime(get, arg, providedRuntime) + } + } + } + + return state(arg) as any +} + +const state = ( initialValue: A ): Writable => writable(function(_get) { return initialValue }, constSetSelf) +const effect = ( + get: Context, + effect: Effect.Effect, + options?: { readonly initialValue?: A }, + runtime?: Runtime.Runtime +): Result.Result => { + const initialValue = options?.initialValue !== undefined + ? Result.success(options.initialValue) + : Result.initial() + return makeEffect(get, effect, initialValue, runtime) +} + function makeEffect( ctx: Context, - create: Rx.Read>, + effect: Effect.Effect, initialValue: Result.Result, - runCallback = runCallbackSyncDefault + runtime = Runtime.defaultRuntime ): Result.Result { const previous = ctx.self>() - const cancel = runCallback( - create(ctx), + const scope = Effect.runSync(Scope.make()) + ctx.addFinalizer(() => Effect.runFork(Scope.close(scope, Exit.unit))) + const scopedEffect = Effect.provideService( + effect, + Scope.Scope, + scope + ) + const cancel = runCallbackSync(runtime)( + scopedEffect, function(exit) { if (!Exit.isInterrupted(exit)) { ctx.setSelfSync(Result.fromExitWithPrevious(exit, previous)) @@ -311,284 +473,89 @@ function makeEffect( return Result.waiting(initialValue) } -function makeEffectRuntime( - ctx: Context, - create: Rx.Read>, - initialValue: Result.Result, - runtime: RxRuntime -): Result.Result { - const previous = ctx.self>() - const runtimeResult = ctx.get(runtime) - - if (runtimeResult._tag !== "Success") { - return Result.replacePrevious(runtimeResult, previous) - } - - return makeEffect(ctx, create as any, initialValue, runCallbackSync(runtimeResult.value)) -} - -function makeScoped( - ctx: Context, - create: Rx.Read>, - initialValue: Result.Result, - options?: { - readonly runtime?: RxRuntime - } -): Result.Result { - function createScoped(ctx: Context) { - const scope = Effect.runSync(Scope.make()) - ctx.addFinalizer(() => Effect.runFork(Scope.close(scope, Exit.unit))) - return Effect.provideService( - create(ctx), - Scope.Scope, - scope - ) - } - return options?.runtime - ? makeEffectRuntime(ctx, createScoped, initialValue, options.runtime) - : makeEffect(ctx, createScoped as any, initialValue) -} - -/** - * @since 1.0.0 - * @category constructors - */ -export const fn: { - ( - f: Rx.ReadFn - ): Writable, Arg> - ( - f: Rx.ReadFn, - options: { readonly initialValue: A } - ): Writable -} = (f: Rx.ReadFn, options?: { - readonly initialValue?: A -}): Writable | A, Arg> => { - const argRx = state<[number, Arg]>([0, undefined as any]) - const hasInitialValue = options?.initialValue !== undefined - return writable(function(get) { - const [counter, arg] = get(argRx) - if (counter === 0) { - return hasInitialValue ? options.initialValue : Option.none() - } - return hasInitialValue ? f(arg, get) : Option.some(f(arg, get)) - }, function(ctx, arg) { - ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg]) - }) -} - -function makeFn( - f: Rx.ReadFn, - transform: (get: Context, _: Option.Option>) => B -) { - const argRx = state<[number, Arg]>([0, undefined as any]) - return writable(function(get) { - const [counter, arg] = get(argRx) - if (counter === 0) { - return transform(get, Option.none()) - } - return transform( - get, - Option.some(function(ctx) { - return f(arg, ctx) - }) - ) - }, function(ctx, arg) { - ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg]) - }) -} +// ----------------------------------------------------------------------------- +// constructors - layer +// ----------------------------------------------------------------------------- /** * @since 1.0.0 - * @category constructors + * @category models */ -export const effect: { - (create: Rx.Read>, options?: { - readonly initialValue?: A - readonly runtime?: undefined - }): Rx> - ( - create: Rx.Read>, - options: { - readonly runtime: RxRuntime +export interface RxRuntime extends Rx>> { + readonly rx: { + (effect: Effect.Effect, options?: { readonly initialValue?: A - } - ): Rx> -} = ( - create: Rx.Read>, - options?: { - readonly runtime?: RxRuntime - readonly initialValue?: A - } -) => { - const initialValue = options?.initialValue !== undefined - ? Result.success(options.initialValue) - : Result.initial() - return readable>(function(get) { - return options?.runtime - ? makeEffectRuntime(get, create, initialValue, options.runtime) - : makeEffect(get, create as any, initialValue) - }) -} - -/** - * @since 1.0.0 - * @category constructors - */ -export const scoped: { - (create: Rx.Read>, options?: { - readonly initialValue?: A - readonly runtime?: undefined - }): Rx> - ( - create: Rx.Read>, - options: { + }): Rx> + (create: Rx.Read>, options?: { readonly initialValue?: A - readonly runtime: RxRuntime - } - ): Rx> -} = ( - create: Rx.Read>, - options?: { - readonly initialValue?: A - readonly runtime?: RxRuntime - } -) => { - const initialValue = options?.initialValue !== undefined - ? Result.success(options.initialValue) - : Result.initial() - return readable>(function(get) { - return makeScoped(get, create, initialValue, options) - }) -} - -/** - * @since 1.0.0 - * @category models - */ -export interface RxResultFn extends Writable, Arg> {} - -/** - * @since 1.0.0 - * @category constructors - */ -export const effectFn: { - (fn: Rx.ReadFn>, options?: { - readonly initialValue?: A - readonly runtime?: undefined - }): RxResultFn - ( - fn: Rx.ReadFn>, - options: { - readonly runtime: RxRuntime + }): Rx> + (stream: Stream.Stream, options?: { readonly initialValue?: A - } - ): RxResultFn -} = ( - f: Rx.ReadFn>, - options?: { - readonly runtime?: RxRuntime - readonly initialValue?: A + }): Rx> + (create: Rx.Read>, options?: { + readonly initialValue?: A + }): Rx> + (layer: Layer.Layer): RxRuntime + (create: Rx.Read>): RxRuntime } -) => { - const initialValue = options?.initialValue !== undefined ? Result.success(options.initialValue) : Result.initial() - return makeFn(f, function(get, create) { - if (create._tag === "None") { - return initialValue - } - return options?.runtime - ? makeEffectRuntime(get, create.value, initialValue, options.runtime) - : makeEffect(get, create.value as any, initialValue) - }) -} -/** - * @since 1.0.0 - * @category constructors - */ -export const scopedFn: { - (fn: Rx.ReadFn>, options?: { - readonly initialValue?: A - readonly runtime?: undefined - }): RxResultFn - ( - fn: Rx.ReadFn>, - options: { + readonly fn: { + (fn: Rx.ReadFn>, options?: { readonly initialValue?: A - readonly runtime: RxRuntime - } - ): RxResultFn -} = ( - f: Rx.ReadFn>, - options?: { - readonly initialValue?: A - readonly runtime?: RxRuntime + }): RxResultFn + (fn: Rx.ReadFn>, options?: { + readonly initialValue?: A + }): RxResultFn } -) => { - const initialValue = options?.initialValue !== undefined ? Result.success(options.initialValue) : Result.initial() - return makeFn(f, function(get, create) { - if (create._tag === "None") { - return initialValue - } - return makeScoped(get, create.value, initialValue, options) - }) + + readonly pull: (create: Rx.Read> | Stream.Stream, options?: { + readonly disableAccumulation?: boolean + readonly initialValue?: ReadonlyArray + }) => Writable, void> } -/** - * @since 1.0.0 - * @category models - */ -export interface RxRuntime extends Rx>> {} +const runtime = ( + get: Context, + layer: Layer.Layer, + runtime?: Runtime.Runtime +): Result.Result> => { + const buildEffect = runtime ? + Effect.flatMap( + Layer.build(layer), + (context) => Effect.provide(Effect.runtime(), context) + ) : + Layer.toRuntime(layer) + + return effect(get, buildEffect, undefined, runtime) +} -/** - * @since 1.0.0 - * @category constructors - */ -export const runtime: { - (create: (get: Context) => Layer.Layer, options?: { - readonly autoDispose?: boolean - readonly idleTTL?: Duration.DurationInput - readonly runtime?: undefined - }): RxRuntime - (create: (get: Context) => Layer.Layer, options?: { - readonly autoDispose?: boolean - readonly idleTTL?: Duration.DurationInput - readonly runtime: RxRuntime - }): RxRuntime -} = (create: (get: Context) => Layer.Layer, options?: { - readonly autoDispose?: boolean - readonly idleTTL?: Duration.DurationInput - readonly runtime?: RxRuntime -}): RxRuntime => { - let rx = options?.runtime - ? scoped((get) => - Effect.flatMap( - Layer.build(create(get)), - (context) => Effect.provide(Effect.runtime(), context) - ), { runtime: options.runtime }) - : scoped((get) => Layer.toRuntime(create(get)) as Effect.Effect>) - - if (options?.idleTTL !== undefined) { - rx = setIdleTTL(rx, options.idleTTL) - } - if (options?.autoDispose !== true) { - rx = keepAlive(rx) - } +// ----------------------------------------------------------------------------- +// constructors - stream +// ----------------------------------------------------------------------------- - return rx +const stream = ( + get: Context, + stream: Stream.Stream, + options?: { readonly initialValue?: A }, + runtime?: Runtime.Runtime +): Result.Result => { + const initialValue = options?.initialValue !== undefined + ? Result.success(options.initialValue) + : Result.initial() + return makeStream(get, stream, initialValue, runtime) } function makeStream( ctx: Context, - create: Rx.Read>, + stream: Stream.Stream, initialValue: Result.Result, - runCallback = runCallbackSyncDefault + runtime = Runtime.defaultRuntime ): Result.Result { const previous = ctx.self>() - const cancel = runCallback( + const cancel = runCallbackSync(runtime)( Stream.runForEach( - create(ctx), + stream, (a) => Effect.sync(() => ctx.setSelfSync(Result.waiting(Result.success(a)))) ), (exit) => { @@ -618,55 +585,41 @@ function makeStream( return Result.waiting(initialValue) } -function makeStreamRuntime( - ctx: Context, - create: Rx.Read>, - initialValue: Result.Result, - runtime: RxRuntime -): Result.Result { - const previous = ctx.self>() - const runtimeResult = ctx.get(runtime) - - if (runtimeResult._tag !== "Success") { - return Result.replacePrevious(runtimeResult, previous) - } +// ----------------------------------------------------------------------------- +// constructors - functions +// ----------------------------------------------------------------------------- - return makeStream(ctx, create as any, initialValue, runCallbackSync(runtimeResult.value)) -} +/** + * @since 1.0.0 + * @category models + */ +export interface RxResultFn extends Writable, Arg> {} /** * @since 1.0.0 * @category constructors */ -export const stream: { - ( - create: Rx.Read>, - options?: { - readonly initialValue?: A - readonly runtime?: undefined - } - ): Rx> - ( - create: Rx.Read>, - options: { - readonly initialValue?: A - readonly runtime: RxRuntime +export const fnSync: { + ( + f: Rx.ReadFn + ): Writable, Arg> + ( + f: Rx.ReadFn, + options: { readonly initialValue: A } + ): Writable +} = (f: Rx.ReadFn, options?: { + readonly initialValue?: A +}): Writable | A, Arg> => { + const argRx = state<[number, Arg]>([0, undefined as any]) + const hasInitialValue = options?.initialValue !== undefined + return writable(function(get) { + const [counter, arg] = get(argRx) + if (counter === 0) { + return hasInitialValue ? options.initialValue : Option.none() } - ): Rx> -} = ( - create: Rx.Read>, - options?: { - readonly initialValue?: A - readonly runtime?: RxRuntime - } -) => { - const initialValue = options?.initialValue !== undefined - ? Result.success(options.initialValue) - : Result.initial() - return readable>(function(get) { - return options?.runtime - ? makeStreamRuntime(get, create, initialValue, options.runtime) - : makeStream(get, create as any, initialValue) + return hasInitialValue ? f(arg, get) : Option.some(f(arg, get)) + }, function(ctx, arg) { + ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg]) }) } @@ -674,43 +627,53 @@ export const stream: { * @since 1.0.0 * @category constructors */ -export const streamFn: { +export const fn: { + (fn: Rx.ReadFn>, options?: { + readonly initialValue?: A + }): RxResultFn (fn: Rx.ReadFn>, options?: { readonly initialValue?: A - readonly runtime?: undefined }): RxResultFn - ( - fn: Rx.ReadFn>, - options: { - readonly runtime: RxRuntime - readonly initialValue?: A - } - ): RxResultFn -} = ( - f: Rx.ReadFn>, - options?: { - readonly runtime?: RxRuntime - readonly initialValue?: A - } -) => { +} = (f: Rx.ReadFn | Effect.Effect>, options?: { + readonly initialValue?: A +}): RxResultFn => { + const [makeRead, write] = makeResultFn(f, options) + return writable(makeRead(), write) +} + +function makeResultFn( + f: Rx.ReadFn | Stream.Stream>, + options?: { readonly initialValue?: A } +) { + const argRx = state<[number, Arg]>([0, undefined as any]) const initialValue = options?.initialValue !== undefined ? Result.success(options.initialValue) : Result.initial() - return makeFn(f, function(get, create) { - if (create._tag === "None") { - return initialValue + + function makeRead(runtime?: Runtime.Runtime): Rx.Read> { + return function(get) { + const [counter, arg] = get(argRx) + if (counter === 0) { + return initialValue + } + const value = f(arg, get) + if (Effect.EffectTypeId in value) { + return makeEffect(get, value, initialValue, runtime) + } + return makeStream(get, value, initialValue, runtime) } - return options?.runtime - ? makeStreamRuntime(get, create.value, initialValue, options.runtime) - : makeStream(get, create.value as any, initialValue) - }) + } + function write(ctx: WriteContext>, arg: Arg) { + ctx.set(argRx, [ctx.get(argRx)[0] + 1, arg]) + } + return [makeRead, write] as const } /** * @since 1.0.0 * @category models */ -export type StreamPullResult = Result.Result = Result.Result }> @@ -719,49 +682,58 @@ export type StreamPullResult = Result.Result(create: Rx.Read>, options?: { - readonly disableAccumulation?: boolean - readonly initialValue?: ReadonlyArray - readonly runtime?: undefined - }): Writable, void> - ( - create: Rx.Read>, - options: { - readonly runtime: RxRuntime - readonly disableAccumulation?: boolean - readonly initialValue?: ReadonlyArray - } - ): Writable, void> -} = ( - create: Rx.Read>, - options?: { - readonly runtime?: RxRuntime - readonly disableAccumulation?: boolean - readonly initialValue?: ReadonlyArray - } +export const pull = (create: Rx.Read> | Stream.Stream, options?: { + readonly disableAccumulation?: boolean + readonly initialValue?: ReadonlyArray +}): Writable, void> => { + const pullRx = readable( + makeRead(function(get) { + return makeStreamPullEffect(get, create, options) + })() + ) + return makeStreamPull(pullRx, options) +} + +const makeStreamPullEffect = ( + get: Context, + create: Rx.Read> | Stream.Stream, + options?: { readonly disableAccumulation?: boolean }, + runtime?: Runtime.Runtime ) => { - const initialValue: Result.Result [pull, runtime] as const + ) +} + +const makeStreamPull = ( + pullRx: Rx< + Result.Result< + never, + readonly [Effect.Effect, Chunk.Chunk>, Runtime.Runtime | undefined] + > + >, + options?: { readonly initialValue?: ReadonlyArray } +) => { + const initialValue: Result.Result }> = options?.initialValue !== undefined ? Result.success({ done: false, items: options.initialValue as Array }) : Result.initial() - const pullRx = scoped(function(get) { - const stream = create(get) - return Stream.toPull( - options?.disableAccumulation ? stream : Stream.accumulateChunks(stream) - ) - }, options as { readonly runtime: RxRuntime }) - return writable, void>(function(get) { - const previous = get.self>() + return writable(function(get: Context): PullResult { + const previous = get.self>() const pullResult = get(pullRx) if (pullResult._tag !== "Success") { return Result.replacePrevious(pullResult, previous) } + const [pullEffect, runtime] = pullResult.value const pull = pipe( - pullResult.value, + pullEffect, Effect.map((_) => ({ done: false, items: Chunk.toReadonlyArray(_) as Array @@ -773,7 +745,7 @@ export const streamPull: { Option.match(error, { onNone: () => pipe( - get.self>(), + get.self>(), Option.flatMap(Result.value), Option.match({ onNone: () => Effect.fail(new NoSuchElementException()), @@ -788,9 +760,7 @@ export const streamPull: { }) ) ) - return options?.runtime - ? makeEffectRuntime(get, (_) => pull, initialValue, options.runtime) - : makeEffect(get, (_) => pull as any, initialValue) + return makeEffect(get, pull, initialValue, runtime) }, function(ctx, _) { ctx.refreshSelf() }, function(refresh) { diff --git a/packages/rx/src/internal/runtime.ts b/packages/rx/src/internal/runtime.ts index 8783bd0..c359247 100644 --- a/packages/rx/src/internal/runtime.ts +++ b/packages/rx/src/internal/runtime.ts @@ -52,6 +52,3 @@ export const runCallbackSync = Effect.runFork(fiberRuntime.interruptAsFork(fiberRuntime.id())) } } - -/** @internal */ -export const runCallbackSyncDefault = runCallbackSync(Runtime.defaultRuntime) diff --git a/packages/rx/test/Rx.test.ts b/packages/rx/test/Rx.test.ts index b89699a..e76dd46 100644 --- a/packages/rx/test/Rx.test.ts +++ b/packages/rx/test/Rx.test.ts @@ -19,7 +19,7 @@ describe("Rx", () => { }) it("get/set", () => { - const counter = Rx.state(0) + const counter = Rx.make(0) const r = Registry.make() expect(r.get(counter)).toEqual(0) r.set(counter, 1) @@ -27,7 +27,7 @@ describe("Rx", () => { }) it("keepAlive false", async () => { - const counter = Rx.state(0) + const counter = Rx.make(0) const r = Registry.make() r.set(counter, 1) expect(r.get(counter)).toEqual(1) @@ -36,7 +36,7 @@ describe("Rx", () => { }) it("keepAlive true", async () => { - const counter = Rx.state(0).pipe( + const counter = Rx.make(0).pipe( Rx.keepAlive ) const r = Registry.make() @@ -47,7 +47,7 @@ describe("Rx", () => { }) it("subscribe", async () => { - const counter = Rx.state(0) + const counter = Rx.make(0) const r = Registry.make() let count = 0 const cancel = r.subscribe(counter, (_) => { @@ -64,10 +64,7 @@ describe("Rx", () => { }) it("runtime", async () => { - const count = Rx.effect( - () => Effect.flatMap(Counter, (_) => _.get), - { runtime: counterRuntime } - ) + const count = counterRuntime.rx(Effect.flatMap(Counter, (_) => _.get)) const r = Registry.make() const result = r.get(count) assert(Result.isSuccess(result)) @@ -75,20 +72,15 @@ describe("Rx", () => { }) it("runtime multiple", async () => { - const count = Rx.effect( - () => Effect.flatMap(Counter, (_) => _.get), - { runtime: counterRuntime } - ) - const timesTwo = Rx.effect( - (get) => - Effect.gen(function*(_) { - const counter = yield* _(Counter) - const multiplier = yield* _(Multiplier) - yield* _(counter.inc) - expect(yield* _(get.result(count))).toEqual(2) - return yield* _(multiplier.times(2)) - }), - { runtime: multiplierRuntime } + const count = counterRuntime.rx(Effect.flatMap(Counter, (_) => _.get)) + const timesTwo = multiplierRuntime.rx((get) => + Effect.gen(function*(_) { + const counter = yield* _(Counter) + const multiplier = yield* _(Multiplier) + yield* _(counter.inc) + expect(yield* _(get.result(count))).toEqual(2) + return yield* _(multiplier.times(2)) + }) ) const r = Registry.make() let result = r.get(timesTwo) @@ -107,10 +99,7 @@ describe("Rx", () => { }) it("runtime fiber ref", async () => { - const caching = Rx.effect( - () => FiberRef.get(FiberRef.currentRequestCacheEnabled), - { runtime: fiberRefRuntime } - ) + const caching = fiberRefRuntime.rx(FiberRef.get(FiberRef.currentRequestCacheEnabled)) const r = Registry.make() const result = r.get(caching) assert(Result.isSuccess(result)) @@ -118,10 +107,10 @@ describe("Rx", () => { }) it("effect initial", async () => { - const count = Rx.effect(() => - Effect.succeed(1).pipe( - Effect.delay(100) - ), { initialValue: 0 }).pipe(Rx.keepAlive) + const count = Rx.make( + Effect.succeed(1).pipe(Effect.delay(100)), + { initialValue: 0 } + ).pipe(Rx.keepAlive) const r = Registry.make() let result = r.get(count) assert(Result.isSuccess(result)) @@ -134,7 +123,7 @@ describe("Rx", () => { }) it("effectFn", async () => { - const count = Rx.effectFn((n: number) => Effect.succeed(n + 1)) + const count = Rx.fn((n: number) => Effect.succeed(n + 1)) const r = Registry.make() let result = r.get(count) assert(Result.isInitial(result)) @@ -145,7 +134,7 @@ describe("Rx", () => { }) it("effectFn initial", async () => { - const count = Rx.effectFn((n: number) => Effect.succeed(n + 1), { + const count = Rx.fn((n: number) => Effect.succeed(n + 1), { initialValue: 0 }) const r = Registry.make() @@ -159,7 +148,7 @@ describe("Rx", () => { }) it("effect mapResult", async () => { - const count = Rx.effectFn((n: number) => Effect.succeed(n + 1)).pipe( + const count = Rx.fn((n: number) => Effect.succeed(n + 1)).pipe( Rx.mapResult((_) => _ + 1) ) const r = Registry.make() @@ -172,8 +161,8 @@ describe("Rx", () => { }) it("effect double mapResult", async () => { - const seed = Rx.state(0) - const count = Rx.effect((get) => Effect.succeed(get(seed) + 1)).pipe( + const seed = Rx.make(0) + const count = Rx.make((get) => Effect.succeed(get(seed) + 1)).pipe( Rx.mapResult((_) => _ + 10), Rx.mapResult((_) => _ + 100) ) @@ -189,7 +178,7 @@ describe("Rx", () => { it("effect double mapResult refresh", async () => { let rebuilds = 0 - const count = Rx.effect(() => { + const count = Rx.make(() => { rebuilds++ return Effect.succeed(1) }).pipe( @@ -211,7 +200,7 @@ describe("Rx", () => { it("scopedFn", async () => { let finalized = 0 - const count = Rx.scopedFn((n: number) => + const count = Rx.fn((n: number) => Effect.succeed(n + 1).pipe( Effect.zipLeft( Effect.addFinalizer(() => @@ -240,7 +229,7 @@ describe("Rx", () => { }) it("stream", async () => { - const count = Rx.stream(() => + const count = Rx.make( Stream.range(0, 2).pipe( Stream.tap(() => Effect.sleep(50)) ) @@ -277,10 +266,12 @@ describe("Rx", () => { }) it("stream initial", async () => { - const count = Rx.stream(() => + const count = Rx.make( Stream.range(1, 2).pipe( Stream.tap(() => Effect.sleep(50)) - ), { initialValue: 0 }) + ), + { initialValue: 0 } + ) const r = Registry.make() const unmount = r.mount(count) let result = r.get(count) @@ -302,7 +293,7 @@ describe("Rx", () => { }) it("streamFn", async () => { - const count = Rx.streamFn((start: number) => + const count = Rx.fn((start: number) => Stream.range(start, start + 1).pipe( Stream.tap(() => Effect.sleep(50)) ) @@ -357,8 +348,8 @@ describe("Rx", () => { assert(Result.isInitial(result)) }) - it("streamPull", async () => { - const count = Rx.streamPull(() => + it("pull", async () => { + const count = Rx.pull( Stream.range(0, 1, 1).pipe( Stream.tap(() => Effect.sleep(50)) ) @@ -411,8 +402,65 @@ describe("Rx", () => { assert(Option.isNone(Result.value(result))) }) - it("streamPull initial", async () => { - const count = Rx.streamPull(() => + it("pull runtime", async () => { + const count = counterRuntime.pull( + Counter.pipe( + Effect.flatMap((_) => _.get), + Effect.map((_) => Stream.range(_, 2, 1)), + Stream.unwrap, + Stream.tap(() => Effect.sleep(50)) + ) + ).pipe(Rx.refreshable) + const r = Registry.make() + const unmount = r.mount(count) + + let result = r.get(count) + assert(result.waiting) + assert(Option.isNone(Result.value(result))) + + await vitest.advanceTimersByTimeAsync(50) + result = r.get(count) + assert(!result.waiting) + assert(Result.isSuccess(result)) + assert.deepEqual(result.value, { done: false, items: [1] }) + + r.set(count, void 0) + result = r.get(count) + assert(result.waiting) + assert.deepEqual(Result.value(result), Option.some({ done: false, items: [1] })) + + await vitest.advanceTimersByTimeAsync(50) + result = r.get(count) + assert(!result.waiting) + assert(Result.isSuccess(result)) + assert.deepEqual(result.value, { done: false, items: [1, 2] }) + + r.set(count, void 0) + result = r.get(count) + assert(!result.waiting) + assert(Result.isSuccess(result)) + assert.deepEqual(result.value, { done: true, items: [1, 2] }) + + r.refresh(count) + result = r.get(count) + assert(result.waiting) + assert.deepEqual(Result.value(result), Option.some({ done: true, items: [1, 2] })) + + await vitest.advanceTimersByTimeAsync(50) + result = r.get(count) + assert(!result.waiting) + assert(Result.isSuccess(result)) + assert.deepEqual(result.value, { done: false, items: [1] }) + + unmount() + await new Promise((resolve) => resolve(null)) + result = r.get(count) + assert(result.waiting) + assert(Option.isNone(Result.value(result))) + }) + + it("pull initial", async () => { + const count = Rx.pull(() => Stream.range(1, 2, 1).pipe( Stream.tap(() => Effect.sleep(50)) ), { initialValue: [0] }).pipe(Rx.refreshable) @@ -439,13 +487,13 @@ describe("Rx", () => { it("family", async () => { const r = Registry.make() - const count = Rx.family((n: number) => Rx.state(n)) + const count = Rx.family((n: number) => Rx.make(n)) const hash = Hash.hash(count(1)) assert.strictEqual(count(1), count(1)) r.set(count(1), 2) assert.strictEqual(r.get(count(1)), 2) - const countKeep = Rx.family((n: number) => Rx.state(n).pipe(Rx.keepAlive)) + const countKeep = Rx.family((n: number) => Rx.make(n).pipe(Rx.keepAlive)) assert.strictEqual(countKeep(1), countKeep(1)) r.get(countKeep(1)) const hashKeep = Hash.hash(countKeep(1)) @@ -461,14 +509,14 @@ describe("Rx", () => { it("label", async () => { expect( - Rx.state(0).pipe(Rx.withLabel("counter")).label![1] + Rx.make(0).pipe(Rx.withLabel("counter")).label![1] ).toMatch(/Rx.test.ts:\d+:\d+/) }) it("batching", async () => { const r = Registry.make() - const state = Rx.state(1).pipe(Rx.keepAlive) - const state2 = Rx.state("a").pipe(Rx.keepAlive) + const state = Rx.make(1).pipe(Rx.keepAlive) + const state2 = Rx.make("a").pipe(Rx.keepAlive) let count = 0 const derived = Rx.readable((get) => { count++ @@ -486,8 +534,8 @@ describe("Rx", () => { it("nested batch", async () => { const r = Registry.make() - const state = Rx.state(1).pipe(Rx.keepAlive) - const state2 = Rx.state("a").pipe(Rx.keepAlive) + const state = Rx.make(1).pipe(Rx.keepAlive) + const state2 = Rx.make("a").pipe(Rx.keepAlive) let count = 0 const derived = Rx.readable((get) => { count++ @@ -507,8 +555,8 @@ describe("Rx", () => { it("read correct updated state in batch", async () => { const r = Registry.make() - const state = Rx.state(1).pipe(Rx.keepAlive) - const state2 = Rx.state("a").pipe(Rx.keepAlive) + const state = Rx.make(1).pipe(Rx.keepAlive) + const state2 = Rx.make("a").pipe(Rx.keepAlive) let count = 0 const derived = Rx.readable((get) => { count++ @@ -528,8 +576,8 @@ describe("Rx", () => { it("notifies listeners after batch commit", async () => { const r = Registry.make() - const state = Rx.state(1).pipe(Rx.keepAlive) - const state2 = Rx.state("a").pipe(Rx.keepAlive) + const state = Rx.make(1).pipe(Rx.keepAlive) + const state2 = Rx.make("a").pipe(Rx.keepAlive) let count = 0 const derived = Rx.readable((get) => { return get(state) + get(state2) @@ -548,7 +596,7 @@ describe("Rx", () => { }) it("initialValues", async () => { - const state = Rx.state(0) + const state = Rx.make(0) const r = Registry.make({ initialValues: [ Rx.initialValue(state, 10) @@ -560,13 +608,13 @@ describe("Rx", () => { }) it("idleTTL", async () => { - const state = Rx.state(0).pipe( + const state = Rx.make(0).pipe( Rx.setIdleTTL(2000) ) - const state2 = Rx.state(0).pipe( + const state2 = Rx.make(0).pipe( Rx.setIdleTTL(10000) ) - const state3 = Rx.state(0).pipe( + const state3 = Rx.make(0).pipe( Rx.setIdleTTL(3000) ) const r = Registry.make() @@ -595,7 +643,7 @@ describe("Rx", () => { }) it("fn", async () => { - const count = Rx.fn((n: number) => n).pipe(Rx.keepAlive) + const count = Rx.fnSync((n: number) => n).pipe(Rx.keepAlive) const r = Registry.make() assert.deepEqual(r.get(count), Option.none()) @@ -604,7 +652,7 @@ describe("Rx", () => { }) it("fn initial", async () => { - const count = Rx.fn((n: number) => n, { initialValue: 0 }) + const count = Rx.fnSync((n: number) => n, { initialValue: 0 }) const r = Registry.make() assert.deepEqual(r.get(count), 0) @@ -613,12 +661,12 @@ describe("Rx", () => { }) it("withFallback", async () => { - const count = Rx.effect(() => + const count = Rx.make(() => Effect.succeed(1).pipe( Effect.delay(100) ) ).pipe( - Rx.withFallback(Rx.effect(() => Effect.succeed(0))), + Rx.withFallback(Rx.make(() => Effect.succeed(0))), Rx.keepAlive ) const r = Registry.make() @@ -629,7 +677,7 @@ describe("Rx", () => { }) it("failure with previousValue", async () => { - const count = Rx.effectFn((i: number) => i === 1 ? Effect.fail("fail") : Effect.succeed(i)) + const count = Rx.fn((i: number) => i === 1 ? Effect.fail("fail") : Effect.succeed(i)) const r = Registry.make() let result = r.get(count) @@ -681,14 +729,6 @@ const MultiplierLive = Layer.effect( }) ) -const counterRuntime = Rx.runtime(() => CounterLive, { - autoDispose: true -}) -const multiplierRuntime = Rx.runtime(() => MultiplierLive, { - runtime: counterRuntime, - autoDispose: true -}) -const fiberRefRuntime = Rx.runtime(() => Layer.setRequestCaching(true), { - runtime: counterRuntime, - autoDispose: true -}) +const counterRuntime = Rx.make(CounterLive) +const multiplierRuntime = counterRuntime.rx(MultiplierLive) +const fiberRefRuntime = counterRuntime.rx(Layer.setRequestCaching(true))