Skip to content

Commit

Permalink
allow interruptions in Result
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Feb 7, 2024
1 parent 1d6a8e0 commit ea179de
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .changeset/soft-crabs-admire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-rx/rx": patch
---

allow interruptions in Result
7 changes: 7 additions & 0 deletions packages/rx/src/Result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ export interface Failure<E, A> extends Result.Proto<E, A> {
*/
export const isFailure = <E, A>(result: Result<E, A>): result is Failure<E, A> => result._tag === "Failure"

/**
* @since 1.0.0
* @category refinements
*/
export const isInterrupted = <E, A>(result: Result<E, A>): result is Failure<E, A> =>
result._tag === "Failure" && Cause.isInterruptedOnly(result.cause)

/**
* @since 1.0.0
* @category constructors
Expand Down
8 changes: 2 additions & 6 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,7 @@ function makeEffect<E, A>(
const cancel = runCallbackSync(runtime)(
scopedEffect,
function(exit) {
if (!Exit.isInterrupted(exit)) {
ctx.setSelfSync(Result.fromExitWithPrevious(exit, previous))
}
ctx.setSelfSync(Result.fromExitWithPrevious(exit, previous))
}
)
if (cancel !== undefined) {
Expand Down Expand Up @@ -592,9 +590,7 @@ function makeStream<E, A>(
),
(exit) => {
if (exit._tag === "Failure") {
if (!Exit.isInterrupted(exit)) {
ctx.setSelfSync(Result.failureWithPrevious(exit.cause, previous))
}
ctx.setSelfSync(Result.failureWithPrevious(exit.cause, previous))
} else {
pipe(
ctx.self<Result.Result<E | NoSuchElementException, A>>(),
Expand Down
5 changes: 3 additions & 2 deletions packages/rx/src/internal/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* @since 1.0.0
*/
import { NoSuchElementException } from "effect/Cause"
import * as Effect from "effect/Effect"
import type * as Effect from "effect/Effect"
import * as Exit from "effect/Exit"
import * as Runtime from "effect/Runtime"
import { SyncScheduler } from "effect/Scheduler"
Expand Down Expand Up @@ -49,6 +49,7 @@ export const runCallbackSync =
}
fiberRuntime.addObserver(onExit)
return function() {
Effect.runFork(fiberRuntime.interruptAsFork(fiberRuntime.id()))
fiberRuntime.removeObserver(onExit)
fiberRuntime.unsafeInterruptAsFork(fiberRuntime.id())
}
}

0 comments on commit ea179de

Please sign in to comment.