From 7ba6409bbff8758128602e6b9ca24d42ec756210 Mon Sep 17 00:00:00 2001 From: Tim Date: Mon, 29 Apr 2024 14:58:17 +1200 Subject: [PATCH] refactor: use ShardStateStore to store resume state --- .vscode/snippets.code-snippets | 16 ---- src/DiscordGateway/Shard.ts | 103 +++++++++++---------- src/DiscordGateway/Shard/StateStore.ts | 46 +++++++++ src/DiscordGateway/Shard/heartbeats.ts | 15 +-- src/DiscordGateway/Shard/identify.ts | 28 ++---- src/DiscordGateway/Shard/invalidSession.ts | 11 --- src/DiscordGateway/Shard/utils.ts | 29 +----- src/gateway.ts | 2 + 8 files changed, 120 insertions(+), 130 deletions(-) delete mode 100644 .vscode/snippets.code-snippets create mode 100644 src/DiscordGateway/Shard/StateStore.ts delete mode 100644 src/DiscordGateway/Shard/invalidSession.ts diff --git a/.vscode/snippets.code-snippets b/.vscode/snippets.code-snippets deleted file mode 100644 index 6408432..0000000 --- a/.vscode/snippets.code-snippets +++ /dev/null @@ -1,16 +0,0 @@ -{ - "Gen": { - "prefix": "gg", - "body": [ - "Effect.gen(function*(_) {", - " $0", - "})" - ], - "description": "Begins an Effect.gen syntax block" - }, - "Yield": { - "prefix": "yy", - "body": ["const $1 = yield* _($0)"], - "description": "Yields a value from a generator" - } -} diff --git a/src/DiscordGateway/Shard.ts b/src/DiscordGateway/Shard.ts index fa837a4..059ee98 100644 --- a/src/DiscordGateway/Shard.ts +++ b/src/DiscordGateway/Shard.ts @@ -1,14 +1,3 @@ -import * as Chunk from "effect/Chunk" -import { GenericTag } from "effect/Context" -import * as Duration from "effect/Duration" -import { pipe } from "effect/Function" -import * as Option from "effect/Option" -import * as Secret from "effect/Secret" -import * as Effect from "effect/Effect" -import * as PubSub from "effect/PubSub" -import * as Layer from "effect/Layer" -import * as Queue from "effect/Queue" -import * as Ref from "effect/Ref" import { DiscordConfig } from "dfx/DiscordConfig" import type { Message } from "dfx/DiscordGateway/DiscordWS" import { @@ -16,13 +5,25 @@ import { DiscordWSLive, Reconnect, } from "dfx/DiscordGateway/DiscordWS" +import { Messaging, MesssagingLive } from "dfx/DiscordGateway/Messaging" +import type { ShardState } from "dfx/DiscordGateway/Shard/StateStore" +import { ShardStateStore } from "dfx/DiscordGateway/Shard/StateStore" import * as Heartbeats from "dfx/DiscordGateway/Shard/heartbeats" import * as Identify from "dfx/DiscordGateway/Shard/identify" -import * as InvalidSession from "dfx/DiscordGateway/Shard/invalidSession" -import * as Utils from "dfx/DiscordGateway/Shard/utils" -import { RateLimiterLive, RateLimiter } from "dfx/RateLimit" +import { RateLimiter, RateLimiterLive } from "dfx/RateLimit" import * as Discord from "dfx/types" -import { Messaging, MesssagingLive } from "dfx/DiscordGateway/Messaging" +import * as Chunk from "effect/Chunk" +import { GenericTag } from "effect/Context" +import * as Duration from "effect/Duration" +import * as Effect from "effect/Effect" +import { pipe } from "effect/Function" +import * as Layer from "effect/Layer" +import * as Option from "effect/Option" +import * as PubSub from "effect/PubSub" +import * as Queue from "effect/Queue" +import * as Ref from "effect/Ref" +import * as Secret from "effect/Secret" +import type * as Types from "effect/Types" const enum Phase { Connecting, @@ -35,13 +36,23 @@ export const make = Effect.gen(function* () { const limiter = yield* RateLimiter const dws = yield* DiscordWS const { hub, sendQueue } = yield* Messaging + const shardState = yield* ShardStateStore const connect = (shard: [id: number, count: number]) => - Effect.gen(function* () { + Effect.gen(function* (_) { const outboundQueue = yield* Queue.unbounded() const pendingQueue = yield* Queue.unbounded() const phase = yield* Ref.make(Phase.Connecting) - const setPhase = (p: Phase) => + const stateStore = shardState.forShard(shard) + const resumeState: Types.Mutable = Option.getOrElse( + yield* stateStore.get, + () => ({ + resumeUrl: "", + sessionId: "", + sequence: 0, + }), + ) + const setPhase = (p: Phase): Effect.Effect => Effect.zipLeft( Ref.set(phase, p), Effect.annotateLogs(Effect.logTrace("phase transition"), "phase", p), @@ -94,29 +105,6 @@ export const make = Effect.gen(function* () { const socket = yield* dws.connect({ outbound, onConnecting }) - const isReady = Option.liftPredicate( - ( - p: Discord.GatewayPayload, - ): p is Discord.GatewayPayload => - p.op === Discord.GatewayOpcode.DISPATCH && p.t === "READY", - ) - - const [latestReady, updateLatestReady] = yield* Utils.latest(p => - Option.map(isReady(p), p => p.d!), - ) - const [latestSequence, updateLatestSequence] = yield* Utils.latest(p => - Option.fromNullable(p.s), - ) - const maybeUpdateUrl = (p: Discord.GatewayPayload) => - Option.match( - Option.map(isReady(p), p => p.d!), - { - onNone: () => Effect.void, - onSome: ({ resume_gateway_url }) => - socket.setUrl(resume_gateway_url), - }, - ) - const hellos = yield* Effect.acquireRelease( Queue.unbounded(), Queue.shutdown, @@ -127,7 +115,7 @@ export const make = Effect.gen(function* () { ) // heartbeats - yield* Heartbeats.send(hellos, acks, latestSequence, heartbeatSend).pipe( + yield* Heartbeats.send(hellos, acks, stateStore.get, heartbeatSend).pipe( Effect.forkScoped, Effect.interruptible, ) @@ -140,15 +128,29 @@ export const make = Effect.gen(function* () { intents: gateway.intents, presence: gateway.presence, }, - latestReady, - latestSequence, + stateStore.get, ) const onPayload = (p: Discord.GatewayPayload) => pipe( - updateLatestReady(p), - Effect.zipRight(updateLatestSequence(p)), - Effect.zipRight(maybeUpdateUrl(p)), + Effect.suspend(() => { + if (typeof p.s === "number") { + resumeState.sequence = p.s + } + if (p.op === Discord.GatewayOpcode.DISPATCH && p.t === "READY") { + const payload = p.d as Discord.ReadyEvent + resumeState.sessionId = payload.session_id + resumeState.resumeUrl = payload.resume_gateway_url + return Effect.zipRight( + stateStore.set(resumeState), + socket.setUrl(payload.resume_gateway_url), + ) + } + if (resumeState.resumeUrl !== "" && resumeState.sessionId !== "") { + return stateStore.set(resumeState) + } + return Effect.void + }), Effect.tap(() => { switch (p.op) { case Discord.GatewayOpcode.HELLO: { @@ -162,10 +164,11 @@ export const make = Effect.gen(function* () { return Queue.offer(acks, p) } case Discord.GatewayOpcode.INVALID_SESSION: { - return Effect.tap( - InvalidSession.fromPayload(p, latestReady), - send, - ) + if (p.d) { + return send(Reconnect) + } + resumeState.sessionId = "" + return Effect.zipRight(stateStore.clear, send(Reconnect)) } case Discord.GatewayOpcode.DISPATCH: { if (p.t === "READY" || p.t === "RESUMED") { diff --git a/src/DiscordGateway/Shard/StateStore.ts b/src/DiscordGateway/Shard/StateStore.ts new file mode 100644 index 0000000..6a678a8 --- /dev/null +++ b/src/DiscordGateway/Shard/StateStore.ts @@ -0,0 +1,46 @@ +import * as Effect from "effect/Effect" +import * as Option from "effect/Option" +import * as Context from "effect/Context" +import * as Layer from "effect/Layer" + +export interface ShardState { + readonly resumeUrl: string + readonly sequence: number | null + readonly sessionId: string +} + +export interface ShardStateStore { + readonly _: unique symbol +} + +export interface StateStore { + readonly get: Effect.Effect> + readonly set: (state: ShardState) => Effect.Effect + readonly clear: Effect.Effect +} + +export const ShardStateStore = Context.GenericTag< + ShardStateStore, + { readonly forShard: (id: [id: number, count: number]) => StateStore } +>("dfx/Shard/StateStore") + +export const MemoryShardStateStoreLive = Layer.sync(ShardStateStore, () => { + const store = new Map() + + return ShardStateStore.of({ + forShard: ([id, count]) => { + const key = `${id}-${count}` + return { + get: Effect.sync(() => Option.fromNullable(store.get(key))), + set(state) { + return Effect.sync(() => { + store.set(key, state) + }) + }, + clear: Effect.sync(() => { + store.delete(key) + }), + } + }, + }) +}) diff --git a/src/DiscordGateway/Shard/heartbeats.ts b/src/DiscordGateway/Shard/heartbeats.ts index c7a36b1..34a5e4a 100644 --- a/src/DiscordGateway/Shard/heartbeats.ts +++ b/src/DiscordGateway/Shard/heartbeats.ts @@ -8,33 +8,34 @@ import * as DiscordWS from "dfx/DiscordGateway/DiscordWS" import * as SendEvents from "dfx/DiscordGateway/Shard/sendEvents" import type * as Discord from "dfx/types" import * as EffectU from "dfx/utils/Effect" +import type { ShardState } from "dfx/DiscordGateway/Shard/StateStore" -const payload = (seqRef: Ref.Ref>) => - Effect.map(Ref.get(seqRef), seq => - SendEvents.heartbeat(Option.getOrNull(seq)), +const payload = (state: Effect.Effect>) => + Effect.map(state, state => + SendEvents.heartbeat(Option.getOrNull(Option.map(state, s => s.sequence))), ) const payloadOrReconnect = ( ref: Ref.Ref, - seqRef: Ref.Ref>, + state: Effect.Effect>, ) => Effect.flatMap( Ref.get(ref), (acked): Effect.Effect => - acked ? payload(seqRef) : Effect.succeed(DiscordWS.Reconnect), + acked ? payload(state) : Effect.succeed(DiscordWS.Reconnect), ) export const send = ( hellos: Queue.Dequeue, acks: Queue.Dequeue, - seqRef: Ref.Ref>, + state: Effect.Effect>, send: (p: DiscordWS.Message) => Effect.Effect, ) => Effect.flatMap(Ref.make(true), ackedRef => { const heartbeats = EffectU.foreverSwitch( Effect.zipLeft(Queue.take(hellos), Ref.set(ackedRef, true)), p => - payloadOrReconnect(ackedRef, seqRef).pipe( + payloadOrReconnect(ackedRef, state).pipe( Effect.zipLeft(Ref.set(ackedRef, false)), Effect.tap(send), Effect.schedule( diff --git a/src/DiscordGateway/Shard/identify.ts b/src/DiscordGateway/Shard/identify.ts index 809cb97..c0230d9 100644 --- a/src/DiscordGateway/Shard/identify.ts +++ b/src/DiscordGateway/Shard/identify.ts @@ -1,9 +1,9 @@ import * as Option from "effect/Option" import * as Effect from "effect/Effect" -import * as Ref from "effect/Ref" import * as SendEvents from "dfx/DiscordGateway/Shard/sendEvents" import type * as Discord from "dfx/types" import * as OS from "os" +import type { ShardState } from "dfx/DiscordGateway/Shard/StateStore" export interface Options { readonly token: string @@ -12,11 +12,6 @@ export interface Options { readonly presence?: Discord.UpdatePresence } -export interface Requirements { - readonly latestReady: Ref.Ref> - readonly latestSequence: Ref.Ref> -} - const identify = ({ intents, presence, shard, token }: Options) => SendEvents.identify({ token, @@ -30,27 +25,24 @@ const identify = ({ intents, presence, shard, token }: Options) => presence, }) -const resume = (token: string, ready: Discord.ReadyEvent, seq: number) => +const resume = (token: string, session_id: string, seq: number | null) => SendEvents.resume({ token, - session_id: ready.session_id, - seq, + session_id, + seq: seq!, }) export const identifyOrResume = ( opts: Options, - ready: Ref.Ref>, - seq: Ref.Ref>, + state: Effect.Effect>, ): Effect.Effect< | Discord.GatewayPayload | Discord.GatewayPayload > => Effect.map( - Effect.all([Ref.get(ready), Ref.get(seq)]), - ([readyEvent, seqNumber]) => - Option.match(Option.all({ readyEvent, seqNumber }), { - onNone: () => identify(opts), - onSome: ({ readyEvent, seqNumber }) => - resume(opts.token, readyEvent, seqNumber), - }), + state, + Option.match({ + onNone: () => identify(opts), + onSome: state => resume(opts.token, state.sessionId, state.sequence), + }), ) diff --git a/src/DiscordGateway/Shard/invalidSession.ts b/src/DiscordGateway/Shard/invalidSession.ts deleted file mode 100644 index ead80c6..0000000 --- a/src/DiscordGateway/Shard/invalidSession.ts +++ /dev/null @@ -1,11 +0,0 @@ -import * as Option from "effect/Option" -import * as Effect from "effect/Effect" -import * as Ref from "effect/Ref" -import { Reconnect, type Message } from "dfx/DiscordGateway/DiscordWS" -import type * as Discord from "dfx/types" - -export const fromPayload = ( - p: Discord.GatewayPayload, - latestReady: Ref.Ref>, -): Effect.Effect => - Effect.as(p.d ? Effect.void : Ref.set(latestReady, Option.none()), Reconnect) diff --git a/src/DiscordGateway/Shard/utils.ts b/src/DiscordGateway/Shard/utils.ts index 58914d6..c413e10 100644 --- a/src/DiscordGateway/Shard/utils.ts +++ b/src/DiscordGateway/Shard/utils.ts @@ -1,8 +1,5 @@ -import * as Option from "effect/Option" -import * as Effect from "effect/Effect" -import * as Ref from "effect/Ref" -import * as Stream from "effect/Stream" import type * as Discord from "dfx/types" +import * as Stream from "effect/Stream" export const opCode = (source: Stream.Stream) => @@ -12,27 +9,3 @@ export const opCode = source.pipe( Stream.filter((p): p is Discord.GatewayPayload => p.op === code), ) - -const maybeUpdateRef = - ( - f: (p: Discord.GatewayPayload) => Option.Option, - ref: Ref.Ref>, - ) => - (_: Discord.GatewayPayload): Effect.Effect => - Option.match(f(_), { - onNone: () => Effect.void, - onSome: a => Ref.set(ref, Option.some(a)), - }) - -export const latest = ( - f: (p: Discord.GatewayPayload) => Option.Option, -): Effect.Effect< - readonly [ - Ref.Ref>, - (_: Discord.GatewayPayload) => Effect.Effect, - ] -> => - Effect.map( - Ref.make>(Option.none()), - ref => [ref, maybeUpdateRef(f, ref)] as const, - ) diff --git a/src/gateway.ts b/src/gateway.ts index c48e999..a913c36 100644 --- a/src/gateway.ts +++ b/src/gateway.ts @@ -3,6 +3,7 @@ import { DiscordGatewayLive } from "dfx/DiscordGateway" import * as DiscordWS from "dfx/DiscordGateway/DiscordWS" import { JsonDiscordWSCodecLive } from "dfx/DiscordGateway/DiscordWS" import * as Shard from "dfx/DiscordGateway/Shard" +import { MemoryShardStateStoreLive } from "dfx/DiscordGateway/Shard/StateStore" import * as SendEvent from "dfx/DiscordGateway/Shard/sendEvents" import * as ShardStore from "dfx/DiscordGateway/ShardStore" import { MemoryShardStoreLive } from "dfx/DiscordGateway/ShardStore" @@ -31,6 +32,7 @@ export const DiscordLive = Layer.mergeAll( Layer.provide(JsonDiscordWSCodecLive), Layer.provide(MemoryRateLimitStoreLive), Layer.provide(MemoryShardStoreLive), + Layer.provide(MemoryShardStateStoreLive), ) export const DiscordIxLive = InteractionsRegistryLive.pipe(