From dcbf0fcaaa9b2c58b11d16a10121cbdfde515dec Mon Sep 17 00:00:00 2001 From: TriForMine Date: Fri, 8 Oct 2021 08:05:04 +0200 Subject: [PATCH] More conversion --- src/helpers/guilds/edit_guild.ts | 1 - src/helpers/members/fetch_members.ts | 10 ++-- src/helpers/misc/edit_bot_status.ts | 9 ++- src/helpers/voice/connect_to_voice_channel.ts | 5 +- src/util/calculate_shard_id.ts | 8 +-- src/ws/create_shard.ts | 20 +++---- src/ws/handle_discord_payload.ts | 8 +-- src/ws/handle_on_message.ts | 60 +++++++++---------- src/ws/heartbeat.ts | 32 +++++----- src/ws/identify.ts | 23 +++---- src/ws/process_queue.ts | 16 ++--- src/ws/resharder.ts | 26 ++++---- src/ws/resume.ts | 21 +++---- src/ws/send_shard_message.ts | 9 +-- src/ws/spawn_shards.ts | 32 +++++----- src/ws/start_gateway.ts | 42 ++++++------- src/ws/tell_cluster_to_identify.ts | 8 +-- 17 files changed, 166 insertions(+), 164 deletions(-) diff --git a/src/helpers/guilds/edit_guild.ts b/src/helpers/guilds/edit_guild.ts index 760e387ce..6350aea0c 100644 --- a/src/helpers/guilds/edit_guild.ts +++ b/src/helpers/guilds/edit_guild.ts @@ -6,7 +6,6 @@ import type { ModifyGuild } from "../../types/guilds/modify_guild.ts"; import { endpoints } from "../../util/constants.ts"; import { requireBotGuildPermissions } from "../../util/permissions.ts"; import { snakelize, urlToBase64 } from "../../util/utils.ts"; -import { ws } from "../../ws/ws.ts"; /** Modify a guilds settings. Requires the MANAGE_GUILD permission. */ export async function editGuild(guildId: bigint, options: ModifyGuild) { diff --git a/src/helpers/members/fetch_members.ts b/src/helpers/members/fetch_members.ts index cb05f6327..ab87cdcb3 100644 --- a/src/helpers/members/fetch_members.ts +++ b/src/helpers/members/fetch_members.ts @@ -5,7 +5,7 @@ import { Errors } from "../../types/discordeno/errors.ts"; import { DiscordGatewayIntents } from "../../types/gateway/gateway_intents.ts"; import type { RequestGuildMembers } from "../../types/members/request_guild_members.ts"; import { Collection } from "../../util/collection.ts"; -import { ws } from "../../ws/ws.ts"; +import {GatewayManager} from "../../bot.ts"; /** * ⚠️ BEGINNER DEVS!! YOU SHOULD ALMOST NEVER NEED THIS AND YOU CAN GET FROM cache.members.get() @@ -15,10 +15,10 @@ import { ws } from "../../ws/ws.ts"; * REST: 50/s global(across all shards) rate limit with ALL requests this included * GW(this function): 120/m(PER shard) rate limit. Meaning if you have 8 shards your limit is now 960/m. */ -export function fetchMembers(guildId: bigint, shardId: number, options?: Omit) { +export function fetchMembers(gateway: GatewayManager, guildId: bigint, shardId: number, options?: Omit) { // You can request 1 member without the intent - // Check if intents is not 0 as proxy ws won't set intents in other instances - if (ws.identifyPayload.intents && (!options?.limit || options.limit > 1) && !(ws.identifyPayload.intents & DiscordGatewayIntents.GuildMembers)) { + // Check if intents is not 0 as proxy gateway won't set intents in other instances + if (gateway.identifyPayload.intents && (!options?.limit || options.limit > 1) && !(gateway.identifyPayload.intents & DiscordGatewayIntents.GuildMembers)) { throw new Error(Errors.MISSING_INTENT_GUILD_MEMBERS); } @@ -30,7 +30,7 @@ export function fetchMembers(guildId: bigint, shardId: number, options?: Omit) { - ws.shards.forEach((shard) => { +export function editBotStatus(gateway: GatewayManager, data: Omit) { + gateway.shards.forEach((shard) => { eventHandlers.debug?.("loop", `Running forEach loop in editBotStatus function.`); - ws.sendShardMessage(shard, { + gateway.sendShardMessage(gateway, shard, { op: DiscordGatewayOpcodes.StatusUpdate, d: { since: null, diff --git a/src/helpers/voice/connect_to_voice_channel.ts b/src/helpers/voice/connect_to_voice_channel.ts index eb815c083..d56e4cbfe 100644 --- a/src/helpers/voice/connect_to_voice_channel.ts +++ b/src/helpers/voice/connect_to_voice_channel.ts @@ -3,18 +3,19 @@ import type { UpdateVoiceState } from "../../types/voice/update_voice_state.ts"; import { requireBotChannelPermissions } from "../../util/permissions.ts"; import { calculateShardId } from "../../util/calculate_shard_id.ts"; import { snakelize } from "../../util/utils.ts"; -import { ws } from "../../ws/ws.ts"; import type { AtLeastOne } from "../../types/util.ts"; +import {GatewayManager} from "../../bot.ts"; /** Connect or join a voice channel inside a guild. By default, the "selfDeaf" option is true. Requires `CONNECT` and `VIEW_CHANNEL` permissions. */ export async function connectToVoiceChannel( + gateway: GatewayManager, guildId: bigint, channelId: bigint, options?: AtLeastOne> ) { await requireBotChannelPermissions(channelId, ["CONNECT", "VIEW_CHANNEL"]); - ws.sendShardMessage(calculateShardId(guildId), { + gateway.sendShardMessage(gateway, calculateShardId(gateway, guildId), { op: DiscordGatewayOpcodes.VoiceStateUpdate, d: snakelize({ guildId, diff --git a/src/util/calculate_shard_id.ts b/src/util/calculate_shard_id.ts index 12012f09f..e9b7a331b 100644 --- a/src/util/calculate_shard_id.ts +++ b/src/util/calculate_shard_id.ts @@ -1,7 +1,7 @@ -import { ws } from "../ws/ws.ts"; +import {GatewayManager} from "../bot.ts"; -export function calculateShardId(guildId: bigint) { - if (ws.maxShards === 1) return 0; +export function calculateShardId(gateway: GatewayManager, guildId: bigint) { + if (gateway.maxShards === 1) return 0; - return Number((guildId >> 22n) % BigInt(ws.maxShards - 1)); + return Number((guildId >> 22n) % BigInt(gateway.maxShards - 1)); } diff --git a/src/ws/create_shard.ts b/src/ws/create_shard.ts index ee1969d3a..f2909dfe5 100644 --- a/src/ws/create_shard.ts +++ b/src/ws/create_shard.ts @@ -1,25 +1,25 @@ import { DiscordGatewayCloseEventCodes } from "../types/codes/gateway_close_event_codes.ts"; -import { ws } from "./ws.ts"; +import {GatewayManager} from "../bot.ts"; -export function createShard(shardId: number) { - const socket = new WebSocket(ws.botGatewayData.url); +export function createShard(gateway: GatewayManager, shardId: number) { + const socket = new WebSocket(gateway.botGatewayData.url); socket.binaryType = "arraybuffer"; socket.onerror = (errorEvent) => { - ws.log("ERROR", { shardId, error: errorEvent }); + gateway.log("ERROR", { shardId, error: errorEvent }); }; - socket.onmessage = ({ data: message }) => ws.handleOnMessage(message, shardId); + socket.onmessage = ({ data: message }) => gateway.handleOnMessage(gateway, message, shardId); socket.onclose = async (event) => { - ws.log("CLOSED", { shardId, payload: event }); + gateway.log("CLOSED", { shardId, payload: event }); if (event.code === 3064 || event.reason === "Discordeno Testing Finished! Do Not RESUME!") { return; } if (event.code === 3065 || ["Resharded!", "Resuming the shard, closing old shard."].includes(event.reason)) { - return ws.log("CLOSED_RECONNECT", { shardId, payload: event }); + return gateway.log("CLOSED_RECONNECT", { shardId, payload: event }); } switch (event.code) { @@ -31,7 +31,7 @@ export function createShard(shardId: number) { case 3065: // Reidentifying case 3066: // Missing ACK // Will restart shard manually - return ws.log("CLOSED_RECONNECT", { shardId, payload: event }); + return gateway.log("CLOSED_RECONNECT", { shardId, payload: event }); case DiscordGatewayCloseEventCodes.UnknownOpcode: case DiscordGatewayCloseEventCodes.DecodeError: case DiscordGatewayCloseEventCodes.AuthenticationFailed: @@ -47,10 +47,10 @@ export function createShard(shardId: number) { case DiscordGatewayCloseEventCodes.InvalidSeq: case DiscordGatewayCloseEventCodes.RateLimited: case DiscordGatewayCloseEventCodes.SessionTimedOut: - await ws.identify(shardId, ws.maxShards); + await gateway.identify(gateway, shardId, gateway.maxShards); break; default: - ws.resume(shardId); + gateway.resume(gateway, shardId); break; } }; diff --git a/src/ws/handle_discord_payload.ts b/src/ws/handle_discord_payload.ts index 3ba4fdfcc..6ca60e1e0 100644 --- a/src/ws/handle_discord_payload.ts +++ b/src/ws/handle_discord_payload.ts @@ -1,11 +1,11 @@ import type { DiscordGatewayPayload } from "../types/gateway/gateway_payload.ts"; -import { ws } from "./ws.ts"; +import {GatewayManager} from "../bot.ts"; /** Handler for processing all dispatch payloads that should be sent/forwarded to another server/vps/process. */ -export async function handleDiscordPayload(data: DiscordGatewayPayload, shardId: number) { - await fetch(ws.url, { +export async function handleDiscordPayload(gateway: GatewayManager, data: DiscordGatewayPayload, shardId: number) { + await fetch(gateway.url, { headers: { - authorization: ws.secretKey, + authorization: gateway.secretKey, }, method: "post", body: JSON.stringify({ diff --git a/src/ws/handle_on_message.ts b/src/ws/handle_on_message.ts index 9dcff9061..e09427d79 100644 --- a/src/ws/handle_on_message.ts +++ b/src/ws/handle_on_message.ts @@ -1,4 +1,4 @@ -import { eventHandlers } from "../bot.ts"; +import {eventHandlers, GatewayManager} from "../bot.ts"; import { handlers } from "../handlers/mod.ts"; import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; import type { DiscordGatewayPayload } from "../types/gateway/gateway_payload.ts"; @@ -6,33 +6,33 @@ import type { DiscordHello } from "../types/gateway/hello.ts"; import type { DiscordReady } from "../types/gateway/ready.ts"; import { camelize, delay } from "../util/utils.ts"; import { decompressWith } from "./deps.ts"; -import { ws } from "./ws.ts"; /** Handler for handling every message event from websocket. */ // deno-lint-ignore no-explicit-any -export async function handleOnMessage(message: any, shardId: number) { +export async function handleOnMessage(gateway: GatewayManager, message: any, shardId: number) { if (message instanceof ArrayBuffer) { message = new Uint8Array(message); } if (message instanceof Uint8Array) { - message = decompressWith(message, 0, (slice: Uint8Array) => ws.utf8decoder.decode(slice)); + message = decompressWith(message, 0, (slice: Uint8Array) => gateway.utf8decoder.decode(slice)); } if (typeof message !== "string") return; - const shard = ws.shards.get(shardId); + const shard = gateway.shards.get(shardId); const messageData = JSON.parse(message) as DiscordGatewayPayload; - ws.log("RAW", { shardId, payload: messageData }); + gateway.log("RAW", { shardId, payload: messageData }); switch (messageData.op) { case DiscordGatewayOpcodes.Heartbeat: - if (shard?.ws.readyState !== WebSocket.OPEN) return; + if (shard?.gateway.readyState !== WebSocket.OPEN) return; shard.heartbeat.lastSentAt = Date.now(); // Discord randomly sends this requiring an immediate heartbeat back - ws.sendShardMessage( + gateway.sendShardMessage( + gateway, shard, { op: DiscordGatewayOpcodes.Heartbeat, @@ -42,77 +42,77 @@ export async function handleOnMessage(message: any, shardId: number) { ); break; case DiscordGatewayOpcodes.Hello: - ws.heartbeat(shardId, (messageData.d as DiscordHello).heartbeat_interval); + gateway.heartbeat(gateway, shardId, (messageData.d as DiscordHello).heartbeat_interval); break; case DiscordGatewayOpcodes.HeartbeatACK: - if (ws.shards.has(shardId)) { - const shard = ws.shards.get(shardId)!; + if (gateway.shards.has(shardId)) { + const shard = gateway.shards.get(shardId)!; shard.heartbeat.acknowledged = true; shard.heartbeat.lastReceivedAt = Date.now(); } break; case DiscordGatewayOpcodes.Reconnect: - ws.log("RECONNECT", { shardId }); + gateway.log("RECONNECT", { shardId }); - if (ws.shards.has(shardId)) { - ws.shards.get(shardId)!.resuming = true; + if (gateway.shards.has(shardId)) { + gateway.shards.get(shardId)!.resuming = true; } - ws.resume(shardId); + gateway.resume(gateway, shardId); break; case DiscordGatewayOpcodes.InvalidSession: - ws.log("INVALID_SESSION", { shardId, payload: messageData }); + gateway.log("INVALID_SESSION", { shardId, payload: messageData }); // We need to wait for a random amount of time between 1 and 5: https://discord.com/developers/docs/topics/gateway#resuming await delay(Math.floor((Math.random() * 4 + 1) * 1000)); // When d is false we need to reidentify if (!messageData.d) { - await ws.identify(shardId, ws.maxShards); + await gateway.identify(gateway, shardId, gateway.maxShards); break; } - if (ws.shards.has(shardId)) { - ws.shards.get(shardId)!.resuming = true; + if (gateway.shards.has(shardId)) { + gateway.shards.get(shardId)!.resuming = true; } - ws.resume(shardId); + gateway.resume(gateway, shardId); break; default: if (messageData.t === "RESUMED") { - ws.log("RESUMED", { shardId }); + gateway.log("RESUMED", { shardId }); - if (ws.shards.has(shardId)) { - ws.shards.get(shardId)!.resuming = false; + if (gateway.shards.has(shardId)) { + gateway.shards.get(shardId)!.resuming = false; } break; } // Important for RESUME if (messageData.t === "READY") { - const shard = ws.shards.get(shardId); + const shard = gateway.shards.get(shardId); if (shard) { shard.sessionId = (messageData.d as DiscordReady).session_id; } - ws.loadingShards.get(shardId)?.resolve(true); - ws.loadingShards.delete(shardId); + gateway.loadingShards.get(shardId)?.resolve(true); + gateway.loadingShards.delete(shardId); // Wait few seconds to spawn next shard setTimeout(() => { - const bucket = ws.buckets.get(shardId % ws.botGatewayData.sessionStartLimit.maxConcurrency); + const bucket = gateway.buckets.get(shardId % gateway.botGatewayData.sessionStartLimit.maxConcurrency); if (bucket) bucket.createNextShard.shift()?.(); - }, ws.spawnShardDelay); + }, gateway.spawnShardDelay); } // Update the sequence number if it is present if (messageData.s) { - const shard = ws.shards.get(shardId); + const shard = gateway.shards.get(shardId); if (shard) { shard.previousSequenceNumber = messageData.s; } } - if (ws.url) await ws.handleDiscordPayload(messageData, shardId); + if (gateway.url) await gateway.handleDiscordPayload(gateway, messageData, shardId); else { eventHandlers.raw?.(messageData); await eventHandlers.dispatchRequirements?.(messageData, shardId); diff --git a/src/ws/heartbeat.ts b/src/ws/heartbeat.ts index d695136f2..ef25f1f63 100644 --- a/src/ws/heartbeat.ts +++ b/src/ws/heartbeat.ts @@ -1,21 +1,21 @@ import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; import { delay } from "../util/utils.ts"; -import { ws } from "./ws.ts"; +import {GatewayManager} from "../bot.ts"; -export async function heartbeat(shardId: number, interval: number) { - ws.log("HEARTBEATING_STARTED", { shardId, interval }); +export async function heartbeat(gateway: GatewayManager, shardId: number, interval: number) { + gateway.log("HEARTBEATING_STARTED", { shardId, interval }); - const shard = ws.shards.get(shardId); + const shard = gateway.shards.get(shardId); if (!shard) return; - ws.log("HEARTBEATING_DETAILS", { shardId, interval, shard }); + gateway.log("HEARTBEATING_DETAILS", { shardId, interval, shard }); // The first heartbeat is special so we send it without setInterval: https://discord.com/developers/docs/topics/gateway#heartbeating await delay(Math.floor(shard.heartbeat.interval * Math.random())); - if (shard.ws.readyState !== WebSocket.OPEN) return; + if (shard.gateway.readyState !== WebSocket.OPEN) return; - shard.ws.send( + shard.gateway.send( JSON.stringify({ op: DiscordGatewayOpcodes.Heartbeat, d: shard.previousSequenceNumber, @@ -28,29 +28,29 @@ export async function heartbeat(shardId: number, interval: number) { shard.heartbeat.interval = interval; shard.heartbeat.intervalId = setInterval(async () => { - ws.log("DEBUG", `Running setInterval in heartbeat file. Shard: ${shardId}`); - const currentShard = ws.shards.get(shardId); + gateway.log("DEBUG", `Running setInterval in heartbeat file. Shard: ${shardId}`); + const currentShard = gateway.shards.get(shardId); if (!currentShard) return; - ws.log("HEARTBEATING", { shardId, shard: currentShard }); + gateway.log("HEARTBEATING", { shardId, shard: currentShard }); - if (currentShard.ws.readyState === WebSocket.CLOSED || !currentShard.heartbeat.keepAlive) { - ws.log("HEARTBEATING_CLOSED", { shardId, shard: currentShard }); + if (currentShard.gateway.readyState === WebSocket.CLOSED || !currentShard.heartbeat.keepAlive) { + gateway.log("HEARTBEATING_CLOSED", { shardId, shard: currentShard }); // STOP THE HEARTBEAT return clearInterval(shard.heartbeat.intervalId); } if (!currentShard.heartbeat.acknowledged) { - ws.closeWS(currentShard.ws, 3066, "Did not receive an ACK in time."); - return await ws.identify(shardId, ws.maxShards); + gateway.closeWS(currentShard.gateway, 3066, "Did not receive an ACK in time."); + return await gateway.identify(gateway, shardId, gateway.maxShards); } - if (currentShard.ws.readyState !== WebSocket.OPEN) return; + if (currentShard.gateway.readyState !== WebSocket.OPEN) return; currentShard.heartbeat.acknowledged = false; - currentShard.ws.send( + currentShard.gateway.send( JSON.stringify({ op: DiscordGatewayOpcodes.Heartbeat, d: currentShard.previousSequenceNumber, diff --git a/src/ws/identify.ts b/src/ws/identify.ts index f036b930e..9441e7144 100644 --- a/src/ws/identify.ts +++ b/src/ws/identify.ts @@ -1,23 +1,23 @@ import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; -import { ws } from "./ws.ts"; +import {GatewayManager} from "../bot.ts"; -export function identify(shardId: number, maxShards: number) { - ws.log("IDENTIFYING", { shardId, maxShards }); +export function identify(gateway: GatewayManager, shardId: number, maxShards: number) { + gateway.log("IDENTIFYING", { shardId, maxShards }); // Need to clear the old heartbeat interval - const oldShard = ws.shards.get(shardId); + const oldShard = gateway.shards.get(shardId); if (oldShard) { - ws.closeWS(oldShard.ws, 3065, "Reidentifying closure of old shard"); + gateway.closeWS(oldShard.gateway, 3065, "Reidentifying closure of old shard"); clearInterval(oldShard.heartbeat.intervalId); } // CREATE A SHARD - const socket = ws.createShard(shardId); + const socket = gateway.createShard(gateway, shardId); // Identify can just set/reset the settings for the shard - ws.shards.set(shardId, { + gateway.shards.set(shardId, { id: shardId, - ws: socket, + gateway: socket, resumeInterval: 0, sessionId: "", previousSequenceNumber: 0, @@ -39,11 +39,12 @@ export function identify(shardId: number, maxShards: number) { }); socket.onopen = () => { - ws.sendShardMessage( + gateway.sendShardMessage( + gateway, shardId, { op: DiscordGatewayOpcodes.Identify, - d: { ...ws.identifyPayload, shard: [shardId, maxShards] }, + d: { ...gateway.identifyPayload, shard: [shardId, maxShards] }, }, true ); @@ -54,7 +55,7 @@ export function identify(shardId: number, maxShards: number) { reject(`[Identify Failure] Shard ${shardId} has not received READY event in over a minute.`); }, 600000); - ws.loadingShards.set(shardId, { + gateway.loadingShards.set(shardId, { shardId, resolve: (args) => { clearTimeout(timeout); diff --git a/src/ws/process_queue.ts b/src/ws/process_queue.ts index eb6814ff4..ab670d3f1 100644 --- a/src/ws/process_queue.ts +++ b/src/ws/process_queue.ts @@ -1,16 +1,16 @@ import { loopObject } from "../util/loop_object.ts"; import { delay } from "../util/utils.ts"; -import { ws } from "./ws.ts"; +import {GatewayManager} from "../bot.ts"; -export async function processQueue(id: number) { - const shard = ws.shards.get(id); +export async function processQueue(gateway: GatewayManager, id: number) { + const shard = gateway.shards.get(id); // If no items or its already processing then exit if (!shard?.queue.length || shard.processingQueue) return; shard.processingQueue = true; while (shard.queue.length) { - if (shard.ws.readyState !== WebSocket.OPEN) { + if (shard.gateway.readyState !== WebSocket.OPEN) { shard.processingQueue = false; return; } @@ -34,20 +34,20 @@ export async function processQueue(id: number) { : Array.isArray(value) ? value.map((v) => (typeof v === "bigint" ? v.toString() : v)) : value, - `Running forEach loop in ws.processQueue function for changing bigints to strings.` + `Running forEach loop in gateway.processQueue function for changing bigints to strings.` ); } - ws.log("RAW_SEND", shard.id, request); + gateway.log("RAW_SEND", shard.id, request); - shard.ws.send(JSON.stringify(request)); + shard.gateway.send(JSON.stringify(request)); // Counter is useful for preventing 120/m requests. shard.queueCounter++; // Handle if the requests have been maxed if (shard.queueCounter >= 118) { - ws.log("DEBUG", { + gateway.log("DEBUG", { message: "Max gateway requests per minute reached setting timeout for one minute", shardId: shard.id, }); diff --git a/src/ws/resharder.ts b/src/ws/resharder.ts index e930d9f0f..c647d2ede 100644 --- a/src/ws/resharder.ts +++ b/src/ws/resharder.ts @@ -1,30 +1,30 @@ import { getGatewayBot } from "../helpers/misc/get_gateway_bot.ts"; -import { ws } from "./ws.ts"; +import { GatewayManager } from "../bot.ts"; /** The handler to automatically reshard when necessary. */ -export async function resharder() { - ws.botGatewayData = await getGatewayBot(); +export async function resharder(gateway: GatewayManager) { + gateway.botGatewayData = await getGatewayBot(); - const percentage = ((ws.botGatewayData.shards - ws.maxShards) / ws.maxShards) * 100; + const percentage = ((gateway.botGatewayData.shards - gateway.maxShards) / gateway.maxShards) * 100; // Less than necessary% being used so do nothing - if (percentage < ws.reshardPercentage) return; + if (percentage < gateway.reshardPercentage) return; // Don't have enough identify rate limits to reshard - if (ws.botGatewayData.sessionStartLimit.remaining < ws.botGatewayData.shards) { + if (gateway.botGatewayData.sessionStartLimit.remaining < gateway.botGatewayData.shards) { return; } // Begin resharding - ws.maxShards = ws.botGatewayData.shards; + gateway.maxShards = gateway.botGatewayData.shards; // If more than 100K servers, begin switching to 16x sharding - if (ws.maxShards && ws.useOptimalLargeBotSharding) { - ws.maxShards = Math.ceil( - ws.maxShards / - (ws.botGatewayData.sessionStartLimit.maxConcurrency === 1 + if (gateway.maxShards && gateway.useOptimalLargeBotSharding) { + gateway.maxShards = Math.ceil( + gateway.maxShards / + (gateway.botGatewayData.sessionStartLimit.maxConcurrency === 1 ? 16 - : ws.botGatewayData.sessionStartLimit.maxConcurrency) + : gateway.botGatewayData.sessionStartLimit.maxConcurrency) ); } - ws.spawnShards(ws.firstShardId); + gateway.spawnShards(gateway, gateway.firstShardId); } diff --git a/src/ws/resume.ts b/src/ws/resume.ts index 038858247..e63e8da41 100644 --- a/src/ws/resume.ts +++ b/src/ws/resume.ts @@ -1,29 +1,29 @@ import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; -import { ws } from "./ws.ts"; +import {GatewayManager} from "../bot.ts"; -export function resume(shardId: number) { - ws.log("RESUMING", { shardId }); +export function resume(gateway: GatewayManager, shardId: number) { + gateway.log("RESUMING", { shardId }); // NOW WE HANDLE RESUMING THIS SHARD // Get the old data for this shard necessary for resuming - const oldShard = ws.shards.get(shardId); + const oldShard = gateway.shards.get(shardId); if (oldShard) { // HOW TO CLOSE OLD SHARD SOCKET!!! - ws.closeWS(oldShard.ws, 3064, "Resuming the shard, closing old shard."); + gateway.closeWS(oldShard.gateway, 3064, "Resuming the shard, closing old shard."); // STOP OLD HEARTBEAT clearInterval(oldShard.heartbeat.intervalId); } // CREATE A SHARD - const socket = ws.createShard(shardId); + const socket = gateway.createShard(gateway, shardId); const sessionId = oldShard?.sessionId || ""; const previousSequenceNumber = oldShard?.previousSequenceNumber || 0; - ws.shards.set(shardId, { + gateway.shards.set(shardId, { id: shardId, - ws: socket, + gateway: socket, resumeInterval: 0, sessionId: sessionId, previousSequenceNumber: previousSequenceNumber, @@ -46,12 +46,13 @@ export function resume(shardId: number) { // Resume on open socket.onopen = () => { - ws.sendShardMessage( + gateway.sendShardMessage( + gateway, shardId, { op: DiscordGatewayOpcodes.Resume, d: { - token: ws.identifyPayload.token, + token: gateway.identifyPayload.token, session_id: sessionId, seq: previousSequenceNumber, }, diff --git a/src/ws/send_shard_message.ts b/src/ws/send_shard_message.ts index 787792b94..6179e00d3 100644 --- a/src/ws/send_shard_message.ts +++ b/src/ws/send_shard_message.ts @@ -1,7 +1,8 @@ -import { DiscordenoShard, WebSocketRequest, ws } from "./ws.ts"; +import { DiscordenoShard, WebSocketRequest } from "./ws.ts"; +import { GatewayManager } from "../bot.ts"; -export function sendShardMessage(shard: number | DiscordenoShard, message: WebSocketRequest, highPriority = false) { - if (typeof shard === "number") shard = ws.shards.get(shard)!; +export function sendShardMessage(gateway: GatewayManager, shard: number | DiscordenoShard, message: WebSocketRequest, highPriority = false) { + if (typeof shard === "number") shard = gateway.shards.get(shard)!; if (!shard) return; if (!highPriority) { @@ -10,5 +11,5 @@ export function sendShardMessage(shard: number | DiscordenoShard, message: WebSo shard.queue.unshift(message); } - ws.processQueue(shard.id); + gateway.processQueue(gateway, shard.id); } diff --git a/src/ws/spawn_shards.ts b/src/ws/spawn_shards.ts index 4f3010894..364375744 100644 --- a/src/ws/spawn_shards.ts +++ b/src/ws/spawn_shards.ts @@ -1,35 +1,35 @@ /** Begin spawning shards. */ -import {Bot} from "../bot.ts"; +import { GatewayManager } from "../bot.ts"; -export function spawnShards(bot: Bot, firstShardId = 0) { +export function spawnShards(gateway: GatewayManager, firstShardId = 0) { /** Stored as bucketId: [clusterId, [ShardIds]] */ - const maxShards = bot.gateway.maxShards || bot.gateway.botGatewayData.shards; + const maxShards = gateway.maxShards || gateway.botGatewayData.shards; let cluster = 0; - for (let index = firstShardId; index < bot.gateway.botGatewayData.sessionStartLimit.maxConcurrency; index++) { - bot.gateway.log("DEBUG", `1. Running for loop in spawnShards function.`); + for (let index = firstShardId; index < gateway.botGatewayData.sessionStartLimit.maxConcurrency; index++) { + gateway.log("DEBUG", `1. Running for loop in spawnShards function.`); // ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS for (let i = 0; i < maxShards; i++) { - bot.gateway.log("DEBUG", `2. Running for loop in spawnShards function.`); - const bucketId = i % bot.gateway.botGatewayData.sessionStartLimit.maxConcurrency; - const bucket = bot.gateway.buckets.get(bucketId); + gateway.log("DEBUG", `2. Running for loop in spawnShards function.`); + const bucketId = i % gateway.botGatewayData.sessionStartLimit.maxConcurrency; + const bucket = gateway.buckets.get(bucketId); if (!bucket) { // Create the bucket since it doesnt exist - bot.gateway.buckets.set(bucketId, { + gateway.buckets.set(bucketId, { clusters: [[cluster, i]], createNextShard: [], }); - if (cluster + 1 <= bot.gateway.maxClusters) cluster++; + if (cluster + 1 <= gateway.maxClusters) cluster++; } else { // FIND A QUEUE IN THIS BUCKET THAT HAS SPACE - const queue = bucket.clusters.find((q) => q.length < bot.gateway.shardsPerCluster + 1); + const queue = bucket.clusters.find((q) => q.length < gateway.shardsPerCluster + 1); if (queue) { // IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE queue.push(i); } else { - if (cluster + 1 <= bot.gateway.maxClusters) cluster++; + if (cluster + 1 <= gateway.maxClusters) cluster++; // ADD A NEW QUEUE FOR THIS SHARD bucket.clusters.push([cluster, i]); } @@ -38,14 +38,14 @@ export function spawnShards(bot: Bot, firstShardId = 0) { } // SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP - bot.gateway.buckets.forEach((bucket, bucketId) => { - bot.gateway.log("DEBUG", `3. Running forEach loop in spawnShards function.`); + gateway.buckets.forEach((bucket, bucketId) => { + gateway.log("DEBUG", `3. Running forEach loop in spawnShards function.`); for (const [clusterId, ...queue] of bucket.clusters) { - bot.gateway.log("DEBUG", `4. Running for of loop in spawnShards function.`); + gateway.log("DEBUG", `4. Running for of loop in spawnShards function.`); queue.forEach((shardId) => { bucket.createNextShard.push(async () => { - await bot.gateway.tellClusterToIdentify(clusterId, shardId, bucketId); + await gateway.tellClusterToIdentify(gateway, clusterId, shardId, bucketId); }); }); diff --git a/src/ws/start_gateway.ts b/src/ws/start_gateway.ts index e91415493..99c704f94 100644 --- a/src/ws/start_gateway.ts +++ b/src/ws/start_gateway.ts @@ -2,40 +2,40 @@ import { DiscordGatewayIntents } from "../types/gateway/gateway_intents.ts"; import type { GetGatewayBot } from "../types/gateway/get_gateway_bot.ts"; import { camelize } from "../util/utils.ts"; import { StartGatewayOptions } from "./start_gateway_options.ts"; -import { ws } from "./ws.ts"; +import { GatewayManager } from "../bot.ts"; /** ADVANCED DEVS ONLY!!!!!! * Starts the standalone gateway. * This will require starting the bot separately. */ -export async function startGateway(options: StartGatewayOptions) { - ws.identifyPayload.token = `Bot ${options.token}`; - ws.secretKey = options.secretKey; - ws.firstShardId = options.firstShardId; - ws.url = options.url; - if (options.shardsPerCluster) ws.shardsPerCluster = options.shardsPerCluster; - if (options.maxClusters) ws.maxClusters = options.maxClusters; +export async function startGateway(gateway: GatewayManager, options: StartGatewayOptions) { + gateway.identifyPayload.token = `Bot ${options.token}`; + gateway.secretKey = options.secretKey; + gateway.firstShardId = options.firstShardId; + gateway.url = options.url; + if (options.shardsPerCluster) gateway.shardsPerCluster = options.shardsPerCluster; + if (options.maxClusters) gateway.maxClusters = options.maxClusters; if (options.compress) { - ws.identifyPayload.compress = options.compress; + gateway.identifyPayload.compress = options.compress; } - if (options.reshard) ws.reshard = options.reshard; + if (options.reshard) gateway.reshard = options.reshard; // Once an hour check if resharding is necessary - setInterval(ws.resharder, 1000 * 60 * 60); + setInterval(() => gateway.resharder(gateway), 1000 * 60 * 60); - ws.identifyPayload.intents = options.intents.reduce( - (bits, next) => (bits |= typeof next === "string" ? DiscordGatewayIntents[next] : next), - 0 + gateway.identifyPayload.intents = options.intents.reduce( + (bits, next) => (bits |= typeof next === "string" ? DiscordGatewayIntents[next] : next), + 0 ); - ws.botGatewayData = camelize( - await fetch(`https://discord.com/api/gateway/bot`, { - headers: { Authorization: ws.identifyPayload.token }, - }).then((res) => res.json()) + gateway.botGatewayData = camelize( + await fetch(`https://discord.com/api/gateway/bot`, { + headers: { Authorization: gateway.identifyPayload.token }, + }).then((res) => res.json()) ) as GetGatewayBot; - ws.maxShards = options.maxShards || ws.botGatewayData.shards; - ws.lastShardId = options.lastShardId || ws.botGatewayData.shards - 1; + gateway.maxShards = options.maxShards || gateway.botGatewayData.shards; + gateway.lastShardId = options.lastShardId || gateway.botGatewayData.shards - 1; - ws.spawnShards(ws.firstShardId); + gateway.spawnShards(gateway, gateway.firstShardId); } diff --git a/src/ws/tell_cluster_to_identify.ts b/src/ws/tell_cluster_to_identify.ts index 564872dfb..bbef60e95 100644 --- a/src/ws/tell_cluster_to_identify.ts +++ b/src/ws/tell_cluster_to_identify.ts @@ -1,6 +1,6 @@ -import { ws } from "./ws.ts"; - +import {GatewayManager} from "../bot.ts"; /** Allows users to hook in and change to communicate to different clusters across different servers or anything they like. For example using redis pubsub to talk to other servers. */ -export async function tellClusterToIdentify(_workerId: number, shardId: number, _bucketId: number) { - await ws.identify(shardId, ws.maxShards); + +export async function tellClusterToIdentify(gateway: GatewayManager, _workerId: number, shardId: number, _bucketId: number) { + await gateway.identify(gateway, shardId, gateway.maxShards); }