Skip to content

Commit

Permalink
stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Smart committed Sep 19, 2023
1 parent d790ad1 commit 7d19e1e
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 13 deletions.
8 changes: 7 additions & 1 deletion packages/rx/src/Result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import * as Data from "@effect/data/Data"
import { identity } from "@effect/data/Function"
import * as Option from "@effect/data/Option"
import type * as Cause from "@effect/io/Cause"
import * as Cause from "@effect/io/Cause"
import type * as Exit from "@effect/io/Exit"

/**
Expand Down Expand Up @@ -183,6 +183,12 @@ export const failure = <E, A>(cause: Cause.Cause<E>): Failure<E, A> => {
return result
}

/**
* @since 1.0.0
* @category constructors
*/
export const fail = <E, A>(error: E): Failure<E, A> => failure(Cause.fail(error))

/**
* @since 1.0.0
* @category accessors
Expand Down
96 changes: 84 additions & 12 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import * as Result from "@effect-rx/rx/Result"
import * as EffectContext from "@effect/data/Context"
import * as Equal from "@effect/data/Equal"
import { pipe } from "@effect/data/Function"
import * as Hash from "@effect/data/Hash"
import * as Inspectable from "@effect/data/Inspectable"
import type * as Option from "@effect/data/Option"
import * as Option from "@effect/data/Option"
import { type Pipeable, pipeArguments } from "@effect/data/Pipeable"
import { NoSuchElementException } from "@effect/io/Cause"
import * as Effect from "@effect/io/Effect"
Expand Down Expand Up @@ -262,12 +263,6 @@ export const state = <A>(
}
)

/**
* @since 1.0.0
* @category models
*/
export interface RxResult<E, A> extends Rx<Result.Result<E, A>> {}

function makeEffect<E, A>(
ctx: Context<Result.Result<E, A>>,
effect: Effect.Effect<RxContext, E, A>,
Expand Down Expand Up @@ -316,11 +311,11 @@ function makeEffectRuntime<R, E, A, RE>(
* @category constructors
*/
export const effect: {
<E, A>(effect: Effect.Effect<RxContext, E, A>): RxResult<E, A>
<E, A>(effect: Effect.Effect<RxContext, E, A>): Rx<Result.Result<E, A>>
<RR, R extends (RR | RxContext), E, A, RE>(
effect: Effect.Effect<R, E, A>,
runtime: RxRuntime<RE, RR>
): RxResult<RE | E, A>
): Rx<Result.Result<RE | E, A>>
} = <R, E, A, RE>(
effect: Effect.Effect<R, E, A>,
runtime?: RxRuntime<RE, R>
Expand All @@ -337,11 +332,11 @@ export const effect: {
* @category constructors
*/
export const scoped: {
<E, A>(effect: Effect.Effect<Scope.Scope | RxContext, E, A>): RxResult<E, A>
<E, A>(effect: Effect.Effect<Scope.Scope | RxContext, E, A>): Rx<Result.Result<E, A>>
<RR, R extends (RR | RxContext | Scope.Scope), E, A, RE>(
effect: Effect.Effect<R, E, A>,
runtime: RxRuntime<RE, RR>
): RxResult<RE | E, A>
): Rx<Result.Result<RE | E, A>>
} = <R, E, A, RE>(
effect: Effect.Effect<R | Scope.Scope | RxContext, E, A>,
runtime?: RxRuntime<RE, R>
Expand Down Expand Up @@ -474,6 +469,81 @@ export const runtime: {
)
}

function makeStream<E, A>(
ctx: Context<Result.Result<E | NoSuchElementException, A>>,
stream: Stream.Stream<RxContext, E, A>,
runCallback = Effect.runCallback
): Result.Result<E | NoSuchElementException, A> {
const previous = ctx.self()

const cancel = runCallback(
Effect.provideService(
Stream.runForEach(stream, (a) => Effect.sync(() => ctx.setSelf(Result.waiting(Option.some(Result.success(a)))))),
Context,
ctx as Context<unknown>
),
(exit) => {
if (exit._tag === "Failure") {
ctx.setSelf(Result.failure(exit.cause))
} else {
pipe(
ctx.self(),
Option.flatMap(Result.value),
Option.match({
onNone: () => ctx.setSelf(Result.fail(NoSuchElementException())),
onSome: (a) => ctx.setSelf(Result.success(a))
})
)
}
}
)
ctx.addFinalizer(cancel)

if (previous._tag === "Some") {
return Result.waitingFrom(previous)
}
return Result.initial()
}

function makeStreamRuntime<R, E, A, RE>(
ctx: Context<Result.Result<E | NoSuchElementException, A>>,
stream: Stream.Stream<R | RxContext, E, A>,
runtime: RxRuntime<RE, R>
): Result.Result<E | NoSuchElementException, A> {
const previous = ctx.self()
const runtimeResult = ctx.get(runtime)

if (runtimeResult._tag !== "Success") {
if (runtimeResult._tag === "Waiting") {
return Result.waitingFrom(previous)
}
return runtimeResult as any
}

return makeStream(ctx, stream as any, Runtime.runCallback(runtimeResult.value))
}

/**
* @since 1.0.0
* @category constructors
*/
export const stream: {
<E, A>(stream: Stream.Stream<RxContext, E, A>): Rx<Result.Result<E | NoSuchElementException, A>>
<RR, R extends (RR | RxContext), E, A, RE>(
stream: Effect.Effect<R, E, A>,
runtime: RxRuntime<RE, RR>
): Rx<Result.Result<RE | E | NoSuchElementException, A>>
} = <R, E, A, RE>(
stream: Stream.Stream<R, E, A>,
runtime?: RxRuntime<RE, R>
) =>
readable<Result.Result<E | NoSuchElementException, A>>(function(ctx) {
if (runtime !== undefined) {
return makeStreamRuntime(ctx, stream, runtime)
}
return makeStream(ctx, stream as Stream.Stream<RxContext, E, A>)
})

/**
* @since 1.0.0
* @category combinators
Expand Down Expand Up @@ -510,7 +580,9 @@ export const access = <A>(rx: Rx<A>): Effect.Effect<RxContext, never, A> =>
* @since 1.0.0
* @category accessors
*/
export const accessResult = <E, A>(rx: RxResult<E, A>): Effect.Effect<RxContext, E | NoSuchElementException, A> =>
export const accessResult = <E, A>(
rx: Rx<Result.Result<E, A>>
): Effect.Effect<RxContext, E | NoSuchElementException, A> =>
Effect.flatMap(
Context,
(ctx): Effect.Effect<never, E | NoSuchElementException, A> => {
Expand Down

0 comments on commit 7d19e1e

Please sign in to comment.