diff --git a/src/helpers/misc/edit_bot_status.ts b/src/helpers/misc/edit_bot_status.ts index 9f4bf3615..35e09c6bc 100644 --- a/src/helpers/misc/edit_bot_status.ts +++ b/src/helpers/misc/edit_bot_status.ts @@ -9,15 +9,15 @@ export function editBotStatus(data: Omit) { "loop", `Running forEach loop in editBotStatus function.`, ); - shard.ws.send( - JSON.stringify({ - op: DiscordGatewayOpcodes.StatusUpdate, - d: { - since: null, - afk: false, - ...data, - }, - }), - ); + + shard.queue.push({ + op: DiscordGatewayOpcodes.StatusUpdate, + d: { + since: null, + afk: false, + ...data, + }, + }); + ws.processQueue(shard.id); }); } diff --git a/src/ws/heartbeat.ts b/src/ws/heartbeat.ts index 2a4a1e094..f631ec3f5 100644 --- a/src/ws/heartbeat.ts +++ b/src/ws/heartbeat.ts @@ -15,10 +15,7 @@ export function heartbeat(shardId: number, interval: number) { shard.heartbeat.interval = interval; shard.heartbeat.intervalId = setInterval(() => { - ws.log( - "DEBUG", - `Running setInterval in heartbeat file.`, - ); + ws.log("DEBUG", `Running setInterval in heartbeat file.`); const currentShard = ws.shards.get(shardId); if (!currentShard) return; @@ -34,11 +31,11 @@ export function heartbeat(shardId: number, interval: number) { return clearInterval(currentShard.heartbeat.intervalId); } - currentShard.ws.send( - JSON.stringify({ - op: DiscordGatewayOpcodes.Heartbeat, - d: currentShard.previousSequenceNumber, - }), - ); + currentShard.queue.unshift({ + op: DiscordGatewayOpcodes.Heartbeat, + d: currentShard.previousSequenceNumber, + }); + + ws.processQueue(currentShard.id); }, interval); } diff --git a/src/ws/identify.ts b/src/ws/identify.ts index a31076a31..afd54e64c 100644 --- a/src/ws/identify.ts +++ b/src/ws/identify.ts @@ -3,7 +3,6 @@ import { ws } from "./ws.ts"; export async function identify(shardId: number, maxShards: number) { ws.log("IDENTIFYING", { shardId, maxShards }); - console.log("IDENTIFYING", { shardId, maxShards }); // CREATE A SHARD const socket = await ws.createShard(shardId); @@ -25,15 +24,16 @@ export async function identify(shardId: number, maxShards: number) { interval: 0, intervalId: 0, }, + queue: [], + processingQueue: false, }); socket.onopen = () => { - socket.send( - JSON.stringify({ - op: DiscordGatewayOpcodes.Identify, - d: { ...ws.identifyPayload, shard: [shardId, maxShards] }, - }), - ); + ws.shards.get(shardId)?.queue.unshift({ + op: DiscordGatewayOpcodes.Identify, + d: { ...ws.identifyPayload, shard: [shardId, maxShards] }, + }); + ws.processQueue(shardId); }; return new Promise((resolve, reject) => { diff --git a/src/ws/process_queue.ts b/src/ws/process_queue.ts new file mode 100644 index 000000000..19749555e --- /dev/null +++ b/src/ws/process_queue.ts @@ -0,0 +1,36 @@ +import { delay } from "../util/utils.ts"; +import { ws } from "./ws.ts"; + +export async function processQueue(id: number) { + const shard = ws.shards.get(id); + // If no items or its already processing then exit + if (!shard?.queue.length || shard.processingQueue) return; + + shard.processingQueue = true; + + let counter = 0; + + while (shard.queue.length) { + if (shard.ws.readyState !== WebSocket.OPEN) { + shard.processingQueue = false; + return; + } + + // Send a request that is next in line + const request = shard.queue.shift(); + + shard.ws.send(JSON.stringify(request)); + + // Counter is useful for preventing 120/m requests. + counter++; + + // Handle if the requests have been maxed + if (counter >= 120) { + await delay(60000); + counter = 0; + continue; + } + } + + shard.processingQueue = false; +} diff --git a/src/ws/resume.ts b/src/ws/resume.ts index 18f2819fa..f212c8d0c 100644 --- a/src/ws/resume.ts +++ b/src/ws/resume.ts @@ -38,19 +38,21 @@ export async function resume(shardId: number) { interval: 0, intervalId: 0, }, + queue: oldShard?.queue || [], + processingQueue: false, }); // Resume on open socket.onopen = () => { - socket.send( - JSON.stringify({ - op: DiscordGatewayOpcodes.Resume, - d: { - token: ws.identifyPayload.token, - session_id: sessionId, - seq: previousSequenceNumber, - }, - }), - ); + ws.shards.get(shardId)?.queue.unshift({ + op: DiscordGatewayOpcodes.Resume, + d: { + token: ws.identifyPayload.token, + session_id: sessionId, + seq: previousSequenceNumber, + }, + }); + + ws.processQueue(shardId); }; } diff --git a/src/ws/ws.ts b/src/ws/ws.ts index 3237d0321..1db6c8a74 100644 --- a/src/ws/ws.ts +++ b/src/ws/ws.ts @@ -1,3 +1,4 @@ +import { DiscordGatewayPayload } from "../types/gateway/gateway_payload.ts"; import { Collection } from "../util/collection.ts"; import { cleanupLoadingShards } from "./cleanup_loading_shards.ts"; import { createShard } from "./create_shard.ts"; @@ -9,7 +10,9 @@ import { identify } from "./identify.ts"; import { resharder } from "./resharder.ts"; import { spawnShards } from "./spawn_shards.ts"; import { startGateway } from "./start_gateway.ts"; +import { processQueue } from "./process_queue.ts"; import { tellClusterToIdentify } from "./tell_cluster_to_identify.ts"; +import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; // CONTROLLER LIKE INTERFACE FOR WS HANDLING export const ws = { @@ -103,6 +106,8 @@ export const ws = { cleanupLoadingShards, /** Handles the message events from websocket */ handleOnMessage, + /** Handles processing queue of requests send to this shard */ + processQueue, }; export interface DiscordenoShard { @@ -136,4 +141,17 @@ export interface DiscordenoShard { /** The id of the interval, useful for stopping the interval if ws closed. */ intervalId: number; }; + /** The items/requestst that are in queue to be sent to this shard websocket. */ + queue: WebSocketRequest[]; + /** Whether or not the queue for this shard is being processed. */ + processingQueue: boolean; +} + +export interface WebSocketRequest { + op: DiscordGatewayOpcodes; + d: unknown; + // guildId: string; + // shardId: number; + // nonce?: string; + // options?: Record; }