diff --git a/packages/rx/src/Result.ts b/packages/rx/src/Result.ts index 70b6861..0d3e1de 100644 --- a/packages/rx/src/Result.ts +++ b/packages/rx/src/Result.ts @@ -4,7 +4,7 @@ import * as Data from "@effect/data/Data" import { identity } from "@effect/data/Function" import * as Option from "@effect/data/Option" -import type * as Cause from "@effect/io/Cause" +import * as Cause from "@effect/io/Cause" import type * as Exit from "@effect/io/Exit" /** @@ -183,6 +183,12 @@ export const failure = (cause: Cause.Cause): Failure => { return result } +/** + * @since 1.0.0 + * @category constructors + */ +export const fail = (error: E): Failure => failure(Cause.fail(error)) + /** * @since 1.0.0 * @category accessors diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts index a16b517..9c6d5e9 100644 --- a/packages/rx/src/Rx.ts +++ b/packages/rx/src/Rx.ts @@ -4,9 +4,10 @@ import * as Result from "@effect-rx/rx/Result" import * as EffectContext from "@effect/data/Context" import * as Equal from "@effect/data/Equal" +import { pipe } from "@effect/data/Function" import * as Hash from "@effect/data/Hash" import * as Inspectable from "@effect/data/Inspectable" -import type * as Option from "@effect/data/Option" +import * as Option from "@effect/data/Option" import { type Pipeable, pipeArguments } from "@effect/data/Pipeable" import { NoSuchElementException } from "@effect/io/Cause" import * as Effect from "@effect/io/Effect" @@ -262,12 +263,6 @@ export const state = ( } ) -/** - * @since 1.0.0 - * @category models - */ -export interface RxResult extends Rx> {} - function makeEffect( ctx: Context>, effect: Effect.Effect, @@ -316,11 +311,11 @@ function makeEffectRuntime( * @category constructors */ export const effect: { - (effect: Effect.Effect): RxResult + (effect: Effect.Effect): Rx> ( effect: Effect.Effect, runtime: RxRuntime - ): RxResult + ): Rx> } = ( effect: Effect.Effect, runtime?: RxRuntime @@ -337,11 +332,11 @@ export const effect: { * @category constructors */ export const scoped: { - (effect: Effect.Effect): RxResult + (effect: Effect.Effect): Rx> ( effect: Effect.Effect, runtime: RxRuntime - ): RxResult + ): Rx> } = ( effect: Effect.Effect, runtime?: RxRuntime @@ -474,6 +469,81 @@ export const runtime: { ) } +function makeStream( + ctx: Context>, + stream: Stream.Stream, + runCallback = Effect.runCallback +): Result.Result { + const previous = ctx.self() + + const cancel = runCallback( + Effect.provideService( + Stream.runForEach(stream, (a) => Effect.sync(() => ctx.setSelf(Result.waiting(Option.some(Result.success(a)))))), + Context, + ctx as Context + ), + (exit) => { + if (exit._tag === "Failure") { + ctx.setSelf(Result.failure(exit.cause)) + } else { + pipe( + ctx.self(), + Option.flatMap(Result.value), + Option.match({ + onNone: () => ctx.setSelf(Result.fail(NoSuchElementException())), + onSome: (a) => ctx.setSelf(Result.success(a)) + }) + ) + } + } + ) + ctx.addFinalizer(cancel) + + if (previous._tag === "Some") { + return Result.waitingFrom(previous) + } + return Result.initial() +} + +function makeStreamRuntime( + ctx: Context>, + stream: Stream.Stream, + runtime: RxRuntime +): Result.Result { + const previous = ctx.self() + const runtimeResult = ctx.get(runtime) + + if (runtimeResult._tag !== "Success") { + if (runtimeResult._tag === "Waiting") { + return Result.waitingFrom(previous) + } + return runtimeResult as any + } + + return makeStream(ctx, stream as any, Runtime.runCallback(runtimeResult.value)) +} + +/** + * @since 1.0.0 + * @category constructors + */ +export const stream: { + (stream: Stream.Stream): Rx> + ( + stream: Effect.Effect, + runtime: RxRuntime + ): Rx> +} = ( + stream: Stream.Stream, + runtime?: RxRuntime +) => + readable>(function(ctx) { + if (runtime !== undefined) { + return makeStreamRuntime(ctx, stream, runtime) + } + return makeStream(ctx, stream as Stream.Stream) + }) + /** * @since 1.0.0 * @category combinators @@ -510,7 +580,9 @@ export const access = (rx: Rx): Effect.Effect => * @since 1.0.0 * @category accessors */ -export const accessResult = (rx: RxResult): Effect.Effect => +export const accessResult = ( + rx: Rx> +): Effect.Effect => Effect.flatMap( Context, (ctx): Effect.Effect => {