Skip to content

Commit

Permalink
streamPull wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Smart committed Sep 19, 2023
1 parent 7d19e1e commit d22ead7
Showing 1 changed file with 28 additions and 0 deletions.
28 changes: 28 additions & 0 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* @since 1.0.0
*/
import * as Result from "@effect-rx/rx/Result"
import type * as Chunk from "@effect/data/Chunk"
import * as EffectContext from "@effect/data/Context"
import * as Equal from "@effect/data/Equal"
import { pipe } from "@effect/data/Function"
Expand Down Expand Up @@ -544,6 +545,33 @@ export const stream: {
return makeStream(ctx, stream as Stream.Stream<RxContext, E, A>)
})

/**
* @since 1.0.0
* @category constructors
*/
export const streamPull = <E, A, RE>(
stream: Stream.Stream<RxContext, E, A>
) => {
const pullRx = scoped(Stream.toPull(stream))
const counter = state(0)
return writable<Result.Result<E | NoSuchElementException, Chunk.Chunk<A>>, void>(function(ctx) {
ctx.get(counter)
const pull = pipe(
ctx.get(pullRx),
Effect.catchAllCause(Option.match({

Check failure on line 561 in packages/rx/src/Rx.ts

View workflow job for this annotation

GitHub Actions / build (16.17.1)

Argument of type '<R, A>(self: Effect<R, unknown, A>) => Effect<unknown, unknown, unknown>' is not assignable to parameter of type '(a: Result<never, Effect<RxContext, Option<E>, Chunk<A>>>) => Effect<unknown, unknown, unknown>'.

Check failure on line 561 in packages/rx/src/Rx.ts

View workflow job for this annotation

GitHub Actions / build (16.17.1)

Argument of type '(self: Option<unknown>) => unknown' is not assignable to parameter of type '(cause: Cause<unknown>) => Effect<unknown, unknown, unknown>'.
onNone: () => Effect.fail(NoSuchElementException()),
onSome: (pull) => pull
}))
)
return makeEffect(ctx, pull)

Check failure on line 566 in packages/rx/src/Rx.ts

View workflow job for this annotation

GitHub Actions / build (16.17.1)

Argument of type 'unknown' is not assignable to parameter of type 'Effect<RxContext, NoSuchElementException | E, Chunk<A>>'.
}, function(get, set, _setSelf, _) {
set(counter, get(counter) + 1)
}, function(refresh) {
refresh(counter)
refresh(pullRx)
})
}

/**
* @since 1.0.0
* @category combinators
Expand Down

0 comments on commit d22ead7

Please sign in to comment.