From 048dd7a4f235f7c9c52112d10e239cf6a67a5ed2 Mon Sep 17 00:00:00 2001 From: Skillz Date: Wed, 10 Feb 2021 12:06:32 -0500 Subject: [PATCH] test queue system for shard spawinging --- src/ws/proxy/deps.ts | 1 + src/ws/proxy/manager.ts | 108 ++++++++++++++++++++++++++++++++++------ src/ws/proxy/ws.ts | 7 ++- 3 files changed, 99 insertions(+), 17 deletions(-) diff --git a/src/ws/proxy/deps.ts b/src/ws/proxy/deps.ts index fef4dd6c7..4a3715e57 100644 --- a/src/ws/proxy/deps.ts +++ b/src/ws/proxy/deps.ts @@ -1 +1,2 @@ export { decompress_with as decompressWith } from "https://unpkg.com/@evan/wasm@0.0.40/target/zlib/deno.js"; +export { default as Queue } from "https://esm.sh/denque@1.5.0"; diff --git a/src/ws/proxy/manager.ts b/src/ws/proxy/manager.ts index e66fe5795..da4052487 100644 --- a/src/ws/proxy/manager.ts +++ b/src/ws/proxy/manager.ts @@ -1,5 +1,7 @@ import { getGatewayBot } from "../../api/handlers/gateway.ts"; import { Intents } from "../../types/options.ts"; +import { Collection } from "../../util/collection.ts"; +import { Queue } from "./deps.ts"; import { ws } from "./ws.ts"; /** ADVANCED DEVS ONLY!!!!!! @@ -10,6 +12,8 @@ export async function startGateway(options: StartGatewayOptions) { ws.identifyPayload.token = `Bot ${options.token}`; ws.firstShardID = options.firstShardID; ws.url = options.url; + if (options.shardsPerCluster) ws.shardsPerCluster = options.shardsPerCluster; + if (options.maxClusters) ws.maxClusters = options.maxClusters; if (options.compress) { ws.identifyPayload.compress = options.compress; @@ -36,30 +40,98 @@ export async function startGateway(options: StartGatewayOptions) { ws.botGatewayData.shards = data.shards; ws.botGatewayData.url = data.url; + // TODO: LOG THIS IS HAPPENING ws.spawnShards(ws.firstShardID); } +/** Begin spawning shards. + * TODO: Put in a queue system and support clustering + */ export function spawnShards(shardID: number) { - let skipChecks = 0; + /** Stored as bucketID: [clusterID, Queue[ShardIDs]] */ + const buckets = new Collection(); + const maxShards = ws.maxShards || ws.botGatewayData.shards; + let cluster = 0; - while (shardID <= ws.lastShardID) { - if (skipChecks) { - // Start The shard - ws.identify(shardID, ws.maxShards); + for ( + let index = 0; + index < ws.botGatewayData.sessionStartLimit.maxConcurrency; + index++ + ) { + // ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS + for (let i = 0; i < maxShards; i++) { + const bucketID = i % ws.botGatewayData.sessionStartLimit.maxConcurrency; + const bucket = buckets.get(bucketID); - shardID++; - skipChecks--; - continue; + if (!bucket) { + const queue = new Queue(); + queue.push(i); + + // Create the bucket since it doesnt exist + buckets.set(bucketID, [[cluster, queue]]); + + if (cluster + 1 <= ws.maxClusters) cluster++; + else { + // TODO: LOG THIS HAS HAPPENED + } + } else { + // FIND A QUEUE IN THIS BUCKET THAT HAS SPACE + const queue = bucket.find((q) => q[1].length < ws.shardsPerCluster + 1); + if (queue) { + // IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE + queue[1].push(i); + } else { + const newQueue = new Queue(); + newQueue.push(i); + + if (cluster + 1 <= ws.maxClusters) cluster++; + // ADD A NEW QUEUE FOR THIS SHARD + bucket.push([cluster, newQueue]); + } + } } - - // Previous shards is still not fully ready. - if (!ws.createNextShard) continue; - - // Allows next iteration to create shard - ws.createNextShard = false; - // Set the amount of shards to start up be the bots max concurrency limit - skipChecks = ws.botGatewayData.sessionStartLimit.maxConcurrency; } + + // SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP + buckets.forEach(async (bucket, bucketID) => { + for (const [clusterID, queue] of bucket) { + let shardID = queue.shift(); + + while (shardID !== undefined) { + await ws.tellClusterToIdentify(clusterID as number, shardID, bucketID); + shardID = queue.shift(); + } + } + }); + // let skipChecks = 0; + + // while (shardID <= ws.lastShardID) { + // if (skipChecks) { + // // Start The shard + // ws.identify(shardID, ws.maxShards); + + // shardID++; + // skipChecks--; + // continue; + // } + + // // Previous shards is still not fully ready. + // if (!ws.createNextShard) continue; + + // // Allows next iteration to create shard + // ws.createNextShard = false; + // // Set the amount of shards to start up be the bots max concurrency limit + // skipChecks = ws.botGatewayData.sessionStartLimit.maxConcurrency; + // } +} + +/** Allows users to hook in and change to communicate to different clusters across different servers or anything they like. For example using redis pubsub to talk to other servers. */ +export async function tellClusterToIdentify( + clusterID: number, + shardID: number, + bucketID: number, +) { + await ws.identify(shardID, ws.maxShards); } export interface StartGatewayOptions { @@ -77,4 +149,8 @@ export interface StartGatewayOptions { lastShardID?: number; /** The url to forward all payloads to. */ url: string; + /** The amount of shards per cluster. By default this is 25. Use this to spread the load from shards to different CPU cores. */ + shardsPerCluster?: number; + /** The maximum amount of clusters available. By default this is 4. Another way to think of cluster is how many CPU cores does your server/machine have. */ + maxClusters?: number; } diff --git a/src/ws/proxy/ws.ts b/src/ws/proxy/ws.ts index 9113cd28d..b2c225e08 100644 --- a/src/ws/proxy/ws.ts +++ b/src/ws/proxy/ws.ts @@ -1,5 +1,5 @@ import { Collection } from "../../util/collection.ts"; -import { spawnShards, startGateway } from "./manager.ts"; +import { spawnShards, startGateway, tellClusterToIdentify } from "./manager.ts"; import { createShard, handleDiscordPayload, @@ -13,6 +13,10 @@ export const ws = { url: "", /** The maximum shard ID number. Useful for zero-downtime updates or resharding. */ maxShards: 1, + /** The amount of shards to load per cluster */ + shardsPerCluster: 25, + /** The maximum amount of clusters to use for your bot. */ + maxClusters: 4, /** The first shard ID to start spawning. */ firstShardID: 0, /** The last shard ID for this cluster. */ @@ -64,6 +68,7 @@ export const ws = { identify, heartbeat, handleDiscordPayload, + tellClusterToIdentify, }; export interface DiscordenoShard {