diff --git a/src/ws/proxy/shard.ts b/src/ws/proxy/shard.ts index 419bf9535..9f8b60796 100644 --- a/src/ws/proxy/shard.ts +++ b/src/ws/proxy/shard.ts @@ -81,14 +81,16 @@ export async function identify(shardID: number, maxShards: number) { }, }); - socket.send( - JSON.stringify( - { - op: GatewayOpcode.Identify, - d: { ...ws.identifyPayload, shard: [shardID, maxShards] }, - }, - ), - ); + socket.onopen = () => { + socket.send( + JSON.stringify( + { + op: GatewayOpcode.Identify, + d: { ...ws.identifyPayload, shard: [shardID, maxShards] }, + }, + ), + ); + }; return new Promise((resolve, reject) => { ws.loadingShards.set(shardID, { @@ -151,93 +153,7 @@ export async function createShard(shardID: number) { ws.log("ERROR", { shardID, error: errorEvent }); }; - socket.onmessage = ({ data: message }) => { - if (message instanceof ArrayBuffer) { - message = new Uint8Array(message); - } - - if (message instanceof Uint8Array) { - message = decompressWith( - message, - 0, - (slice: Uint8Array) => ws.utf8decoder.decode(slice), - ); - } - - if (typeof message !== "string") return; - - const messageData = JSON.parse(message); - ws.log("RAW", messageData); - - switch (messageData.op) { - case GatewayOpcode.Hello: - ws.heartbeat( - shardID, - (messageData.d as DiscordHeartbeatPayload).heartbeat_interval, - ); - break; - case GatewayOpcode.HeartbeatACK: - if (ws.shards.has(shardID)) { - ws.shards.get(shardID)!.heartbeat.acknowledged = true; - } - break; - case GatewayOpcode.Reconnect: - ws.log("RECONNECT", { shardID }); - - if (ws.shards.has(shardID)) { - ws.shards.get(shardID)!.resuming = true; - } - - resume(shardID); - break; - case GatewayOpcode.InvalidSession: - ws.log("INVALID_SESSION", { shardID, payload: messageData }); - - // When d is false we need to reidentify - if (!messageData.d) { - identify(shardID, ws.maxShards); - break; - } - - if (ws.shards.has(shardID)) { - ws.shards.get(shardID)!.resuming = true; - } - - resume(shardID); - break; - default: - if (messageData.t === "RESUMED") { - ws.log("RESUMED", { shardID }); - - if (ws.shards.has(shardID)) { - ws.shards.get(shardID)!.resuming = false; - } - break; - } - - // Important for RESUME - if (messageData.t === "READY") { - const shard = ws.shards.get(shardID); - if (shard) { - shard.sessionID = (messageData.d as ReadyPayload).session_id; - } - - ws.loadingShards.get(shardID)?.resolve(true); - ws.loadingShards.delete(shardID); - } - - // Update the sequence number if it is present - if (messageData.s) { - const shard = ws.shards.get(shardID); - if (shard) { - shard.previousSequenceNumber = messageData.s; - } - } - - ws.handleDiscordPayload(messageData, shardID); - break; - } - }; + socket.onmessage = ({ data: message }) => handleOnMessage(message, shardID); socket.onclose = (event) => { ws.log("CLOSED", { shardID, payload: event }); @@ -273,6 +189,96 @@ export async function createShard(shardID: number) { return socket; } +/** Handler for handling every message event from websocket. */ +// deno-lint-ignore no-explicit-any +export function handleOnMessage(message: any, shardID: number) { + if (message instanceof ArrayBuffer) { + message = new Uint8Array(message); + } + + if (message instanceof Uint8Array) { + message = decompressWith( + message, + 0, + (slice: Uint8Array) => ws.utf8decoder.decode(slice), + ); + } + + if (typeof message !== "string") return; + + const messageData = JSON.parse(message); + ws.log("RAW", messageData); + + switch (messageData.op) { + case GatewayOpcode.Hello: + ws.heartbeat( + shardID, + (messageData.d as DiscordHeartbeatPayload).heartbeat_interval, + ); + break; + case GatewayOpcode.HeartbeatACK: + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.heartbeat.acknowledged = true; + } + break; + case GatewayOpcode.Reconnect: + ws.log("RECONNECT", { shardID }); + + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.resuming = true; + } + + resume(shardID); + break; + case GatewayOpcode.InvalidSession: + ws.log("INVALID_SESSION", { shardID, payload: messageData }); + + // When d is false we need to reidentify + if (!messageData.d) { + identify(shardID, ws.maxShards); + break; + } + + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.resuming = true; + } + + resume(shardID); + break; + default: + if (messageData.t === "RESUMED") { + ws.log("RESUMED", { shardID }); + + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.resuming = false; + } + break; + } + + // Important for RESUME + if (messageData.t === "READY") { + const shard = ws.shards.get(shardID); + if (shard) { + shard.sessionID = (messageData.d as ReadyPayload).session_id; + } + + ws.loadingShards.get(shardID)?.resolve(true); + ws.loadingShards.delete(shardID); + } + + // Update the sequence number if it is present + if (messageData.s) { + const shard = ws.shards.get(shardID); + if (shard) { + shard.previousSequenceNumber = messageData.s; + } + } + + ws.handleDiscordPayload(messageData, shardID); + break; + } +} + /** Handler for processing all dispatch payloads that should be sent/forwarded to another server/vps/process. */ export async function handleDiscordPayload( data: DiscordPayload,