From cbfd81ea0bb2d7ebdd0cbdc290ce7d228da2bf40 Mon Sep 17 00:00:00 2001 From: Skillz4Killz <23035000+Skillz4Killz@users.noreply.github.com> Date: Mon, 25 Oct 2021 01:54:57 +0000 Subject: [PATCH] fixes --- src/bot.ts | 21 ++++++++++----- src/rest/process_queue.ts | 7 +++-- src/ws/create_shard.ts | 2 +- src/ws/identify.ts | 16 +++++++++--- src/ws/spawn_shards.ts | 55 ++++++++++++++++++--------------------- src/ws/start_gateway.ts | 2 +- tests/mod.ts | 4 +-- 7 files changed, 59 insertions(+), 48 deletions(-) diff --git a/src/bot.ts b/src/bot.ts index 2eaed9bcc..dd9ad7b1f 100644 --- a/src/bot.ts +++ b/src/bot.ts @@ -541,9 +541,16 @@ export async function startBot(bot: Bot) { // START REST bot.rest = createRestManager({ token: bot.token }); + if (!bot.botGatewayData) bot.botGatewayData = await bot.helpers.getGatewayBot(bot); // START WS bot.gateway = createGatewayManager({ + urlWSS: bot.botGatewayData.url, + shardsRecommended: bot.botGatewayData.shards, + sessionStartLimitTotal: bot.botGatewayData.sessionStartLimit.total, + sessionStartLimitRemaining: bot.botGatewayData.sessionStartLimit.remaining, + sessionStartLimitResetAfter: bot.botGatewayData.sessionStartLimit.resetAfter, + maxConcurrency: bot.botGatewayData.sessionStartLimit.maxConcurrency, handleDiscordPayload: // bot.handleDiscordPayload || async function (_, data: DiscordGatewayPayload, shardId: number) { @@ -558,7 +565,7 @@ export async function startBot(bot: Bot) { }, }); - if (!bot.botGatewayData) bot.botGatewayData = await bot.helpers.getGatewayBot(bot); + bot.gateway.spawnShards(bot.gateway) } export function createUtils(options: Partial) { @@ -1079,13 +1086,13 @@ export interface GatewayManager { maxShards: number; /** Whether or not the resharder should automatically switch to LARGE BOT SHARDING when you are above 100K servers. */ useOptimalLargeBotSharding: boolean; - /** The amount of shards to load per cluster. */ + /** The amount of shards to load per worker. */ shardsPerCluster: number; - /** The maximum amount of clusters to use for your bot. */ + /** The maximum amount of workers to use for your bot. */ maxClusters: number; /** The first shard Id to start spawning. */ firstShardId: number; - /** The last shard Id for this cluster. */ + /** The last shard Id for this worker. */ lastShardId: number; token: string; compress: boolean; @@ -1119,11 +1126,11 @@ export interface GatewayManager { startedAt: number; } >; - /** Stored as bucketId: { clusters: [clusterId, [ShardIds]], createNextShard: boolean } */ + /** Stored as bucketId: { workers: [workerId, [ShardIds]], createNextShard: boolean } */ buckets: Collection< number, { - clusters: number[][]; + workers: number[][]; createNextShard: (() => unknown)[]; } >; @@ -1149,7 +1156,7 @@ export interface GatewayManager { heartbeat: typeof heartbeat; /** Sends the discord payload to another server. */ handleDiscordPayload: (gateway: GatewayManager, data: GatewayPayload, shardId: number) => any; - /** Tell the cluster/worker to begin identifying this shard */ + /** Tell the worker to begin identifying this shard */ tellClusterToIdentify: typeof tellClusterToIdentify; /** Handle the different logs. Used for debugging. */ log: typeof log; diff --git a/src/rest/process_queue.ts b/src/rest/process_queue.ts index 3c75e5255..d54390853 100644 --- a/src/rest/process_queue.ts +++ b/src/rest/process_queue.ts @@ -10,7 +10,7 @@ export function processQueue(rest: RestManager, id: string) { // SELECT THE FIRST ITEM FROM THIS QUEUE const queuedRequest = queue.requests[0]; // IF THIS DOESNT HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT. - if (!queuedRequest) return; + if (!queuedRequest) break; const basicURL = rest.simplifyUrl(queuedRequest.request.url, queuedRequest.request.method.toUpperCase()); @@ -30,14 +30,12 @@ export function processQueue(rest: RestManager, id: string) { // BREAK WHILE LOOP break; } - // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS const bucketResetIn = queuedRequest.payload.bucketId ? rest.checkRateLimits(rest, queuedRequest.payload.bucketId) : false; // THIS BUCKET IS STILL RATELIMITED, RE-ADD TO QUEUE if (bucketResetIn) continue; - // EXECUTE THE REQUEST // IF THIS IS A GET REQUEST, CHANGE THE BODY TO QUERY PARAMETERS @@ -56,7 +54,6 @@ export function processQueue(rest: RestManager, id: string) { queuedRequest.request.method.toUpperCase() === "GET" && query ? `${queuedRequest.request.url}?${query}` : queuedRequest.request.url; - // CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE rest.debug(`[REST - Add To Global Queue] ${JSON.stringify(queuedRequest.payload)}`); rest.globalQueue.push({ @@ -64,6 +61,8 @@ export function processQueue(rest: RestManager, id: string) { basicURL, urlToUse, }); + rest.processGlobalQueue(rest); + queue.requests.splice(0, 1) } // ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP diff --git a/src/ws/create_shard.ts b/src/ws/create_shard.ts index a83b36823..eef127835 100644 --- a/src/ws/create_shard.ts +++ b/src/ws/create_shard.ts @@ -2,7 +2,7 @@ import { DiscordGatewayCloseEventCodes } from "../types/codes/gateway_close_even import { GatewayManager } from "../bot.ts"; export function createShard(gateway: GatewayManager, shardId: number) { - const socket = new WebSocket(gateway.botGatewayData.url); + const socket = new WebSocket(gateway.urlWSS); socket.binaryType = "arraybuffer"; socket.onerror = (errorEvent) => { diff --git a/src/ws/identify.ts b/src/ws/identify.ts index 47f17d5eb..e5010fc23 100644 --- a/src/ws/identify.ts +++ b/src/ws/identify.ts @@ -7,7 +7,7 @@ export function identify(gateway: GatewayManager, shardId: number, maxShards: nu // Need to clear the old heartbeat interval const oldShard = gateway.shards.get(shardId); if (oldShard) { - gateway.closeWS(oldShard.gateway, 3065, "Reidentifying closure of old shard"); + gateway.closeWS(oldShard.ws, 3065, "Reidentifying closure of old shard"); clearInterval(oldShard.heartbeat.intervalId); } @@ -17,7 +17,7 @@ export function identify(gateway: GatewayManager, shardId: number, maxShards: nu // Identify can just set/reset the settings for the shard gateway.shards.set(shardId, { id: shardId, - gateway: socket, + ws: socket, resumeInterval: 0, sessionId: "", previousSequenceNumber: 0, @@ -44,7 +44,17 @@ export function identify(gateway: GatewayManager, shardId: number, maxShards: nu shardId, { op: DiscordGatewayOpcodes.Identify, - d: { ...gateway.identifyPayload, shard: [shardId, maxShards] }, + d: { + token: gateway.token, + compress: gateway.compress, + properties: { + $os: gateway.$os, + $browser: gateway.$browser, + $device: gateway.$device, + }, + intents: gateway.intents, + shard: [shardId, maxShards], + }, }, true ); diff --git a/src/ws/spawn_shards.ts b/src/ws/spawn_shards.ts index 364375744..d4f51930b 100644 --- a/src/ws/spawn_shards.ts +++ b/src/ws/spawn_shards.ts @@ -3,49 +3,44 @@ import { GatewayManager } from "../bot.ts"; export function spawnShards(gateway: GatewayManager, firstShardId = 0) { /** Stored as bucketId: [clusterId, [ShardIds]] */ - const maxShards = gateway.maxShards || gateway.botGatewayData.shards; - let cluster = 0; + const maxShards = gateway.lastShardId || gateway.maxShards; + let worker = 0; - for (let index = firstShardId; index < gateway.botGatewayData.sessionStartLimit.maxConcurrency; index++) { - gateway.log("DEBUG", `1. Running for loop in spawnShards function.`); - // ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS - for (let i = 0; i < maxShards; i++) { - gateway.log("DEBUG", `2. Running for loop in spawnShards function.`); - const bucketId = i % gateway.botGatewayData.sessionStartLimit.maxConcurrency; - const bucket = gateway.buckets.get(bucketId); + for (let i = 0; i < gateway.maxConcurrency; i++) { + gateway.buckets.set(i, { + workers: [], + createNextShard: [], + }); + } - if (!bucket) { - // Create the bucket since it doesnt exist - gateway.buckets.set(bucketId, { - clusters: [[cluster, i]], - createNextShard: [], - }); + // ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS + for (let i = 0; i < maxShards; i++) { + gateway.log("DEBUG", `2. Running for loop in spawnShards function.`); + const bucketId = i % gateway.maxConcurrency; + const bucket = gateway.buckets.get(bucketId); + if (!bucket) throw new Error("Bucket not found when spawning shards."); - if (cluster + 1 <= gateway.maxClusters) cluster++; - } else { - // FIND A QUEUE IN THIS BUCKET THAT HAS SPACE - const queue = bucket.clusters.find((q) => q.length < gateway.shardsPerCluster + 1); - if (queue) { - // IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE - queue.push(i); - } else { - if (cluster + 1 <= gateway.maxClusters) cluster++; - // ADD A NEW QUEUE FOR THIS SHARD - bucket.clusters.push([cluster, i]); - } - } + // FIND A QUEUE IN THIS BUCKET THAT HAS SPACE + const queue = bucket.workers.find((q) => q.length < gateway.shardsPerCluster + 1); + if (queue) { + // IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE + queue.push(i); + } else { + if (worker + 1 <= gateway.maxClusters) worker++; + // ADD A NEW QUEUE FOR THIS SHARD + bucket.workers.push([worker, i]); } } // SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP gateway.buckets.forEach((bucket, bucketId) => { gateway.log("DEBUG", `3. Running forEach loop in spawnShards function.`); - for (const [clusterId, ...queue] of bucket.clusters) { + for (const [workerId, ...queue] of bucket.workers) { gateway.log("DEBUG", `4. Running for of loop in spawnShards function.`); queue.forEach((shardId) => { bucket.createNextShard.push(async () => { - await gateway.tellClusterToIdentify(gateway, clusterId, shardId, bucketId); + await gateway.tellClusterToIdentify(gateway, workerId, shardId, bucketId); }); }); diff --git a/src/ws/start_gateway.ts b/src/ws/start_gateway.ts index 11ab2017e..d0f0f4ba9 100644 --- a/src/ws/start_gateway.ts +++ b/src/ws/start_gateway.ts @@ -32,7 +32,7 @@ export async function startGateway(gateway: GatewayManager, options: StartGatewa headers: { Authorization: gateway.token }, }).then((res) => res.json())) as SnakeCasedPropertiesDeep; - gateway.url = result.url; + gateway.urlWSS = result.url; gateway.sessionStartLimitTotal = result.session_start_limit.total; gateway.sessionStartLimitRemaining = result.session_start_limit.remaining; gateway.sessionStartLimitResetAfter = result.session_start_limit.reset_after; diff --git a/tests/mod.ts b/tests/mod.ts index df767137e..2112e6000 100644 --- a/tests/mod.ts +++ b/tests/mod.ts @@ -6,8 +6,8 @@ Deno.test("[Bot] - Starting Tests", async (t) => { token: TOKEN || Deno.env.get('DISCORD_TOKEN'), botId: 0n, events: createEventHandlers({ - - }), + raw: console.log, + }), intents: [], })