test queue system for shard spawinging

This commit is contained in:
Skillz
2021-02-10 12:06:32 -05:00
parent e88c49c325
commit 048dd7a4f2
3 changed files with 99 additions and 17 deletions

View File

@@ -1 +1,2 @@
export { decompress_with as decompressWith } from "https://unpkg.com/@evan/wasm@0.0.40/target/zlib/deno.js";
export { default as Queue } from "https://esm.sh/denque@1.5.0";

View File

@@ -1,5 +1,7 @@
import { getGatewayBot } from "../../api/handlers/gateway.ts";
import { Intents } from "../../types/options.ts";
import { Collection } from "../../util/collection.ts";
import { Queue } from "./deps.ts";
import { ws } from "./ws.ts";
/** ADVANCED DEVS ONLY!!!!!!
@@ -10,6 +12,8 @@ export async function startGateway(options: StartGatewayOptions) {
ws.identifyPayload.token = `Bot ${options.token}`;
ws.firstShardID = options.firstShardID;
ws.url = options.url;
if (options.shardsPerCluster) ws.shardsPerCluster = options.shardsPerCluster;
if (options.maxClusters) ws.maxClusters = options.maxClusters;
if (options.compress) {
ws.identifyPayload.compress = options.compress;
@@ -36,30 +40,98 @@ export async function startGateway(options: StartGatewayOptions) {
ws.botGatewayData.shards = data.shards;
ws.botGatewayData.url = data.url;
// TODO: LOG THIS IS HAPPENING
ws.spawnShards(ws.firstShardID);
}
/** Begin spawning shards.
* TODO: Put in a queue system and support clustering
*/
export function spawnShards(shardID: number) {
let skipChecks = 0;
/** Stored as bucketID: [clusterID, Queue[ShardIDs]] */
const buckets = new Collection<number, [number, Queue][]>();
const maxShards = ws.maxShards || ws.botGatewayData.shards;
let cluster = 0;
while (shardID <= ws.lastShardID) {
if (skipChecks) {
// Start The shard
ws.identify(shardID, ws.maxShards);
for (
let index = 0;
index < ws.botGatewayData.sessionStartLimit.maxConcurrency;
index++
) {
// ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS
for (let i = 0; i < maxShards; i++) {
const bucketID = i % ws.botGatewayData.sessionStartLimit.maxConcurrency;
const bucket = buckets.get(bucketID);
shardID++;
skipChecks--;
continue;
if (!bucket) {
const queue = new Queue();
queue.push(i);
// Create the bucket since it doesnt exist
buckets.set(bucketID, [[cluster, queue]]);
if (cluster + 1 <= ws.maxClusters) cluster++;
else {
// TODO: LOG THIS HAS HAPPENED
}
} else {
// FIND A QUEUE IN THIS BUCKET THAT HAS SPACE
const queue = bucket.find((q) => q[1].length < ws.shardsPerCluster + 1);
if (queue) {
// IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE
queue[1].push(i);
} else {
const newQueue = new Queue();
newQueue.push(i);
if (cluster + 1 <= ws.maxClusters) cluster++;
// ADD A NEW QUEUE FOR THIS SHARD
bucket.push([cluster, newQueue]);
}
}
}
// Previous shards is still not fully ready.
if (!ws.createNextShard) continue;
// Allows next iteration to create shard
ws.createNextShard = false;
// Set the amount of shards to start up be the bots max concurrency limit
skipChecks = ws.botGatewayData.sessionStartLimit.maxConcurrency;
}
// SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP
buckets.forEach(async (bucket, bucketID) => {
for (const [clusterID, queue] of bucket) {
let shardID = queue.shift();
while (shardID !== undefined) {
await ws.tellClusterToIdentify(clusterID as number, shardID, bucketID);
shardID = queue.shift();
}
}
});
// let skipChecks = 0;
// while (shardID <= ws.lastShardID) {
// if (skipChecks) {
// // Start The shard
// ws.identify(shardID, ws.maxShards);
// shardID++;
// skipChecks--;
// continue;
// }
// // Previous shards is still not fully ready.
// if (!ws.createNextShard) continue;
// // Allows next iteration to create shard
// ws.createNextShard = false;
// // Set the amount of shards to start up be the bots max concurrency limit
// skipChecks = ws.botGatewayData.sessionStartLimit.maxConcurrency;
// }
}
/** Allows users to hook in and change to communicate to different clusters across different servers or anything they like. For example using redis pubsub to talk to other servers. */
export async function tellClusterToIdentify(
clusterID: number,
shardID: number,
bucketID: number,
) {
await ws.identify(shardID, ws.maxShards);
}
export interface StartGatewayOptions {
@@ -77,4 +149,8 @@ export interface StartGatewayOptions {
lastShardID?: number;
/** The url to forward all payloads to. */
url: string;
/** The amount of shards per cluster. By default this is 25. Use this to spread the load from shards to different CPU cores. */
shardsPerCluster?: number;
/** The maximum amount of clusters available. By default this is 4. Another way to think of cluster is how many CPU cores does your server/machine have. */
maxClusters?: number;
}

View File

@@ -1,5 +1,5 @@
import { Collection } from "../../util/collection.ts";
import { spawnShards, startGateway } from "./manager.ts";
import { spawnShards, startGateway, tellClusterToIdentify } from "./manager.ts";
import {
createShard,
handleDiscordPayload,
@@ -13,6 +13,10 @@ export const ws = {
url: "",
/** The maximum shard ID number. Useful for zero-downtime updates or resharding. */
maxShards: 1,
/** The amount of shards to load per cluster */
shardsPerCluster: 25,
/** The maximum amount of clusters to use for your bot. */
maxClusters: 4,
/** The first shard ID to start spawning. */
firstShardID: 0,
/** The last shard ID for this cluster. */
@@ -64,6 +68,7 @@ export const ws = {
identify,
heartbeat,
handleDiscordPayload,
tellClusterToIdentify,
};
export interface DiscordenoShard {