diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts index 9c6d5e9..241eb03 100644 --- a/packages/rx/src/Rx.ts +++ b/packages/rx/src/Rx.ts @@ -2,6 +2,7 @@ * @since 1.0.0 */ import * as Result from "@effect-rx/rx/Result" +import type * as Chunk from "@effect/data/Chunk" import * as EffectContext from "@effect/data/Context" import * as Equal from "@effect/data/Equal" import { pipe } from "@effect/data/Function" @@ -544,6 +545,33 @@ export const stream: { return makeStream(ctx, stream as Stream.Stream) }) +/** + * @since 1.0.0 + * @category constructors + */ +export const streamPull = ( + stream: Stream.Stream +) => { + const pullRx = scoped(Stream.toPull(stream)) + const counter = state(0) + return writable>, void>(function(ctx) { + ctx.get(counter) + const pull = pipe( + ctx.get(pullRx), + Effect.catchAllCause(Option.match({ + onNone: () => Effect.fail(NoSuchElementException()), + onSome: (pull) => pull + })) + ) + return makeEffect(ctx, pull) + }, function(get, set, _setSelf, _) { + set(counter, get(counter) + 1) + }, function(refresh) { + refresh(counter) + refresh(pullRx) + }) +} + /** * @since 1.0.0 * @category combinators