Skip to content

Commit

Permalink
refactor: use ShardStateStore to store resume state
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Apr 29, 2024
1 parent 984305a commit 7ba6409
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 130 deletions.
16 changes: 0 additions & 16 deletions .vscode/snippets.code-snippets

This file was deleted.

103 changes: 53 additions & 50 deletions src/DiscordGateway/Shard.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
import * as Chunk from "effect/Chunk"
import { GenericTag } from "effect/Context"
import * as Duration from "effect/Duration"
import { pipe } from "effect/Function"
import * as Option from "effect/Option"
import * as Secret from "effect/Secret"
import * as Effect from "effect/Effect"
import * as PubSub from "effect/PubSub"
import * as Layer from "effect/Layer"
import * as Queue from "effect/Queue"
import * as Ref from "effect/Ref"
import { DiscordConfig } from "dfx/DiscordConfig"
import type { Message } from "dfx/DiscordGateway/DiscordWS"
import {
DiscordWS,
DiscordWSLive,
Reconnect,
} from "dfx/DiscordGateway/DiscordWS"
import { Messaging, MesssagingLive } from "dfx/DiscordGateway/Messaging"
import type { ShardState } from "dfx/DiscordGateway/Shard/StateStore"
import { ShardStateStore } from "dfx/DiscordGateway/Shard/StateStore"
import * as Heartbeats from "dfx/DiscordGateway/Shard/heartbeats"
import * as Identify from "dfx/DiscordGateway/Shard/identify"
import * as InvalidSession from "dfx/DiscordGateway/Shard/invalidSession"
import * as Utils from "dfx/DiscordGateway/Shard/utils"
import { RateLimiterLive, RateLimiter } from "dfx/RateLimit"
import { RateLimiter, RateLimiterLive } from "dfx/RateLimit"
import * as Discord from "dfx/types"
import { Messaging, MesssagingLive } from "dfx/DiscordGateway/Messaging"
import * as Chunk from "effect/Chunk"
import { GenericTag } from "effect/Context"
import * as Duration from "effect/Duration"
import * as Effect from "effect/Effect"
import { pipe } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as PubSub from "effect/PubSub"
import * as Queue from "effect/Queue"
import * as Ref from "effect/Ref"
import * as Secret from "effect/Secret"
import type * as Types from "effect/Types"

const enum Phase {
Connecting,
Expand All @@ -35,13 +36,23 @@ export const make = Effect.gen(function* () {
const limiter = yield* RateLimiter
const dws = yield* DiscordWS
const { hub, sendQueue } = yield* Messaging
const shardState = yield* ShardStateStore

const connect = (shard: [id: number, count: number]) =>
Effect.gen(function* () {
Effect.gen(function* (_) {
const outboundQueue = yield* Queue.unbounded<Message>()
const pendingQueue = yield* Queue.unbounded<Message>()
const phase = yield* Ref.make(Phase.Connecting)
const setPhase = (p: Phase) =>
const stateStore = shardState.forShard(shard)
const resumeState: Types.Mutable<ShardState> = Option.getOrElse(
yield* stateStore.get,
() => ({
resumeUrl: "",
sessionId: "",
sequence: 0,
}),
)
const setPhase = (p: Phase): Effect.Effect<void> =>
Effect.zipLeft(
Ref.set(phase, p),
Effect.annotateLogs(Effect.logTrace("phase transition"), "phase", p),
Expand Down Expand Up @@ -94,29 +105,6 @@ export const make = Effect.gen(function* () {

const socket = yield* dws.connect({ outbound, onConnecting })

const isReady = Option.liftPredicate(
(
p: Discord.GatewayPayload,
): p is Discord.GatewayPayload<Discord.ReadyEvent> =>
p.op === Discord.GatewayOpcode.DISPATCH && p.t === "READY",
)

const [latestReady, updateLatestReady] = yield* Utils.latest(p =>
Option.map(isReady(p), p => p.d!),
)
const [latestSequence, updateLatestSequence] = yield* Utils.latest(p =>
Option.fromNullable(p.s),
)
const maybeUpdateUrl = (p: Discord.GatewayPayload) =>
Option.match(
Option.map(isReady(p), p => p.d!),
{
onNone: () => Effect.void,
onSome: ({ resume_gateway_url }) =>
socket.setUrl(resume_gateway_url),
},
)

const hellos = yield* Effect.acquireRelease(
Queue.unbounded<Discord.GatewayPayload>(),
Queue.shutdown,
Expand All @@ -127,7 +115,7 @@ export const make = Effect.gen(function* () {
)

// heartbeats
yield* Heartbeats.send(hellos, acks, latestSequence, heartbeatSend).pipe(
yield* Heartbeats.send(hellos, acks, stateStore.get, heartbeatSend).pipe(
Effect.forkScoped,
Effect.interruptible,
)
Expand All @@ -140,15 +128,29 @@ export const make = Effect.gen(function* () {
intents: gateway.intents,
presence: gateway.presence,
},
latestReady,
latestSequence,
stateStore.get,
)

const onPayload = (p: Discord.GatewayPayload) =>
pipe(
updateLatestReady(p),
Effect.zipRight(updateLatestSequence(p)),
Effect.zipRight(maybeUpdateUrl(p)),
Effect.suspend(() => {
if (typeof p.s === "number") {
resumeState.sequence = p.s
}
if (p.op === Discord.GatewayOpcode.DISPATCH && p.t === "READY") {
const payload = p.d as Discord.ReadyEvent
resumeState.sessionId = payload.session_id
resumeState.resumeUrl = payload.resume_gateway_url
return Effect.zipRight(
stateStore.set(resumeState),
socket.setUrl(payload.resume_gateway_url),
)
}
if (resumeState.resumeUrl !== "" && resumeState.sessionId !== "") {
return stateStore.set(resumeState)
}
return Effect.void
}),
Effect.tap(() => {
switch (p.op) {
case Discord.GatewayOpcode.HELLO: {
Expand All @@ -162,10 +164,11 @@ export const make = Effect.gen(function* () {
return Queue.offer(acks, p)
}
case Discord.GatewayOpcode.INVALID_SESSION: {
return Effect.tap(
InvalidSession.fromPayload(p, latestReady),
send,
)
if (p.d) {
return send(Reconnect)
}
resumeState.sessionId = ""
return Effect.zipRight(stateStore.clear, send(Reconnect))
}
case Discord.GatewayOpcode.DISPATCH: {
if (p.t === "READY" || p.t === "RESUMED") {
Expand Down
46 changes: 46 additions & 0 deletions src/DiscordGateway/Shard/StateStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Effect from "effect/Effect"
import * as Option from "effect/Option"
import * as Context from "effect/Context"
import * as Layer from "effect/Layer"

export interface ShardState {
readonly resumeUrl: string
readonly sequence: number | null
readonly sessionId: string
}

export interface ShardStateStore {
readonly _: unique symbol
}

export interface StateStore {
readonly get: Effect.Effect<Option.Option<ShardState>>
readonly set: (state: ShardState) => Effect.Effect<void>
readonly clear: Effect.Effect<void>
}

export const ShardStateStore = Context.GenericTag<
ShardStateStore,
{ readonly forShard: (id: [id: number, count: number]) => StateStore }
>("dfx/Shard/StateStore")

export const MemoryShardStateStoreLive = Layer.sync(ShardStateStore, () => {
const store = new Map<string, ShardState>()

return ShardStateStore.of({
forShard: ([id, count]) => {
const key = `${id}-${count}`
return {
get: Effect.sync(() => Option.fromNullable(store.get(key))),
set(state) {
return Effect.sync(() => {
store.set(key, state)
})
},
clear: Effect.sync(() => {
store.delete(key)
}),
}
},
})
})
15 changes: 8 additions & 7 deletions src/DiscordGateway/Shard/heartbeats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,34 @@ import * as DiscordWS from "dfx/DiscordGateway/DiscordWS"
import * as SendEvents from "dfx/DiscordGateway/Shard/sendEvents"
import type * as Discord from "dfx/types"
import * as EffectU from "dfx/utils/Effect"
import type { ShardState } from "dfx/DiscordGateway/Shard/StateStore"

const payload = (seqRef: Ref.Ref<Option.Option<number>>) =>
Effect.map(Ref.get(seqRef), seq =>
SendEvents.heartbeat(Option.getOrNull(seq)),
const payload = (state: Effect.Effect<Option.Option<ShardState>>) =>
Effect.map(state, state =>
SendEvents.heartbeat(Option.getOrNull(Option.map(state, s => s.sequence))),
)

const payloadOrReconnect = (
ref: Ref.Ref<boolean>,
seqRef: Ref.Ref<Option.Option<number>>,
state: Effect.Effect<Option.Option<ShardState>>,
) =>
Effect.flatMap(
Ref.get(ref),
(acked): Effect.Effect<DiscordWS.Message> =>
acked ? payload(seqRef) : Effect.succeed(DiscordWS.Reconnect),
acked ? payload(state) : Effect.succeed(DiscordWS.Reconnect),
)

export const send = (
hellos: Queue.Dequeue<Discord.GatewayPayload>,
acks: Queue.Dequeue<Discord.GatewayPayload>,
seqRef: Ref.Ref<Option.Option<number>>,
state: Effect.Effect<Option.Option<ShardState>>,
send: (p: DiscordWS.Message) => Effect.Effect<boolean>,
) =>
Effect.flatMap(Ref.make(true), ackedRef => {
const heartbeats = EffectU.foreverSwitch(
Effect.zipLeft(Queue.take(hellos), Ref.set(ackedRef, true)),
p =>
payloadOrReconnect(ackedRef, seqRef).pipe(
payloadOrReconnect(ackedRef, state).pipe(
Effect.zipLeft(Ref.set(ackedRef, false)),
Effect.tap(send),
Effect.schedule(
Expand Down
28 changes: 10 additions & 18 deletions src/DiscordGateway/Shard/identify.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import * as Option from "effect/Option"
import * as Effect from "effect/Effect"
import * as Ref from "effect/Ref"
import * as SendEvents from "dfx/DiscordGateway/Shard/sendEvents"
import type * as Discord from "dfx/types"
import * as OS from "os"
import type { ShardState } from "dfx/DiscordGateway/Shard/StateStore"

export interface Options {
readonly token: string
Expand All @@ -12,11 +12,6 @@ export interface Options {
readonly presence?: Discord.UpdatePresence
}

export interface Requirements {
readonly latestReady: Ref.Ref<Option.Option<Discord.ReadyEvent>>
readonly latestSequence: Ref.Ref<Option.Option<number>>
}

const identify = ({ intents, presence, shard, token }: Options) =>
SendEvents.identify({
token,
Expand All @@ -30,27 +25,24 @@ const identify = ({ intents, presence, shard, token }: Options) =>
presence,
})

const resume = (token: string, ready: Discord.ReadyEvent, seq: number) =>
const resume = (token: string, session_id: string, seq: number | null) =>
SendEvents.resume({
token,
session_id: ready.session_id,
seq,
session_id,
seq: seq!,
})

export const identifyOrResume = (
opts: Options,
ready: Ref.Ref<Option.Option<Discord.ReadyEvent>>,
seq: Ref.Ref<Option.Option<number>>,
state: Effect.Effect<Option.Option<ShardState>>,
): Effect.Effect<
| Discord.GatewayPayload<Discord.Identify>
| Discord.GatewayPayload<Discord.Resume>
> =>
Effect.map(
Effect.all([Ref.get(ready), Ref.get(seq)]),
([readyEvent, seqNumber]) =>
Option.match(Option.all({ readyEvent, seqNumber }), {
onNone: () => identify(opts),
onSome: ({ readyEvent, seqNumber }) =>
resume(opts.token, readyEvent, seqNumber),
}),
state,
Option.match({
onNone: () => identify(opts),
onSome: state => resume(opts.token, state.sessionId, state.sequence),
}),
)
11 changes: 0 additions & 11 deletions src/DiscordGateway/Shard/invalidSession.ts

This file was deleted.

29 changes: 1 addition & 28 deletions src/DiscordGateway/Shard/utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import * as Option from "effect/Option"
import * as Effect from "effect/Effect"
import * as Ref from "effect/Ref"
import * as Stream from "effect/Stream"
import type * as Discord from "dfx/types"
import * as Stream from "effect/Stream"

export const opCode =
<R, E>(source: Stream.Stream<Discord.GatewayPayload, E, R>) =>
Expand All @@ -12,27 +9,3 @@ export const opCode =
source.pipe(
Stream.filter((p): p is Discord.GatewayPayload<T> => p.op === code),
)

const maybeUpdateRef =
<T>(
f: (p: Discord.GatewayPayload) => Option.Option<T>,
ref: Ref.Ref<Option.Option<T>>,
) =>
(_: Discord.GatewayPayload): Effect.Effect<void> =>
Option.match(f(_), {
onNone: () => Effect.void,
onSome: a => Ref.set(ref, Option.some(a)),
})

export const latest = <T>(
f: (p: Discord.GatewayPayload) => Option.Option<T>,
): Effect.Effect<
readonly [
Ref.Ref<Option.Option<T>>,
(_: Discord.GatewayPayload<any>) => Effect.Effect<void>,
]
> =>
Effect.map(
Ref.make<Option.Option<T>>(Option.none()),
ref => [ref, maybeUpdateRef(f, ref)] as const,
)
2 changes: 2 additions & 0 deletions src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { DiscordGatewayLive } from "dfx/DiscordGateway"
import * as DiscordWS from "dfx/DiscordGateway/DiscordWS"
import { JsonDiscordWSCodecLive } from "dfx/DiscordGateway/DiscordWS"
import * as Shard from "dfx/DiscordGateway/Shard"
import { MemoryShardStateStoreLive } from "dfx/DiscordGateway/Shard/StateStore"
import * as SendEvent from "dfx/DiscordGateway/Shard/sendEvents"
import * as ShardStore from "dfx/DiscordGateway/ShardStore"
import { MemoryShardStoreLive } from "dfx/DiscordGateway/ShardStore"
Expand Down Expand Up @@ -31,6 +32,7 @@ export const DiscordLive = Layer.mergeAll(
Layer.provide(JsonDiscordWSCodecLive),
Layer.provide(MemoryRateLimitStoreLive),
Layer.provide(MemoryShardStoreLive),
Layer.provide(MemoryShardStateStoreLive),
)

export const DiscordIxLive = InteractionsRegistryLive.pipe(
Expand Down

0 comments on commit 7ba6409

Please sign in to comment.