diff --git a/src/ws/cleanup_loading_shards.ts b/src/ws/cleanup_loading_shards.ts new file mode 100644 index 000000000..7b194e8b2 --- /dev/null +++ b/src/ws/cleanup_loading_shards.ts @@ -0,0 +1,24 @@ +import { delay } from "../util/utils.ts"; +import { ws } from "./ws.ts"; + +/** The handler to clean up shards that identified but never received a READY. */ +export async function cleanupLoadingShards() { + while (ws.loadingShards.size) { + const now = Date.now(); + ws.loadingShards.forEach((loadingShard) => { + console.log( + now > loadingShard.startedAt + 60000, + now, + loadingShard.startedAt + ); + // Not a minute yet. Max should be few seconds but do a minute to be safe. + if (now < loadingShard.startedAt + 60000) return; + + loadingShard.reject( + `[Identify Failure] Shard ${loadingShard.shardID} has not received READY event in over a minute.` + ); + }); + + await delay(1000); + } +} diff --git a/src/ws/create_shard.ts b/src/ws/create_shard.ts new file mode 100644 index 000000000..268a6da58 --- /dev/null +++ b/src/ws/create_shard.ts @@ -0,0 +1,49 @@ +import { identify } from "./identify.ts"; +import { handleOnMessage } from "./proxy/shard.ts"; +import { resume } from "./resume.ts"; +import { ws } from "./ws.ts"; + +// deno-lint-ignore require-await +export async function createShard(shardID: number) { + const socket = new WebSocket(ws.botGatewayData.url); + socket.binaryType = "arraybuffer"; + + socket.onerror = (errorEvent) => { + ws.log("ERROR", { shardID, error: errorEvent }); + }; + + socket.onmessage = ({ data: message }) => handleOnMessage(message, shardID); + + socket.onclose = (event) => { + ws.log("CLOSED", { shardID, payload: event }); + + // TODO: ENUM FOR THESE CODES? + switch (event.code) { + case 4001: + case 4002: + case 4004: + case 4005: + case 4010: + case 4011: + case 4012: + case 4013: + case 4014: + throw new Error( + event.reason || "Discord gave no reason! GG! You broke Discord!" + ); + // THESE ERRORS CAN NO BE RESUMED! THEY MUST RE-IDENTIFY! + case 4003: + case 4007: + case 4008: + case 4009: + ws.log("CLOSED_RECONNECT", { shardID, payload: event }); + identify(shardID, ws.maxShards); + break; + default: + resume(shardID); + break; + } + }; + + return socket; +} diff --git a/src/ws/proxy/events.ts b/src/ws/events.ts similarity index 96% rename from src/ws/proxy/events.ts rename to src/ws/events.ts index 1411ceee6..6932cecf5 100644 --- a/src/ws/proxy/events.ts +++ b/src/ws/events.ts @@ -1,4 +1,3 @@ -import { DiscordPayload } from "../../types/discord.ts"; import { DiscordenoShard } from "./ws.ts"; /** The handler for logging different actions happening inside the ws. User can override and put custom handling per event. */ diff --git a/src/ws/handle_discord_payload.ts b/src/ws/handle_discord_payload.ts new file mode 100644 index 000000000..80b7562ef --- /dev/null +++ b/src/ws/handle_discord_payload.ts @@ -0,0 +1,18 @@ +import { ws } from "./ws.ts"; + +/** Handler for processing all dispatch payloads that should be sent/forwarded to another server/vps/process. */ +export async function handleDiscordPayload( + data: DiscordPayload, + shardID: number +) { + await fetch(ws.url, { + headers: { + authorization: ws.secretKey, + }, + method: "post", + body: JSON.stringify({ + shardID, + data, + }), + }).catch(console.error); +} diff --git a/src/ws/handle_on_message.ts b/src/ws/handle_on_message.ts new file mode 100644 index 000000000..aacdd4b57 --- /dev/null +++ b/src/ws/handle_on_message.ts @@ -0,0 +1,94 @@ +import { identify } from "./identify.ts"; +import { resume } from "./resume.ts"; +import { ws } from "./ws.ts"; +import { decompressWith } from "./deps.ts"; +import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; +import { DiscordReady } from "../types/gateway/ready.ts"; + +/** Handler for handling every message event from websocket. */ +// deno-lint-ignore no-explicit-any +export function handleOnMessage(message: any, shardID: number) { + if (message instanceof ArrayBuffer) { + message = new Uint8Array(message); + } + + if (message instanceof Uint8Array) { + message = decompressWith(message, 0, (slice: Uint8Array) => + ws.utf8decoder.decode(slice) + ); + } + + if (typeof message !== "string") return; + + const messageData = JSON.parse(message); + ws.log("RAW", messageData); + + switch (messageData.op) { + case DiscordGatewayOpcodes.Hello: + ws.heartbeat( + shardID, + (messageData.d as DiscordHeartbeat).heartbeat_interval + ); + break; + case DiscordGatewayOpcodes.HeartbeatACK: + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.heartbeat.acknowledged = true; + } + break; + case DiscordGatewayOpcodes.Reconnect: + ws.log("RECONNECT", { shardID }); + + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.resuming = true; + } + + resume(shardID); + break; + case DiscordGatewayOpcodes.InvalidSession: + ws.log("INVALID_SESSION", { shardID, payload: messageData }); + + // When d is false we need to reidentify + if (!messageData.d) { + identify(shardID, ws.maxShards); + break; + } + + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.resuming = true; + } + + resume(shardID); + break; + default: + if (messageData.t === "RESUMED") { + ws.log("RESUMED", { shardID }); + + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.resuming = false; + } + break; + } + + // Important for RESUME + if (messageData.t === "READY") { + const shard = ws.shards.get(shardID); + if (shard) { + shard.sessionID = (messageData.d as DiscordReady).session_id; + } + + ws.loadingShards.get(shardID)?.resolve(true); + ws.loadingShards.delete(shardID); + } + + // Update the sequence number if it is present + if (messageData.s) { + const shard = ws.shards.get(shardID); + if (shard) { + shard.previousSequenceNumber = messageData.s; + } + } + + ws.handleDiscordPayload(messageData, shardID); + break; + } +} diff --git a/src/ws/heartbeat.ts b/src/ws/heartbeat.ts new file mode 100644 index 000000000..b9a399678 --- /dev/null +++ b/src/ws/heartbeat.ts @@ -0,0 +1,40 @@ +import { ws } from "./ws.ts"; +import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; + +export function heartbeat(shardID: number, interval: number) { + ws.log("HEARTBEATING_STARTED", { shardID, interval }); + + const shard = ws.shards.get(shardID); + if (!shard) return; + + ws.log("HEARTBEATING_DETAILS", { shardID, interval, shard }); + + shard.heartbeat.keepAlive = true; + shard.heartbeat.acknowledged = false; + shard.heartbeat.lastSentAt = Date.now(); + shard.heartbeat.interval = interval; + + shard.heartbeat.intervalID = setInterval(() => { + const currentShard = ws.shards.get(shardID); + if (!currentShard) return; + + ws.log("HEARTBEATING", { shardID, shard: currentShard }); + + if ( + currentShard.ws.readyState === WebSocket.CLOSED || + !currentShard.heartbeat.keepAlive + ) { + ws.log("HEARTBEATING_CLOSED", { shardID, shard: currentShard }); + + // STOP THE HEARTBEAT + return clearInterval(currentShard.heartbeat.intervalID); + } + + currentShard.ws.send( + JSON.stringify({ + op: DiscordGatewayOpcodes.Heartbeat, + d: currentShard.previousSequenceNumber, + }) + ); + }, interval); +} diff --git a/src/ws/identify.ts b/src/ws/identify.ts new file mode 100644 index 000000000..6b438debd --- /dev/null +++ b/src/ws/identify.ts @@ -0,0 +1,47 @@ +import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; +import { ws } from "./ws.ts"; + +export async function identify(shardID: number, maxShards: number) { + ws.log("IDENTIFYING", { shardID, maxShards }); + + // CREATE A SHARD + const socket = await ws.createShard(shardID); + + // Identify can just set/reset the settings for the shard + ws.shards.set(shardID, { + id: shardID, + ws: socket, + resumeInterval: 0, + sessionID: "", + previousSequenceNumber: 0, + resuming: false, + heartbeat: { + lastSentAt: 0, + lastReceivedAt: 0, + acknowledged: false, + keepAlive: false, + interval: 0, + intervalID: 0, + }, + }); + + socket.onopen = () => { + socket.send( + JSON.stringify({ + op: DiscordGatewayOpcodes.Identify, + d: { ...ws.identifyPayload, shard: [shardID, maxShards] }, + }) + ); + }; + + return new Promise((resolve, reject) => { + ws.loadingShards.set(shardID, { + shardID, + resolve, + reject, + startedAt: Date.now(), + }); + + ws.cleanupLoadingShards(); + }); +} diff --git a/src/ws/mod.ts b/src/ws/mod.ts index 907ca87b3..dba9b02e8 100644 --- a/src/ws/mod.ts +++ b/src/ws/mod.ts @@ -1,2 +1,14 @@ -export * from "./shard.ts"; -export * from "./shard_manager.ts"; +export * from "./cleanup_loading_shards.ts"; +export * from "./create_shard.ts"; +export * from "./events.ts"; +export * from "./handle_discord_payload.ts"; +export * from "./handle_on_message.ts"; +export * from "./heartbeat.ts"; +export * from "./identify.ts"; +export * from "./resharder.ts"; +export * from "./resume.ts"; +export * from "./spawn_shards.ts"; +export * from "./start_gateway_options.ts"; +export * from "./start_gateway.ts"; +export * from "./tell_cluster_to_identify.ts"; +export * from "./ws.ts"; diff --git a/src/ws/proxy/deps.ts b/src/ws/proxy/deps.ts deleted file mode 100644 index fef4dd6c7..000000000 --- a/src/ws/proxy/deps.ts +++ /dev/null @@ -1 +0,0 @@ -export { decompress_with as decompressWith } from "https://unpkg.com/@evan/wasm@0.0.40/target/zlib/deno.js"; diff --git a/src/ws/proxy/manager.ts b/src/ws/proxy/manager.ts deleted file mode 100644 index 59f72a88a..000000000 --- a/src/ws/proxy/manager.ts +++ /dev/null @@ -1,167 +0,0 @@ -import { DiscordBotGatewayData } from "../../types/discord.ts"; -import { Intents } from "../../types/options.ts"; -import { Collection } from "../../util/collection.ts"; -import { ws } from "./ws.ts"; -import { delay } from "../../util/utils.ts"; - -/** ADVANCED DEVS ONLY!!!!!! - * Starts the standalone gateway. - * This will require starting the bot separately. - */ -export async function startGateway(options: StartGatewayOptions) { - ws.identifyPayload.token = `Bot ${options.token}`; - ws.secretKey = options.secretKey; - ws.firstShardID = options.firstShardID; - ws.url = options.url; - if (options.shardsPerCluster) ws.shardsPerCluster = options.shardsPerCluster; - if (options.maxClusters) ws.maxClusters = options.maxClusters; - - if (options.compress) { - ws.identifyPayload.compress = options.compress; - } - if (options.reshard) ws.reshard = options.reshard; - // Once an hour check if resharding is necessary - setInterval(ws.resharder, 1000 * 60 * 60); - - ws.identifyPayload.intents = options.intents.reduce( - (bits, next) => (bits |= typeof next === "string" ? Intents[next] : next), - 0, - ); - - const data = await fetch( - `https://discord.com/api/gateway/bot`, - { headers: { Authorization: ws.identifyPayload.token } }, - ).then((res) => res.json()) as DiscordBotGatewayData; - - ws.maxShards = options.maxShards || data.shards; - ws.lastShardID = options.lastShardID || data.shards - 1; - - // TODO: ALL THE FOLLOWING CAN BE REPLACED BY THIS 1 LINE - // ws.botGatewayData = snakeToCamel(await getGatewayBot()) - ws.botGatewayData.sessionStartLimit.total = data.session_start_limit.total; - ws.botGatewayData.sessionStartLimit.resetAfter = - data.session_start_limit.reset_after; - ws.botGatewayData.sessionStartLimit.remaining = - data.session_start_limit.remaining; - ws.botGatewayData.sessionStartLimit.maxConcurrency = - data.session_start_limit.max_concurrency; - ws.botGatewayData.shards = data.shards; - ws.botGatewayData.url = data.url; - - ws.spawnShards(ws.firstShardID); - ws.cleanupLoadingShards(); -} - -/** Begin spawning shards. */ -export function spawnShards(firstShardID = 0) { - /** Stored as bucketID: [clusterID, [ShardIDs]] */ - const buckets = new Collection(); - const maxShards = ws.maxShards || ws.botGatewayData.shards; - let cluster = 0; - - for ( - let index = firstShardID; - index < ws.botGatewayData.sessionStartLimit.maxConcurrency; - index++ - ) { - // ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS - for (let i = 0; i < maxShards; i++) { - const bucketID = i % ws.botGatewayData.sessionStartLimit.maxConcurrency; - const bucket = buckets.get(bucketID); - - if (!bucket) { - // Create the bucket since it doesnt exist - buckets.set(bucketID, [[cluster, i]]); - - if (cluster + 1 <= ws.maxClusters) cluster++; - } else { - // FIND A QUEUE IN THIS BUCKET THAT HAS SPACE - const queue = bucket.find((q) => q.length < ws.shardsPerCluster + 1); - if (queue) { - // IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE - queue.push(i); - } else { - if (cluster + 1 <= ws.maxClusters) cluster++; - // ADD A NEW QUEUE FOR THIS SHARD - bucket.push([cluster, i]); - } - } - } - } - - // SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP - buckets.forEach(async (bucket, bucketID) => { - for (const [clusterID, ...queue] of bucket) { - let shardID = queue.shift(); - - while (shardID !== undefined) { - await ws.tellClusterToIdentify(clusterID as number, shardID, bucketID); - shardID = queue.shift(); - } - } - }); -} - -/** Allows users to hook in and change to communicate to different clusters across different servers or anything they like. For example using redis pubsub to talk to other servers. */ -export async function tellClusterToIdentify( - workerID: number, - shardID: number, - bucketID: number, -) { - // When resharding this may exist already - const oldShard = ws.shards.get(shardID); - - // TODO: Use workers - await ws.identify(shardID, ws.maxShards); - - if (oldShard) { - oldShard.ws.close(4009, "Resharded!"); - } -} - -/** The handler to clean up shards that identified but never received a READY. */ -export async function cleanupLoadingShards() { - while (ws.loadingShards.size) { - const now = Date.now(); - ws.loadingShards.forEach((loadingShard) => { - console.log( - now > loadingShard.startedAt + 60000, - now, - loadingShard.startedAt, - ); - // Not a minute yet. Max should be few seconds but do a minute to be safe. - if (now < loadingShard.startedAt + 60000) return; - - loadingShard.reject( - `[Identify Failure] Shard ${loadingShard.shardID} has not received READY event in over a minute.`, - ); - }); - - await delay(1000); - } -} - -export interface StartGatewayOptions { - /** The bot token. */ - token: string; - /** Whether or not to use compression for gateway payloads. */ - compress?: boolean; - /** The intents you would like to enable. */ - intents: (Intents | keyof typeof Intents)[]; - /** The max amount of shards used for identifying. This can be useful for zero-downtime updates or resharding. */ - maxShards?: number; - /** The first shard ID for this group of shards. */ - firstShardID: number; - /** The last shard ID for this group. If none is provided, it will default to loading all shards. */ - lastShardID?: number; - /** The url to forward all payloads to. */ - url: string; - /** The amount of shards per cluster. By default this is 25. Use this to spread the load from shards to different CPU cores. */ - shardsPerCluster?: number; - /** The maximum amount of clusters available. By default this is 4. Another way to think of cluster is how many CPU cores does your server/machine have. */ - maxClusters?: number; - /** Whether or not you want to allow automated sharding. By default this is true. */ - reshard?: boolean; - /** The authorization key that the bot http server will expect. */ - secretKey: string; -} diff --git a/src/ws/proxy/mod.ts b/src/ws/proxy/mod.ts deleted file mode 100644 index b345c0567..000000000 --- a/src/ws/proxy/mod.ts +++ /dev/null @@ -1,5 +0,0 @@ -export * from "./events.ts"; -export * from "./manager.ts"; -export * from "./resharder.ts"; -export * from "./shard.ts"; -export * from "./ws.ts"; diff --git a/src/ws/proxy/shard.ts b/src/ws/proxy/shard.ts deleted file mode 100644 index c46c3f887..000000000 --- a/src/ws/proxy/shard.ts +++ /dev/null @@ -1,297 +0,0 @@ -import { - DiscordHeartbeatPayload, - DiscordPayload, - GatewayOpcode, - ReadyPayload, -} from "../../types/discord.ts"; -import { decompressWith } from "./deps.ts"; -import { ws } from "./ws.ts"; - -export async function resume(shardID: number) { - ws.log("RESUMING", { shardID }); - - // CREATE A SHARD - const socket = await ws.createShard(shardID); - - // NOW WE HANDLE RESUMING THIS SHARD - // Get the old data for this shard necessary for resuming - const oldShard = ws.shards.get(shardID); - - if (oldShard) { - // HOW TO CLOSE OLD SHARD SOCKET!!! - oldShard.ws.close(4009, "Resuming the shard, closing old shard."); - // STOP OLD HEARTBEAT - clearInterval(oldShard.heartbeat.intervalID); - } - - const sessionID = oldShard?.sessionID || ""; - const previousSequenceNumber = oldShard?.previousSequenceNumber || 0; - - ws.shards.set(shardID, { - id: shardID, - ws: socket, - resumeInterval: 0, - sessionID, - previousSequenceNumber, - resuming: false, - heartbeat: { - lastSentAt: 0, - lastReceivedAt: 0, - acknowledged: false, - keepAlive: false, - interval: 0, - intervalID: 0, - }, - }); - - // Resume on open - socket.onopen = () => { - socket.send(JSON.stringify({ - op: GatewayOpcode.Resume, - d: { - token: ws.identifyPayload.token, - session_id: sessionID, - seq: previousSequenceNumber, - }, - })); - }; -} - -export async function identify(shardID: number, maxShards: number) { - ws.log("IDENTIFYING", { shardID, maxShards }); - - // CREATE A SHARD - const socket = await ws.createShard(shardID); - - // Identify can just set/reset the settings for the shard - ws.shards.set(shardID, { - id: shardID, - ws: socket, - resumeInterval: 0, - sessionID: "", - previousSequenceNumber: 0, - resuming: false, - heartbeat: { - lastSentAt: 0, - lastReceivedAt: 0, - acknowledged: false, - keepAlive: false, - interval: 0, - intervalID: 0, - }, - }); - - socket.onopen = () => { - socket.send( - JSON.stringify( - { - op: GatewayOpcode.Identify, - d: { ...ws.identifyPayload, shard: [shardID, maxShards] }, - }, - ), - ); - }; - - return new Promise((resolve, reject) => { - ws.loadingShards.set(shardID, { - shardID, - resolve, - reject, - startedAt: Date.now(), - }); - - ws.cleanupLoadingShards(); - }); -} - -export function heartbeat(shardID: number, interval: number) { - ws.log("HEARTBEATING_STARTED", { shardID, interval }); - - const shard = ws.shards.get(shardID); - if (!shard) return; - - ws.log("HEARTBEATING_DETAILS", { shardID, interval, shard }); - - shard.heartbeat.keepAlive = true; - shard.heartbeat.acknowledged = false; - shard.heartbeat.lastSentAt = Date.now(); - shard.heartbeat.interval = interval; - - shard.heartbeat.intervalID = setInterval(() => { - const currentShard = ws.shards.get(shardID); - if (!currentShard) return; - - ws.log("HEARTBEATING", { shardID, shard: currentShard }); - - if ( - currentShard.ws.readyState === WebSocket.CLOSED || - !currentShard.heartbeat.keepAlive - ) { - ws.log("HEARTBEATING_CLOSED", { shardID, shard: currentShard }); - - // STOP THE HEARTBEAT - return clearInterval(currentShard.heartbeat.intervalID); - } - - currentShard.ws.send( - JSON.stringify( - { - op: GatewayOpcode.Heartbeat, - d: currentShard.previousSequenceNumber, - }, - ), - ); - }, interval); -} - -// deno-lint-ignore require-await -export async function createShard(shardID: number) { - const socket = new WebSocket(ws.botGatewayData.url); - socket.binaryType = "arraybuffer"; - - socket.onerror = (errorEvent) => { - ws.log("ERROR", { shardID, error: errorEvent }); - }; - - socket.onmessage = ({ data: message }) => handleOnMessage(message, shardID); - - socket.onclose = (event) => { - ws.log("CLOSED", { shardID, payload: event }); - - // TODO: ENUM FOR THESE CODES? - switch (event.code) { - case 4001: - case 4002: - case 4004: - case 4005: - case 4010: - case 4011: - case 4012: - case 4013: - case 4014: - throw new Error( - event.reason || "Discord gave no reason! GG! You broke Discord!", - ); - // THESE ERRORS CAN NO BE RESUMED! THEY MUST RE-IDENTIFY! - case 4003: - case 4007: - case 4008: - case 4009: - ws.log("CLOSED_RECONNECT", { shardID, payload: event }); - identify(shardID, ws.maxShards); - break; - default: - resume(shardID); - break; - } - }; - - return socket; -} - -/** Handler for handling every message event from websocket. */ -// deno-lint-ignore no-explicit-any -export function handleOnMessage(message: any, shardID: number) { - if (message instanceof ArrayBuffer) { - message = new Uint8Array(message); - } - - if (message instanceof Uint8Array) { - message = decompressWith( - message, - 0, - (slice: Uint8Array) => ws.utf8decoder.decode(slice), - ); - } - - if (typeof message !== "string") return; - - const messageData = JSON.parse(message); - ws.log("RAW", messageData); - - switch (messageData.op) { - case GatewayOpcode.Hello: - ws.heartbeat( - shardID, - (messageData.d as DiscordHeartbeatPayload).heartbeat_interval, - ); - break; - case GatewayOpcode.HeartbeatACK: - if (ws.shards.has(shardID)) { - ws.shards.get(shardID)!.heartbeat.acknowledged = true; - } - break; - case GatewayOpcode.Reconnect: - ws.log("RECONNECT", { shardID }); - - if (ws.shards.has(shardID)) { - ws.shards.get(shardID)!.resuming = true; - } - - resume(shardID); - break; - case GatewayOpcode.InvalidSession: - ws.log("INVALID_SESSION", { shardID, payload: messageData }); - - // When d is false we need to reidentify - if (!messageData.d) { - identify(shardID, ws.maxShards); - break; - } - - if (ws.shards.has(shardID)) { - ws.shards.get(shardID)!.resuming = true; - } - - resume(shardID); - break; - default: - if (messageData.t === "RESUMED") { - ws.log("RESUMED", { shardID }); - - if (ws.shards.has(shardID)) { - ws.shards.get(shardID)!.resuming = false; - } - break; - } - - // Important for RESUME - if (messageData.t === "READY") { - const shard = ws.shards.get(shardID); - if (shard) { - shard.sessionID = (messageData.d as ReadyPayload).session_id; - } - - ws.loadingShards.get(shardID)?.resolve(true); - ws.loadingShards.delete(shardID); - } - - // Update the sequence number if it is present - if (messageData.s) { - const shard = ws.shards.get(shardID); - if (shard) { - shard.previousSequenceNumber = messageData.s; - } - } - - ws.handleDiscordPayload(messageData, shardID); - break; - } -} - -/** Handler for processing all dispatch payloads that should be sent/forwarded to another server/vps/process. */ -export async function handleDiscordPayload( - data: DiscordPayload, - shardID: number, -) { - await fetch(ws.url, { - headers: { - authorization: ws.secretKey, - }, - method: "post", - body: JSON.stringify({ - shardID, - data, - }), - }).catch(console.error); -} diff --git a/src/ws/proxy/resharder.ts b/src/ws/resharder.ts similarity index 94% rename from src/ws/proxy/resharder.ts rename to src/ws/resharder.ts index b3a8ca4f2..20edb87c3 100644 --- a/src/ws/proxy/resharder.ts +++ b/src/ws/resharder.ts @@ -1,5 +1,5 @@ -import { getGatewayBot } from "../../api/handlers/gateway.ts"; import { ws } from "./ws.ts"; +import { getGatewayBot } from "../helpers/misc/get_gateway_bot.ts"; /** The handler to automatically reshard when necessary. */ export async function resharder() { diff --git a/src/ws/resume.ts b/src/ws/resume.ts new file mode 100644 index 000000000..6f0c204a1 --- /dev/null +++ b/src/ws/resume.ts @@ -0,0 +1,54 @@ +import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; +import { ws } from "./ws.ts"; + +export async function resume(shardID: number) { + ws.log("RESUMING", { shardID }); + + // CREATE A SHARD + const socket = await ws.createShard(shardID); + + // NOW WE HANDLE RESUMING THIS SHARD + // Get the old data for this shard necessary for resuming + const oldShard = ws.shards.get(shardID); + + if (oldShard) { + // HOW TO CLOSE OLD SHARD SOCKET!!! + oldShard.ws.close(4009, "Resuming the shard, closing old shard."); + // STOP OLD HEARTBEAT + clearInterval(oldShard.heartbeat.intervalID); + } + + const sessionID = oldShard?.sessionID || ""; + const previousSequenceNumber = oldShard?.previousSequenceNumber || 0; + + ws.shards.set(shardID, { + id: shardID, + ws: socket, + resumeInterval: 0, + sessionID, + previousSequenceNumber, + resuming: false, + heartbeat: { + lastSentAt: 0, + lastReceivedAt: 0, + acknowledged: false, + keepAlive: false, + interval: 0, + intervalID: 0, + }, + }); + + // Resume on open + socket.onopen = () => { + socket.send( + JSON.stringify({ + op: DiscordGatewayOpcodes.Resume, + d: { + token: ws.identifyPayload.token, + session_id: sessionID, + seq: previousSequenceNumber, + }, + }) + ); + }; +} diff --git a/src/ws/shard_manager.ts b/src/ws/shard_manager.ts deleted file mode 100644 index e02a93c21..000000000 --- a/src/ws/shard_manager.ts +++ /dev/null @@ -1,100 +0,0 @@ -import { eventHandlers } from "../bot.ts"; -import { cache } from "../cache.ts"; -import { handlers } from "../handlers/mod.ts"; -import { Member } from "../structures/mod.ts"; -import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; -import { Collection } from "../util/collection.ts"; -import { delay } from "../util/utils.ts"; -import { createShard, requestGuildMembers } from "./mod.ts"; - -let createNextShard = true; - -/** This function is meant to be used on the ready event to alert the library to start the next shard. */ -export function allowNextShard(enabled = true) { - createNextShard = enabled; -} - -export async function spawnShards( - data: DiscordBotGatewayData, - payload: DiscordIdentify, - shardId: number, - lastShardId: number, - skipChecks?: number, -) { - // All shards on this worker have started! Cancel out. - if (shardId >= lastShardId) return; - - if (skipChecks) { - payload.shard = [ - shardId, - data.shards > lastShardId ? data.shards : lastShardId, - ]; - // Start The shard - createShard(data, payload, false, shardId); - // Spawn next shard - await spawnShards( - data, - payload, - shardId + 1, - lastShardId, - skipChecks - 1, - ); - return; - } - - // Make sure we can create a shard or we are waiting for shards to connect still. - if (createNextShard) { - createNextShard = false; - // Start the next few shards based on max concurrency - await spawnShards( - data, - payload, - shardId, - lastShardId, - data.session_start_limit.max_concurrency, - ); - return; - } - - await delay(1000); - await spawnShards(data, payload, shardId, lastShardId, skipChecks); -} - -export async function handleDiscordPayload( - data: DiscordPayload, - shardId: number, -) { - eventHandlers.raw?.(data); - await eventHandlers.dispatchRequirements?.(data, shardId); - - switch (data.op) { - case DiscordGatewayOpcodes.HeartbeatACK: - // In case the user wants to listen to heartbeat responses - return eventHandlers.heartbeat?.(); - case DiscordGatewayOpcodes.Dispatch: - if (!data.t) return; - // Run the appropriate handler for this event. - return handlers[data.t]?.(data, shardId); - default: - return; - } -} - -export async function requestAllMembers( - guildId: string, - shardId: number, - resolve: ( - value: Collection | PromiseLike>, - ) => void, - options?: FetchMembersOptions, -) { - const nonce = `${guildId}-${Date.now()}`; - cache.fetchAllMembersProcessingRequests.set(nonce, resolve); - - await requestGuildMembers( - guildId, - shardId, - nonce, - options, - ); -} diff --git a/src/ws/spawn_shards.ts b/src/ws/spawn_shards.ts new file mode 100644 index 000000000..bbefd18af --- /dev/null +++ b/src/ws/spawn_shards.ts @@ -0,0 +1,52 @@ +import { Collection } from "../util/collection.ts"; +import { ws } from "./ws.ts"; + +/** Begin spawning shards. */ +export function spawnShards(firstShardID = 0) { + /** Stored as bucketID: [clusterID, [ShardIDs]] */ + const buckets = new Collection(); + const maxShards = ws.maxShards || ws.botGatewayData.shards; + let cluster = 0; + + for ( + let index = firstShardID; + index < ws.botGatewayData.sessionStartLimit.maxConcurrency; + index++ + ) { + // ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS + for (let i = 0; i < maxShards; i++) { + const bucketID = i % ws.botGatewayData.sessionStartLimit.maxConcurrency; + const bucket = buckets.get(bucketID); + + if (!bucket) { + // Create the bucket since it doesnt exist + buckets.set(bucketID, [[cluster, i]]); + + if (cluster + 1 <= ws.maxClusters) cluster++; + } else { + // FIND A QUEUE IN THIS BUCKET THAT HAS SPACE + const queue = bucket.find((q) => q.length < ws.shardsPerCluster + 1); + if (queue) { + // IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE + queue.push(i); + } else { + if (cluster + 1 <= ws.maxClusters) cluster++; + // ADD A NEW QUEUE FOR THIS SHARD + bucket.push([cluster, i]); + } + } + } + } + + // SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP + buckets.forEach(async (bucket, bucketID) => { + for (const [clusterID, ...queue] of bucket) { + let shardID = queue.shift(); + + while (shardID !== undefined) { + await ws.tellClusterToIdentify(clusterID as number, shardID, bucketID); + shardID = queue.shift(); + } + } + }); +} diff --git a/src/ws/start_gateway.ts b/src/ws/start_gateway.ts new file mode 100644 index 000000000..6efbb9e94 --- /dev/null +++ b/src/ws/start_gateway.ts @@ -0,0 +1,51 @@ +import { StartGatewayOptions } from "./start_gateway_options.ts"; +import { DiscordGatewayIntents } from "../types/gateway/gateway_intents.ts"; +import { ws } from "./ws.ts"; + +/** ADVANCED DEVS ONLY!!!!!! + * Starts the standalone gateway. + * This will require starting the bot separately. + */ +export async function startGateway(options: StartGatewayOptions) { + ws.identifyPayload.token = `Bot ${options.token}`; + ws.secretKey = options.secretKey; + ws.firstShardID = options.firstShardID; + ws.url = options.url; + if (options.shardsPerCluster) ws.shardsPerCluster = options.shardsPerCluster; + if (options.maxClusters) ws.maxClusters = options.maxClusters; + + if (options.compress) { + ws.identifyPayload.compress = options.compress; + } + if (options.reshard) ws.reshard = options.reshard; + // Once an hour check if resharding is necessary + setInterval(ws.resharder, 1000 * 60 * 60); + + ws.identifyPayload.intents = options.intents.reduce( + (bits, next) => + (bits |= typeof next === "string" ? DiscordGatewayIntents[next] : next), + 0 + ); + + const data = (await fetch(`https://discord.com/api/gateway/bot`, { + headers: { Authorization: ws.identifyPayload.token }, + }).then((res) => res.json())) as DiscordBotGatewayData; + + ws.maxShards = options.maxShards || data.shards; + ws.lastShardID = options.lastShardID || data.shards - 1; + + // TODO: ALL THE FOLLOWING CAN BE REPLACED BY THIS 1 LINE + // ws.botGatewayData = snakeToCamel(await getGatewayBot()) + ws.botGatewayData.sessionStartLimit.total = data.session_start_limit.total; + ws.botGatewayData.sessionStartLimit.resetAfter = + data.session_start_limit.reset_after; + ws.botGatewayData.sessionStartLimit.remaining = + data.session_start_limit.remaining; + ws.botGatewayData.sessionStartLimit.maxConcurrency = + data.session_start_limit.max_concurrency; + ws.botGatewayData.shards = data.shards; + ws.botGatewayData.url = data.url; + + ws.spawnShards(ws.firstShardID); + ws.cleanupLoadingShards(); +} diff --git a/src/ws/start_gateway_options.ts b/src/ws/start_gateway_options.ts new file mode 100644 index 000000000..2d016a8ff --- /dev/null +++ b/src/ws/start_gateway_options.ts @@ -0,0 +1,26 @@ +import { DiscordGatewayIntents } from "../types/gateway/gateway_intents.ts"; + +export interface StartGatewayOptions { + /** The bot token. */ + token: string; + /** Whether or not to use compression for gateway payloads. */ + compress?: boolean; + /** The intents you would like to enable. */ + intents: (DiscordGatewayIntents | keyof typeof DiscordGatewayIntents)[]; + /** The max amount of shards used for identifying. This can be useful for zero-downtime updates or resharding. */ + maxShards?: number; + /** The first shard ID for this group of shards. */ + firstShardID: number; + /** The last shard ID for this group. If none is provided, it will default to loading all shards. */ + lastShardID?: number; + /** The url to forward all payloads to. */ + url: string; + /** The amount of shards per cluster. By default this is 25. Use this to spread the load from shards to different CPU cores. */ + shardsPerCluster?: number; + /** The maximum amount of clusters available. By default this is 4. Another way to think of cluster is how many CPU cores does your server/machine have. */ + maxClusters?: number; + /** Whether or not you want to allow automated sharding. By default this is true. */ + reshard?: boolean; + /** The authorization key that the bot http server will expect. */ + secretKey: string; +} diff --git a/src/ws/tell_cluster_to_identify.ts b/src/ws/tell_cluster_to_identify.ts new file mode 100644 index 000000000..7ef75968a --- /dev/null +++ b/src/ws/tell_cluster_to_identify.ts @@ -0,0 +1,18 @@ +import { ws } from "./ws.ts"; + +/** Allows users to hook in and change to communicate to different clusters across different servers or anything they like. For example using redis pubsub to talk to other servers. */ +export async function tellClusterToIdentify( + workerID: number, + shardID: number, + bucketID: number +) { + // When resharding this may exist already + const oldShard = ws.shards.get(shardID); + + // TODO: Use workers + await ws.identify(shardID, ws.maxShards); + + if (oldShard) { + oldShard.ws.close(4009, "Resharded!"); + } +} diff --git a/src/ws/proxy/ws.ts b/src/ws/ws.ts similarity index 100% rename from src/ws/proxy/ws.ts rename to src/ws/ws.ts