feat: implement gateway queue (#844)

* feat: implement gateway queue

* process queues when adding to queue

* fix: tests fixed ready for review

* handle shard closed

Co-authored-by: Skillz4Killz <Skillz4Killz@users.noreply.github.com>
This commit is contained in:
Skillz4Killz
2021-04-13 12:29:35 -04:00
committed by GitHub
parent 16a5ac684a
commit 72618947f0
6 changed files with 90 additions and 37 deletions
+4 -4
View File
@@ -9,15 +9,15 @@ export function editBotStatus(data: Omit<StatusUpdate, "afk" | "since">) {
"loop",
`Running forEach loop in editBotStatus function.`,
);
shard.ws.send(
JSON.stringify({
shard.queue.push({
op: DiscordGatewayOpcodes.StatusUpdate,
d: {
since: null,
afk: false,
...data,
},
}),
);
});
ws.processQueue(shard.id);
});
}
+5 -8
View File
@@ -15,10 +15,7 @@ export function heartbeat(shardId: number, interval: number) {
shard.heartbeat.interval = interval;
shard.heartbeat.intervalId = setInterval(() => {
ws.log(
"DEBUG",
`Running setInterval in heartbeat file.`,
);
ws.log("DEBUG", `Running setInterval in heartbeat file.`);
const currentShard = ws.shards.get(shardId);
if (!currentShard) return;
@@ -34,11 +31,11 @@ export function heartbeat(shardId: number, interval: number) {
return clearInterval(currentShard.heartbeat.intervalId);
}
currentShard.ws.send(
JSON.stringify({
currentShard.queue.unshift({
op: DiscordGatewayOpcodes.Heartbeat,
d: currentShard.previousSequenceNumber,
}),
);
});
ws.processQueue(currentShard.id);
}, interval);
}
+5 -5
View File
@@ -3,7 +3,6 @@ import { ws } from "./ws.ts";
export async function identify(shardId: number, maxShards: number) {
ws.log("IDENTIFYING", { shardId, maxShards });
console.log("IDENTIFYING", { shardId, maxShards });
// CREATE A SHARD
const socket = await ws.createShard(shardId);
@@ -25,15 +24,16 @@ export async function identify(shardId: number, maxShards: number) {
interval: 0,
intervalId: 0,
},
queue: [],
processingQueue: false,
});
socket.onopen = () => {
socket.send(
JSON.stringify({
ws.shards.get(shardId)?.queue.unshift({
op: DiscordGatewayOpcodes.Identify,
d: { ...ws.identifyPayload, shard: [shardId, maxShards] },
}),
);
});
ws.processQueue(shardId);
};
return new Promise((resolve, reject) => {
+36
View File
@@ -0,0 +1,36 @@
import { delay } from "../util/utils.ts";
import { ws } from "./ws.ts";
export async function processQueue(id: number) {
const shard = ws.shards.get(id);
// If no items or its already processing then exit
if (!shard?.queue.length || shard.processingQueue) return;
shard.processingQueue = true;
let counter = 0;
while (shard.queue.length) {
if (shard.ws.readyState !== WebSocket.OPEN) {
shard.processingQueue = false;
return;
}
// Send a request that is next in line
const request = shard.queue.shift();
shard.ws.send(JSON.stringify(request));
// Counter is useful for preventing 120/m requests.
counter++;
// Handle if the requests have been maxed
if (counter >= 120) {
await delay(60000);
counter = 0;
continue;
}
}
shard.processingQueue = false;
}
+6 -4
View File
@@ -38,19 +38,21 @@ export async function resume(shardId: number) {
interval: 0,
intervalId: 0,
},
queue: oldShard?.queue || [],
processingQueue: false,
});
// Resume on open
socket.onopen = () => {
socket.send(
JSON.stringify({
ws.shards.get(shardId)?.queue.unshift({
op: DiscordGatewayOpcodes.Resume,
d: {
token: ws.identifyPayload.token,
session_id: sessionId,
seq: previousSequenceNumber,
},
}),
);
});
ws.processQueue(shardId);
};
}
+18
View File
@@ -1,3 +1,4 @@
import { DiscordGatewayPayload } from "../types/gateway/gateway_payload.ts";
import { Collection } from "../util/collection.ts";
import { cleanupLoadingShards } from "./cleanup_loading_shards.ts";
import { createShard } from "./create_shard.ts";
@@ -9,7 +10,9 @@ import { identify } from "./identify.ts";
import { resharder } from "./resharder.ts";
import { spawnShards } from "./spawn_shards.ts";
import { startGateway } from "./start_gateway.ts";
import { processQueue } from "./process_queue.ts";
import { tellClusterToIdentify } from "./tell_cluster_to_identify.ts";
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
// CONTROLLER LIKE INTERFACE FOR WS HANDLING
export const ws = {
@@ -103,6 +106,8 @@ export const ws = {
cleanupLoadingShards,
/** Handles the message events from websocket */
handleOnMessage,
/** Handles processing queue of requests send to this shard */
processQueue,
};
export interface DiscordenoShard {
@@ -136,4 +141,17 @@ export interface DiscordenoShard {
/** The id of the interval, useful for stopping the interval if ws closed. */
intervalId: number;
};
/** The items/requestst that are in queue to be sent to this shard websocket. */
queue: WebSocketRequest[];
/** Whether or not the queue for this shard is being processed. */
processingQueue: boolean;
}
export interface WebSocketRequest {
op: DiscordGatewayOpcodes;
d: unknown;
// guildId: string;
// shardId: number;
// nonce?: string;
// options?: Record<string, unknown>;
}