diff --git a/src/ws/proxy/deps.ts b/src/ws/proxy/deps.ts new file mode 100644 index 000000000..fef4dd6c7 --- /dev/null +++ b/src/ws/proxy/deps.ts @@ -0,0 +1 @@ +export { decompress_with as decompressWith } from "https://unpkg.com/@evan/wasm@0.0.40/target/zlib/deno.js"; diff --git a/src/ws/proxy/manager.ts b/src/ws/proxy/manager.ts new file mode 100644 index 000000000..e66fe5795 --- /dev/null +++ b/src/ws/proxy/manager.ts @@ -0,0 +1,80 @@ +import { getGatewayBot } from "../../api/handlers/gateway.ts"; +import { Intents } from "../../types/options.ts"; +import { ws } from "./ws.ts"; + +/** ADVANCED DEVS ONLY!!!!!! + * Starts the standalone gateway. + * This will require starting the bot separately. + */ +export async function startGateway(options: StartGatewayOptions) { + ws.identifyPayload.token = `Bot ${options.token}`; + ws.firstShardID = options.firstShardID; + ws.url = options.url; + + if (options.compress) { + ws.identifyPayload.compress = options.compress; + } + + ws.identifyPayload.intents = options.intents.reduce( + (bits, next) => (bits |= typeof next === "string" ? Intents[next] : next), + 0, + ); + + const data = await getGatewayBot(); + ws.maxShards = options.maxShards || data.shards; + ws.lastShardID = options.lastShardID || data.shards - 1; + + // TODO: ALL THE FOLLOWING CAN BE REPLACED BY THIS 1 LINE + // ws.botGatewayData = snakeToCamel(await getGatewayBot()) + ws.botGatewayData.sessionStartLimit.total = data.session_start_limit.total; + ws.botGatewayData.sessionStartLimit.resetAfter = + data.session_start_limit.reset_after; + ws.botGatewayData.sessionStartLimit.remaining = + data.session_start_limit.remaining; + ws.botGatewayData.sessionStartLimit.maxConcurrency = + data.session_start_limit.max_concurrency; + ws.botGatewayData.shards = data.shards; + ws.botGatewayData.url = data.url; + + ws.spawnShards(ws.firstShardID); +} + +export function spawnShards(shardID: number) { + let skipChecks = 0; + + while (shardID <= ws.lastShardID) { + if (skipChecks) { + // Start The shard + ws.identify(shardID, ws.maxShards); + + shardID++; + skipChecks--; + continue; + } + + // Previous shards is still not fully ready. + if (!ws.createNextShard) continue; + + // Allows next iteration to create shard + ws.createNextShard = false; + // Set the amount of shards to start up be the bots max concurrency limit + skipChecks = ws.botGatewayData.sessionStartLimit.maxConcurrency; + } +} + +export interface StartGatewayOptions { + /** The bot token. */ + token: string; + /** Whether or not to use compression for gateway payloads. */ + compress?: boolean; + /** The intents you would like to enable. */ + intents: (Intents | keyof typeof Intents)[]; + /** The max amount of shards used for identifying. This can be useful for zero-downtime updates or resharding. */ + maxShards?: number; + /** The first shard ID for this group of shards. */ + firstShardID: number; + /** The last shard ID for this group. If none is provided, it will default to loading all shards. */ + lastShardID?: number; + /** The url to forward all payloads to. */ + url: string; +} diff --git a/src/ws/proxy/mod.ts b/src/ws/proxy/mod.ts new file mode 100644 index 000000000..e69de29bb diff --git a/src/ws/proxy/shard.ts b/src/ws/proxy/shard.ts new file mode 100644 index 000000000..64e5b1e0e --- /dev/null +++ b/src/ws/proxy/shard.ts @@ -0,0 +1,293 @@ +import { + DiscordHeartbeatPayload, + DiscordPayload, + GatewayOpcode, + ReadyPayload, +} from "../../types/discord.ts"; +import { decompressWith } from "./deps.ts"; +import { ws } from "./ws.ts"; + +export function resume(shardID: number) { + // TODO: Log that this is happening + + // CREATE A SHARD + const socket = ws.createShard(shardID); + + // NOW WE HANDLE RESUMING THIS SHARD + // Get the old data for this shard necessary for resuming + const oldShard = ws.shards.get(shardID); + // TODO: HOW TO CLOSE OLD SHARD SOCKET!!! + // TODO: STOP OLD HEARTBEAT + const sessionID = oldShard?.sessionID || ""; + const previousSequenceNumber = oldShard?.previousSequenceNumber || 0; + + ws.shards.set(shardID, { + id: shardID, + ws: socket, + resumeInterval: 0, + sessionID, + previousSequenceNumber, + resuming: false, + heartbeat: { + lastSentAt: 0, + lastReceivedAt: 0, + acknowledged: false, + keepAlive: false, + interval: 0, + intervalID: 0, + }, + }); + + // Resume on open + socket.onopen = () => { + socket.send(JSON.stringify({ + op: GatewayOpcode.Resume, + d: { + token: ws.identifyPayload.token, + session_id: sessionID, + seq: previousSequenceNumber, + }, + })); + }; +} + +export function identify(shardID: number, maxShards: number) { + // TODO: Log that this is happening + + // CREATE A SHARD + const socket = ws.createShard(shardID); + + // Identify can just set/reset the settings for the shard + ws.shards.set(shardID, { + id: shardID, + ws: socket, + resumeInterval: 0, + sessionID: "", + previousSequenceNumber: 0, + resuming: false, + heartbeat: { + lastSentAt: 0, + lastReceivedAt: 0, + acknowledged: false, + keepAlive: false, + interval: 0, + intervalID: 0, + }, + }); + + socket.send( + JSON.stringify( + { + op: GatewayOpcode.Identify, + d: { ...ws.identifyPayload, shard: [shardID, maxShards] }, + }, + ), + ); +} + +export function heartbeat(shardID: number, interval: number) { + // TODO: Log that this is happening + + const shard = ws.shards.get(shardID); + if (!shard) return; + + shard.heartbeat.keepAlive = true; + shard.heartbeat.acknowledged = false; + shard.heartbeat.lastSentAt = Date.now(); + shard.heartbeat.interval = interval; + + shard.heartbeat.intervalID = setInterval(() => { + // TODO: Log that this is happening + + const currentShard = ws.shards.get(shardID); + if (!currentShard) return; + + if ( + currentShard.ws.readyState === WebSocket.CLOSED || + !currentShard.heartbeat.keepAlive + ) { + // TODO: Log that this is happening + + // STOP THE HEARTBEAT + return clearInterval(currentShard.heartbeat.intervalID); + } + + currentShard.ws.send( + JSON.stringify( + { + op: GatewayOpcode.Heartbeat, + d: currentShard.previousSequenceNumber, + }, + ), + ); + }, interval); +} + +export function createShard(shardID: number) { + const socket = new WebSocket(ws.botGatewayData.url); + socket.binaryType = "arraybuffer"; + + socket.onerror = (errorEvent) => { + // TODO: Log that this is happening + + // eventHandlers.debug?.({ + // type: "wsError", + // data: { shardID, ...errorEvent }, + // }); + }; + + socket.onmessage = ({ data: message }) => { + if (message instanceof ArrayBuffer) { + message = new Uint8Array(message); + } + + if (message instanceof Uint8Array) { + message = decompressWith( + message, + 0, + (slice: Uint8Array) => ws.utf8decoder.decode(slice), + ); + } + + if (typeof message !== "string") return; + + const messageData = JSON.parse(message); + // TODO: Log that this is happening + // if (!messageData.t) eventHandlers.rawGateway?.(messageData); + switch (messageData.op) { + case GatewayOpcode.Hello: + ws.heartbeat( + shardID, + (messageData.d as DiscordHeartbeatPayload).heartbeat_interval, + ); + break; + case GatewayOpcode.HeartbeatACK: + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.heartbeat.acknowledged = true; + } + break; + case GatewayOpcode.Reconnect: + // TODO: Log that this is happening + // eventHandlers.debug?.( + // { type: "gatewayReconnect", data: { shardID } }, + // ); + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.resuming = true; + } + + resume(shardID); + break; + case GatewayOpcode.InvalidSession: + // TODO: Log that this is happening + // eventHandlers.debug?.( + // { + // type: "gatewayInvalidSession", + // data: { shardID, data }, + // }, + // ); + // When d is false we need to reidentify + if (!messageData.d) { + identify(shardID, ws.maxShards); + break; + } + + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.resuming = true; + } + + resume(shardID); + break; + default: + if (messageData.t === "RESUMED") { + // TODO: Log that this is happening + // eventHandlers.debug?.( + // { type: "gatewayResumed", data: { shardID } }, + // ); + + if (ws.shards.has(shardID)) { + ws.shards.get(shardID)!.resuming = false; + } + break; + } + + // Important for RESUME + if (messageData.t === "READY") { + const shard = ws.shards.get(shardID); + if (shard) { + shard.sessionID = (messageData.d as ReadyPayload).session_id; + } + } + + // Update the sequence number if it is present + if (messageData.s) { + const shard = ws.shards.get(shardID); + if (shard) { + shard.previousSequenceNumber = messageData.s; + } + } + + ws.handleDiscordPayload(messageData, shardID); + break; + } + }; + + socket.onclose = ({ reason, code, wasClean }) => { + // TODO: Log that this is happening + // eventHandlers.debug?.( + // { + // type: "wsClose", + // data: { shardID, code, reason, wasClean }, + // }, + // ); + + // TODO: ENUM FOR THESE CODES? + switch (code) { + case 4001: + case 4002: + case 4004: + case 4005: + case 4010: + case 4011: + case 4012: + case 4013: + case 4014: + throw new Error( + reason || "Discord gave no reason! GG! You broke Discord!", + ); + // THESE ERRORS CAN NO BE RESUMED! THEY MUST RE-IDENTIFY! + case 4003: + case 4007: + case 4008: + case 4009: + // TODO: Log that this is happening + // eventHandlers.debug?.({ + // type: "wsReconnect", + // data: { shardID, code, reason, wasClean }, + // }); + identify(shardID, ws.maxShards); + break; + default: + resume(shardID); + break; + } + }; + + return socket; +} + +export async function handleDiscordPayload( + data: DiscordPayload, + shardID: number, +) { + // TODO: Log that this is happening + // eventHandlers.raw?.(data); + // await eventHandlers.dispatchRequirements?.(data, shardID); + + await fetch(ws.url, { + method: "post", + body: JSON.stringify({ + shardID, + data, + }), + }).catch(console.error); +} diff --git a/src/ws/proxy/ws.ts b/src/ws/proxy/ws.ts new file mode 100644 index 000000000..9113cd28d --- /dev/null +++ b/src/ws/proxy/ws.ts @@ -0,0 +1,95 @@ +import { Collection } from "../../util/collection.ts"; +import { spawnShards, startGateway } from "./manager.ts"; +import { + createShard, + handleDiscordPayload, + heartbeat, + identify, +} from "./shard.ts"; + +// CONTROLLER LIKE INTERFACE FOR WS HANDLING +export const ws = { + /** The url that all discord payloads for the dispatch type should be sent to. */ + url: "", + /** The maximum shard ID number. Useful for zero-downtime updates or resharding. */ + maxShards: 1, + /** The first shard ID to start spawning. */ + firstShardID: 0, + /** The last shard ID for this cluster. */ + lastShardID: 1, + /** This prop decides whether Discord allows our next shard to be started. When 1 starts, this is set to false until it is ready for the next one. */ + createNextShard: true, + /** The identify payload holds the necessary data to connect and stay connected with Discords WSS. */ + identifyPayload: { + token: "", + compress: false, + properties: { + $os: "linux", + $browser: "Discordeno", + $device: "Discordeno", + }, + intents: 0, + shard: [0, 0], + }, + botGatewayData: { + /** The WSS URL that can be used for connecting to the gateway. */ + url: "wss://gateway.discord.gg/?v=8&encoding=json", + /** The recommended number of shards to use when connecting. */ + shards: 1, + /** Info on the current start limit. */ + sessionStartLimit: { + /** The total number of session starts the current user is allowed. */ + total: 1000, + /** The remaining number of session starts the current user is allowed. */ + remaining: 1000, + /** Milliseconds left until limit is reset. */ + resetAfter: 0, + /** The number of identify requests allowed per 5 seconds. + * So, if you had a max concurrency of 16, and 16 shards for example, you could start them all up at the same time. + * Whereas if you had 32 shards, if you tried to start up shard 0 and 16 at the same time for example, it would not work. You can start shards 0-15 concurrently, then 16-31... + * */ + maxConcurrency: 1, + }, + }, + shards: new Collection(), + utf8decoder: new TextDecoder(), + + // METHODS + + /** The handler function that starts the gateway. */ + startGateway, + /** The handler for spawning ALL the shards. */ + spawnShards, + createShard, + identify, + heartbeat, + handleDiscordPayload, +}; + +export interface DiscordenoShard { + /** The shard id number */ + id: number; + /** The websocket for this shard */ + ws: WebSocket; + resumeInterval: number; + /** The session id important for resuming connections. */ + sessionID: string; + /** The previous sequence number, important for resuming connections. */ + previousSequenceNumber: number | null; + /** Whether the shard is currently resuming. */ + resuming: boolean; + heartbeat: { + /** The exact timestamp the last heartbeat was sent */ + lastSentAt: number; + /** The timestamp the last heartbeat ACK was received from discord. */ + lastReceivedAt: number; + /** Whether or not the heartbeat was acknowledged by discord in time. */ + acknowledged: boolean; + /** Whether or not to keep heartbeating. Useful for when needing to stop heartbeating. */ + keepAlive: boolean; + /** The interval between heartbeats requested by discord. */ + interval: number; + /** The id of the interval, useful for stopping the interval if ws closed. */ + intervalID: number; + }; +} diff --git a/src/ws/shard_manager.ts b/src/ws/shard_manager.ts index 4d92625fa..38fafb515 100644 --- a/src/ws/shard_manager.ts +++ b/src/ws/shard_manager.ts @@ -39,7 +39,7 @@ export async function spawnShards( shardID, data.shards > lastShardID ? data.shards : lastShardID, ]; - // Start The shard + // Startx The shard await createShard(data, payload, false, shardID); // Spawn next shard await spawnShards(