This commit is contained in:
Skillz4Killz
2021-10-25 01:54:57 +00:00
committed by GitHub
parent 231554d091
commit cbfd81ea0b
7 changed files with 59 additions and 48 deletions

View File

@@ -541,9 +541,16 @@ export async function startBot(bot: Bot) {
// START REST
bot.rest = createRestManager({ token: bot.token });
if (!bot.botGatewayData) bot.botGatewayData = await bot.helpers.getGatewayBot(bot);
// START WS
bot.gateway = createGatewayManager({
urlWSS: bot.botGatewayData.url,
shardsRecommended: bot.botGatewayData.shards,
sessionStartLimitTotal: bot.botGatewayData.sessionStartLimit.total,
sessionStartLimitRemaining: bot.botGatewayData.sessionStartLimit.remaining,
sessionStartLimitResetAfter: bot.botGatewayData.sessionStartLimit.resetAfter,
maxConcurrency: bot.botGatewayData.sessionStartLimit.maxConcurrency,
handleDiscordPayload:
// bot.handleDiscordPayload ||
async function (_, data: DiscordGatewayPayload, shardId: number) {
@@ -558,7 +565,7 @@ export async function startBot(bot: Bot) {
},
});
if (!bot.botGatewayData) bot.botGatewayData = await bot.helpers.getGatewayBot(bot);
bot.gateway.spawnShards(bot.gateway)
}
export function createUtils(options: Partial<HelperUtils>) {
@@ -1079,13 +1086,13 @@ export interface GatewayManager {
maxShards: number;
/** Whether or not the resharder should automatically switch to LARGE BOT SHARDING when you are above 100K servers. */
useOptimalLargeBotSharding: boolean;
/** The amount of shards to load per cluster. */
/** The amount of shards to load per worker. */
shardsPerCluster: number;
/** The maximum amount of clusters to use for your bot. */
/** The maximum amount of workers to use for your bot. */
maxClusters: number;
/** The first shard Id to start spawning. */
firstShardId: number;
/** The last shard Id for this cluster. */
/** The last shard Id for this worker. */
lastShardId: number;
token: string;
compress: boolean;
@@ -1119,11 +1126,11 @@ export interface GatewayManager {
startedAt: number;
}
>;
/** Stored as bucketId: { clusters: [clusterId, [ShardIds]], createNextShard: boolean } */
/** Stored as bucketId: { workers: [workerId, [ShardIds]], createNextShard: boolean } */
buckets: Collection<
number,
{
clusters: number[][];
workers: number[][];
createNextShard: (() => unknown)[];
}
>;
@@ -1149,7 +1156,7 @@ export interface GatewayManager {
heartbeat: typeof heartbeat;
/** Sends the discord payload to another server. */
handleDiscordPayload: (gateway: GatewayManager, data: GatewayPayload, shardId: number) => any;
/** Tell the cluster/worker to begin identifying this shard */
/** Tell the worker to begin identifying this shard */
tellClusterToIdentify: typeof tellClusterToIdentify;
/** Handle the different logs. Used for debugging. */
log: typeof log;

View File

@@ -10,7 +10,7 @@ export function processQueue(rest: RestManager, id: string) {
// SELECT THE FIRST ITEM FROM THIS QUEUE
const queuedRequest = queue.requests[0];
// IF THIS DOESNT HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT.
if (!queuedRequest) return;
if (!queuedRequest) break;
const basicURL = rest.simplifyUrl(queuedRequest.request.url, queuedRequest.request.method.toUpperCase());
@@ -30,14 +30,12 @@ export function processQueue(rest: RestManager, id: string) {
// BREAK WHILE LOOP
break;
}
// IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
const bucketResetIn = queuedRequest.payload.bucketId
? rest.checkRateLimits(rest, queuedRequest.payload.bucketId)
: false;
// THIS BUCKET IS STILL RATELIMITED, RE-ADD TO QUEUE
if (bucketResetIn) continue;
// EXECUTE THE REQUEST
// IF THIS IS A GET REQUEST, CHANGE THE BODY TO QUERY PARAMETERS
@@ -56,7 +54,6 @@ export function processQueue(rest: RestManager, id: string) {
queuedRequest.request.method.toUpperCase() === "GET" && query
? `${queuedRequest.request.url}?${query}`
: queuedRequest.request.url;
// CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE
rest.debug(`[REST - Add To Global Queue] ${JSON.stringify(queuedRequest.payload)}`);
rest.globalQueue.push({
@@ -64,6 +61,8 @@ export function processQueue(rest: RestManager, id: string) {
basicURL,
urlToUse,
});
rest.processGlobalQueue(rest);
queue.requests.splice(0, 1)
}
// ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP

View File

@@ -2,7 +2,7 @@ import { DiscordGatewayCloseEventCodes } from "../types/codes/gateway_close_even
import { GatewayManager } from "../bot.ts";
export function createShard(gateway: GatewayManager, shardId: number) {
const socket = new WebSocket(gateway.botGatewayData.url);
const socket = new WebSocket(gateway.urlWSS);
socket.binaryType = "arraybuffer";
socket.onerror = (errorEvent) => {

View File

@@ -7,7 +7,7 @@ export function identify(gateway: GatewayManager, shardId: number, maxShards: nu
// Need to clear the old heartbeat interval
const oldShard = gateway.shards.get(shardId);
if (oldShard) {
gateway.closeWS(oldShard.gateway, 3065, "Reidentifying closure of old shard");
gateway.closeWS(oldShard.ws, 3065, "Reidentifying closure of old shard");
clearInterval(oldShard.heartbeat.intervalId);
}
@@ -17,7 +17,7 @@ export function identify(gateway: GatewayManager, shardId: number, maxShards: nu
// Identify can just set/reset the settings for the shard
gateway.shards.set(shardId, {
id: shardId,
gateway: socket,
ws: socket,
resumeInterval: 0,
sessionId: "",
previousSequenceNumber: 0,
@@ -44,7 +44,17 @@ export function identify(gateway: GatewayManager, shardId: number, maxShards: nu
shardId,
{
op: DiscordGatewayOpcodes.Identify,
d: { ...gateway.identifyPayload, shard: [shardId, maxShards] },
d: {
token: gateway.token,
compress: gateway.compress,
properties: {
$os: gateway.$os,
$browser: gateway.$browser,
$device: gateway.$device,
},
intents: gateway.intents,
shard: [shardId, maxShards],
},
},
true
);

View File

@@ -3,49 +3,44 @@ import { GatewayManager } from "../bot.ts";
export function spawnShards(gateway: GatewayManager, firstShardId = 0) {
/** Stored as bucketId: [clusterId, [ShardIds]] */
const maxShards = gateway.maxShards || gateway.botGatewayData.shards;
let cluster = 0;
const maxShards = gateway.lastShardId || gateway.maxShards;
let worker = 0;
for (let index = firstShardId; index < gateway.botGatewayData.sessionStartLimit.maxConcurrency; index++) {
gateway.log("DEBUG", `1. Running for loop in spawnShards function.`);
// ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS
for (let i = 0; i < maxShards; i++) {
gateway.log("DEBUG", `2. Running for loop in spawnShards function.`);
const bucketId = i % gateway.botGatewayData.sessionStartLimit.maxConcurrency;
const bucket = gateway.buckets.get(bucketId);
for (let i = 0; i < gateway.maxConcurrency; i++) {
gateway.buckets.set(i, {
workers: [],
createNextShard: [],
});
}
if (!bucket) {
// Create the bucket since it doesnt exist
gateway.buckets.set(bucketId, {
clusters: [[cluster, i]],
createNextShard: [],
});
// ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS
for (let i = 0; i < maxShards; i++) {
gateway.log("DEBUG", `2. Running for loop in spawnShards function.`);
const bucketId = i % gateway.maxConcurrency;
const bucket = gateway.buckets.get(bucketId);
if (!bucket) throw new Error("Bucket not found when spawning shards.");
if (cluster + 1 <= gateway.maxClusters) cluster++;
} else {
// FIND A QUEUE IN THIS BUCKET THAT HAS SPACE
const queue = bucket.clusters.find((q) => q.length < gateway.shardsPerCluster + 1);
if (queue) {
// IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE
queue.push(i);
} else {
if (cluster + 1 <= gateway.maxClusters) cluster++;
// ADD A NEW QUEUE FOR THIS SHARD
bucket.clusters.push([cluster, i]);
}
}
// FIND A QUEUE IN THIS BUCKET THAT HAS SPACE
const queue = bucket.workers.find((q) => q.length < gateway.shardsPerCluster + 1);
if (queue) {
// IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE
queue.push(i);
} else {
if (worker + 1 <= gateway.maxClusters) worker++;
// ADD A NEW QUEUE FOR THIS SHARD
bucket.workers.push([worker, i]);
}
}
// SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP
gateway.buckets.forEach((bucket, bucketId) => {
gateway.log("DEBUG", `3. Running forEach loop in spawnShards function.`);
for (const [clusterId, ...queue] of bucket.clusters) {
for (const [workerId, ...queue] of bucket.workers) {
gateway.log("DEBUG", `4. Running for of loop in spawnShards function.`);
queue.forEach((shardId) => {
bucket.createNextShard.push(async () => {
await gateway.tellClusterToIdentify(gateway, clusterId, shardId, bucketId);
await gateway.tellClusterToIdentify(gateway, workerId, shardId, bucketId);
});
});

View File

@@ -32,7 +32,7 @@ export async function startGateway(gateway: GatewayManager, options: StartGatewa
headers: { Authorization: gateway.token },
}).then((res) => res.json())) as SnakeCasedPropertiesDeep<GetGatewayBot>;
gateway.url = result.url;
gateway.urlWSS = result.url;
gateway.sessionStartLimitTotal = result.session_start_limit.total;
gateway.sessionStartLimitRemaining = result.session_start_limit.remaining;
gateway.sessionStartLimitResetAfter = result.session_start_limit.reset_after;

View File

@@ -6,8 +6,8 @@ Deno.test("[Bot] - Starting Tests", async (t) => {
token: TOKEN || Deno.env.get('DISCORD_TOKEN'),
botId: 0n,
events: createEventHandlers({
}),
raw: console.log,
}),
intents: [],
})