Skip to content

Commit

Permalink
Merge pull request #1 from tim-smart/refactor
Browse files Browse the repository at this point in the history
refactor: use fibers
  • Loading branch information
tim-smart authored Dec 3, 2023
2 parents aa3da41 + 063a797 commit f331d9a
Show file tree
Hide file tree
Showing 21 changed files with 483 additions and 368 deletions.
4 changes: 2 additions & 2 deletions examples/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
InteractionsRegistryLive,
} from "dfx/gateway"
import Dotenv from "dotenv"
import { Config, Effect, Layer } from "effect"
import { Config, Effect, Layer, LogLevel, Logger } from "effect"

Dotenv.config()

Expand Down Expand Up @@ -58,12 +58,12 @@ const MainLive = GreetLive.pipe(
Layer.provide(
DiscordConfig.layerConfig({
token: Config.secret("DISCORD_BOT_TOKEN"),
debug: Config.withDefault(Config.boolean("DEBUG"), false),
}),
),
)

Layer.launch(MainLive).pipe(
Effect.catchAllCause(Effect.logError),
Logger.withMinimumLogLevel(LogLevel.Trace),
Effect.runFork,
)
16 changes: 7 additions & 9 deletions src/DiscordConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ export interface DiscordConfig {
readonly _: unique symbol
}

export interface DiscordConfigValue {
export interface DiscordConfigService {
readonly token: ConfigSecret.ConfigSecret
readonly debug: boolean
readonly rest: {
readonly baseUrl: string
readonly globalRateLimit: {
Expand All @@ -31,23 +30,22 @@ export interface DiscordConfigValue {
readonly identifyRateLimit: readonly [window: number, limit: number]
}
}
export const DiscordConfig = Tag<DiscordConfig, DiscordConfigValue>()
export const DiscordConfig = Tag<DiscordConfig, DiscordConfigService>(
"dfx/DiscordConfig",
)

export interface MakeOpts {
readonly token: ConfigSecret.ConfigSecret
readonly debug?: boolean
readonly rest?: Partial<DiscordConfigValue["rest"]>
readonly gateway?: Partial<DiscordConfigValue["gateway"]>
readonly rest?: Partial<DiscordConfigService["rest"]>
readonly gateway?: Partial<DiscordConfigService["gateway"]>
}

export const make = ({
debug = false,
gateway,
rest,
token,
}: MakeOpts): DiscordConfigValue => ({
}: MakeOpts): DiscordConfigService => ({
token,
debug,
rest: {
baseUrl: `https://discord.com/api/v${VERSION}`,
...(rest ?? {}),
Expand Down
93 changes: 22 additions & 71 deletions src/DiscordGateway.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,19 @@
import { Messaging, MesssagingLive } from "dfx/DiscordGateway/Messaging"
import type { RunningShard } from "dfx/DiscordGateway/Shard"
import { Sharder, SharderLive } from "dfx/DiscordGateway/Sharder"
import type * as Discord from "dfx/types"
import { Tag } from "effect/Context"
import type * as HashSet from "effect/HashSet"
import * as Effect from "effect/Effect"
import * as PubSub from "effect/PubSub"
import type * as HashSet from "effect/HashSet"
import * as Layer from "effect/Layer"
import * as Queue from "effect/Queue"
import * as Stream from "effect/Stream"
import type { RunningShard } from "dfx/DiscordGateway/Shard"
import { SharedLive, Sharder } from "dfx/DiscordGateway/Sharder"
import type * as Discord from "dfx/types"
import * as EffectUtils from "dfx/utils/Effect"
import * as Schedule from "effect/Schedule"

const fromDispatchFactory =
<R, E>(
source: Stream.Stream<R, E, Discord.GatewayPayload<Discord.ReceiveEvent>>,
) =>
<K extends keyof Discord.ReceiveEvents>(
event: K,
): Stream.Stream<R, E, Discord.ReceiveEvents[K]> =>
Stream.map(
Stream.filter(source, p => p.t === event),
p => p.d! as any,
)
import type * as Stream from "effect/Stream"

const handleDispatchFactory =
(hub: PubSub.PubSub<Discord.GatewayPayload<Discord.ReceiveEvent>>) =>
<K extends keyof Discord.ReceiveEvents, R, E, A>(
event: K,
handle: (event: Discord.ReceiveEvents[K]) => Effect.Effect<R, E, A>,
): Effect.Effect<R, E, never> =>
EffectUtils.subscribeForEachPar(hub, _ => {
if (_.t === event) {
return handle(_.d as any)
}
return Effect.unit as any
})
export const TypeId = Symbol.for("dfx/DiscordGateway")
export type TypeId = typeof TypeId

export interface DiscordGateway {
readonly [TypeId]: TypeId

readonly dispatch: Stream.Stream<
never,
never,
Expand All @@ -54,50 +31,24 @@ export interface DiscordGateway {
) => Effect.Effect<never, never, boolean>
readonly shards: Effect.Effect<never, never, HashSet.HashSet<RunningShard>>
}
export const DiscordGateway = Tag<DiscordGateway>()

export const DiscordGateway = Tag<DiscordGateway>(TypeId)

export const make = Effect.gen(function* (_) {
const sharder = yield* _(Sharder)
const hub = yield* _(
PubSub.unbounded<Discord.GatewayPayload<Discord.ReceiveEvent>>(),
)

const sendQueue = yield* _(
Queue.unbounded<Discord.GatewayPayload<Discord.SendEvent>>(),
)
const send = (payload: Discord.GatewayPayload<Discord.SendEvent>) =>
sendQueue.offer(payload)

const dispatch = Stream.fromPubSub(hub)
const fromDispatch = fromDispatchFactory(dispatch)
const handleDispatch = handleDispatchFactory(hub)

yield* _(
sharder.run(hub, sendQueue),
Effect.tapErrorCause(_ => Effect.logError("fatal error, restarting", _)),
Effect.retry(
Schedule.exponential("1 seconds").pipe(
Schedule.union(Schedule.spaced("30 seconds")),
),
),
Effect.forkScoped,
)
const messaging = yield* _(Messaging)

return DiscordGateway.of({
dispatch,
fromDispatch,
handleDispatch,
send,
[TypeId]: TypeId,
dispatch: messaging.dispatch,
fromDispatch: messaging.fromDispatch,
handleDispatch: messaging.handleDispatch,
send: messaging.send,
shards: sharder.shards,
})
}).pipe(
Effect.annotateLogs({
package: "dfx",
service: "DiscordGateway",
}),
)
})

export const DiscordGatewayLive = Layer.provide(
Layer.scoped(DiscordGateway, make),
SharedLive,
export const DiscordGatewayLive = Layer.effect(DiscordGateway, make).pipe(
Layer.provide(MesssagingLive),
Layer.provide(SharderLive),
)
36 changes: 23 additions & 13 deletions src/DiscordGateway/DiscordWS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ export interface OpenOpts {
onConnecting?: Effect.Effect<never, never, void>
}

export interface DiscordWSCodec {
export interface DiscordWSCodecService {
type: "json" | "etf"
encode: (p: Discord.GatewayPayload) => string
decode: (p: WebSocket.Data) => Discord.GatewayPayload
}
export const DiscordWSCodec = Tag<DiscordWSCodec>()
export interface DiscordWSCodec {
readonly _: unique symbol
}
export const DiscordWSCodec = Tag<DiscordWSCodec, DiscordWSCodecService>(
"dfx/DiscordGateway/DiscordWS/Codec",
)
export const JsonDiscordWSCodecLive = Layer.succeed(DiscordWSCodec, {
type: "json",
encode: p => JSON.stringify(p),
Expand All @@ -47,18 +52,19 @@ const make = Effect.gen(function* (_) {
const takeOutbound = Effect.map(outbound, msg =>
msg === Reconnect ? msg : encoding.encode(msg),
)
const socket = yield* _(ws.connect(urlRef, takeOutbound, onConnecting))
const take = Effect.map(socket.take, encoding.decode)

const run = Effect.retryWhile(
socket.run,
e =>
(e._tag === "WebSocketCloseError" && e.code < 2000) ||
(e._tag === "WebSocketError" && e.reason === "open-timeout"),
const socket = yield* _(
ws.connect({
urlRef,
takeOutbound,
onConnecting,
reconnectWhen: e =>
(e._tag === "WebSocketCloseError" && e.code < 2000) ||
(e._tag === "WebSocketError" && e.reason === "open-timeout"),
}),
)
const take = Effect.map(socket.take, encoding.decode)

return {
run,
take,
setUrl,
} as const
Expand All @@ -67,8 +73,12 @@ const make = Effect.gen(function* (_) {
return { connect } as const
})

export interface DiscordWS extends Effect.Effect.Success<typeof make> {}
export const DiscordWS = Tag<DiscordWS>()
export interface DiscordWS {
readonly _: unique symbol
}
export const DiscordWS = Tag<DiscordWS, Effect.Effect.Success<typeof make>>(
"dfx/DiscordGateway/DiscordWS",
)
export const DiscordWSLive = Layer.provide(
Layer.effect(DiscordWS, make),
WSLive,
Expand Down
72 changes: 72 additions & 0 deletions src/DiscordGateway/Messaging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { Tag } from "effect/Context"
import * as Effect from "effect/Effect"
import * as PubSub from "effect/PubSub"
import * as Layer from "effect/Layer"
import * as Queue from "effect/Queue"
import * as Stream from "effect/Stream"
import type * as Discord from "dfx/types"
import * as EffectUtils from "dfx/utils/Effect"

const fromDispatchFactory =
<R, E>(
source: Stream.Stream<R, E, Discord.GatewayPayload<Discord.ReceiveEvent>>,
) =>
<K extends keyof Discord.ReceiveEvents>(
event: K,
): Stream.Stream<R, E, Discord.ReceiveEvents[K]> =>
Stream.map(
Stream.filter(source, p => p.t === event),
p => p.d! as any,
)

const handleDispatchFactory =
(hub: PubSub.PubSub<Discord.GatewayPayload<Discord.ReceiveEvent>>) =>
<K extends keyof Discord.ReceiveEvents, R, E, A>(
event: K,
handle: (event: Discord.ReceiveEvents[K]) => Effect.Effect<R, E, A>,
): Effect.Effect<R, E, never> =>
EffectUtils.subscribeForEachPar(hub, _ => {
if (_.t === event) {
return handle(_.d as any)
}
return Effect.unit as any
})

export const make = Effect.gen(function* (_) {
const hub = yield* _(
Effect.acquireRelease(
PubSub.unbounded<Discord.GatewayPayload<Discord.ReceiveEvent>>(),
PubSub.shutdown,
),
)

const sendQueue = yield* _(
Effect.acquireRelease(
Queue.unbounded<Discord.GatewayPayload<Discord.SendEvent>>(),
Queue.shutdown,
),
)
const send = (payload: Discord.GatewayPayload<Discord.SendEvent>) =>
sendQueue.offer(payload)

const dispatch = Stream.fromPubSub(hub)
const fromDispatch = fromDispatchFactory(dispatch)
const handleDispatch = handleDispatchFactory(hub)

return {
hub,
sendQueue,
dispatch,
fromDispatch,
handleDispatch,
send,
} as const
})

export interface Messsaging {
readonly _: unique symbol
}
export const Messaging = Tag<Messsaging, Effect.Effect.Success<typeof make>>(
"dfx/DiscordGateway/Messaging",
)
export const MesssagingLive = Layer.scoped(Messaging, make)
Loading

0 comments on commit f331d9a

Please sign in to comment.