handle not loading and autoresharding

Skillz4Killz
2021-02-15 23:35:01 +00:00
committed by GitHub
parent 6d32d283b9
commit 848a61cfd8
7 changed files with 196 additions and 37 deletions

@@ -6,4 +6,4 @@ export type CamelizeString<T extends PropertyKey> = T extends string
: T
: T;
export type Camelize<T> = { [K in keyof T as CamelizeString<K>]: T[K] }
export type Camelize<T> = { [K in keyof T]: T[K] };

@@ -5,21 +5,52 @@ This WS service is meant for ADVANCED DEVELOPERS ONLY!
## Benefits
- **Zero Downtime Updates**:
- Your bot can be updated in a matter of seconds. With normal sharding, you have to restart which also has to process identifying all your shards with a 1/~5s rate limit. With WS handling moved to a proxy process, this allows you to instantly get the bot code restarted without any concerns of delays. If you have a bot on 200,000 servers normally this would mean a 20 minute delay to restart your bot if you made a small change and restarted.
- Your bot can be updated in a matter of seconds. With normal sharding, you
have to restart which also has to process identifying all your shards with a
1/~5s rate limit. With WS handling moved to a proxy process, this allows you
to instantly get the bot code restarted without any concerns of delays. If
you have a bot on 200,000 servers normally this would mean a 20 minute delay
to restart your bot if you made a small change and restarted.
- **Zero Downtime Resharding**:
- Discord stops letting your bot get added to new servers at certain points in time. For example, suppose you had 150,000 servers running 150 shards. The maximum amount of servers your shards could hold is 150 * 2500 = 375,000. If your bot reaches this, it can no longer join new servers until it re-shards.
- DD proxy provides 2 types of re-sharding. Automated and manual. You can also have both.
- `Automated`: This system will automatically begin a Zero-downtime resharding process behind the scenes when you reach 80% of your maximum servers allowed by your shards. For example, since 375,000 was the max, at 300,000 we would begin re-sharding behind the scenes with `ZERO DOWNTIME`.
- 80% of maximum servers reached (The % of 80% is customizable.)
- Identify limits have room to allow re-sharding. (Also customizable)
- `Manual`: You can also trigger this manually should you choose.
- Discord stops letting your bot get added to new servers at certain points in
time. For example, suppose you had 150,000 servers running 150 shards. The
maximum amount of servers your shards could hold is 150 * 2500 = 375,000. If
your bot reaches this, it can no longer join new servers until it re-shards.
- DD proxy provides 2 types of re-sharding. Automated and manual. You can also
have both.
- `Automated`: This system will automatically begin a Zero-downtime
resharding process behind the scenes when you reach 80% of your maximum
servers allowed by your shards. For example, since 375,000 was the max, at
300,000 we would begin re-sharding behind the scenes with `ZERO DOWNTIME`.
- 80% of maximum servers reached (The % of 80% is customizable.)
- Identify limits have room to allow re-sharding. (Also customizable)
- `Manual`: You can also trigger this manually should you choose.
- **Horizontal Scaling**:
- The proxy system allows you to scale the bot horizontally. When you reach a huge size, you can either keep spending more money to keep beefing up your server or you can buy several cheaper servers and scale horizontally. The proxy means you can have WS handling on a completely separate system.
- The proxy system allows you to scale the bot horizontally. When you reach a
huge size, you can either keep spending more money to keep beefing up your
server or you can buy several cheaper servers and scale horizontally. The
proxy means you can have WS handling on a completely separate system.
- **No Loss Restarts**:
- When you restart a bot without the proxy system, normally you would lose many events. Users may be using commands, or messages may be sent that will not be filtered. As your bot grows, this number rises dramatically. Users may join who won't get the auto-roles or any other actions your bot should take. With the proxy system, you can keep restarting your bot and never lose any events. Events will be put into a queue while your bot is down (max size of queue is customizable); once the bot is available, the queue will begin processing all events.
- When you restart a bot without the proxy system, normally you would lose
many events. Users may be using commands, or messages may be sent that will
not be filtered. As your bot grows, this number rises dramatically. Users may
join who won't get the auto-roles or any other actions your bot should take.
With the proxy system, you can keep restarting your bot and never lose any
events. Events will be put into a queue while your bot is down (max size of
queue is customizable); once the bot is available, the queue will begin
processing all events.
- **Controllers**:
- The controller aspect gives you full control over everything inside the proxy. You can provide a function to simply override the handler. For example, if you would like a certain function to do something different, instead of having to fork and maintain your fork, you can just provide a function to override.
- The controller aspect gives you full control over everything inside the
proxy. You can provide a function to simply override the handler. For
example, if you would like a certain function to do something different,
instead of having to fork and maintain your fork, you can just provide a
function to override.
- **Clustering With Workers**:
- Take full advantage of all your CPU cores by using workers to spread the
load. Control how many shards per worker and how many workers to maximize
efficiency!
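
The resharding arithmetic described in the README text can be sketched as follows. This is an illustration only: the 2,500 servers-per-shard ceiling and the 80% default come from the README, while `maxServersForShards` and `reshardThreshold` are hypothetical helper names that do not exist in the library.

```typescript
/** Servers-per-shard ceiling quoted in the README text. */
const MAX_SERVERS_PER_SHARD = 2500;

/** Hypothetical helper: the most servers a given shard count can hold. */
function maxServersForShards(shards: number): number {
  return shards * MAX_SERVERS_PER_SHARD;
}

/** Hypothetical helper: server count at which automated resharding would begin. */
function reshardThreshold(shards: number, percentage = 80): number {
  // Multiply before dividing so whole-number inputs stay exact.
  return Math.floor((maxServersForShards(shards) * percentage) / 100);
}

console.log(maxServersForShards(150)); // 375000
console.log(reshardThreshold(150)); // 300000
```

With 150 shards this reproduces the README's figures: a ceiling of 375,000 servers and an automated reshard starting at 300,000.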

@@ -2,20 +2,62 @@ import { DiscordPayload } from "../../types/discord.ts";
import { DiscordenoShard } from "./ws.ts";
/** The handler for logging different actions happening inside the ws. User can override and put custom handling per event. */
export function log(type: "CLOSED", data: { shardID: number, payload: CloseEvent }): unknown;
export function log(type: "CLOSED_RECONNECT", data: { shardID: number, payload: CloseEvent }): unknown;
export function log(type: "ERROR", data: Record<string, unknown> & { shardID: number }): unknown;
export function log(type: "HEARTBEATING", data: { shardID: number, shard: DiscordenoShard }): unknown;
export function log(type: "HEARTBEATING_CLOSED", data: { shardID: number, shard: DiscordenoShard }): unknown;
export function log(type: "HEARTBEATING_DETAILS", data: { shardID: number, interval: number, shard: DiscordenoShard }): unknown;
export function log(type: "HEARTBEATING_STARTED", data: { shardID: number, interval: number }): unknown;
export function log(type: "IDENTIFYING", data: { shardID: number, maxShards: number }): unknown;
export function log(type: "INVALID_SESSION", data: { shardID: number, payload: DiscordPayload }): unknown;
export function log(
type: "CLOSED",
data: { shardID: number; payload: CloseEvent },
): unknown;
export function log(
type: "CLOSED_RECONNECT",
data: { shardID: number; payload: CloseEvent },
): unknown;
export function log(
type: "ERROR",
data: Record<string, unknown> & { shardID: number },
): unknown;
export function log(
type: "HEARTBEATING",
data: { shardID: number; shard: DiscordenoShard },
): unknown;
export function log(
type: "HEARTBEATING_CLOSED",
data: { shardID: number; shard: DiscordenoShard },
): unknown;
export function log(
type: "HEARTBEATING_DETAILS",
data: { shardID: number; interval: number; shard: DiscordenoShard },
): unknown;
export function log(
type: "HEARTBEATING_STARTED",
data: { shardID: number; interval: number },
): unknown;
export function log(
type: "IDENTIFYING",
data: { shardID: number; maxShards: number },
): unknown;
export function log(
type: "INVALID_SESSION",
data: { shardID: number; payload: DiscordPayload },
): unknown;
export function log(type: "RAW", data: Record<string, unknown>): unknown;
export function log(type: "RECONNECT", data: { shardID: number }): unknown;
export function log(type: "RESUMED", data: { shardID: number }): unknown;
export function log(type: "RESUMING", data: { shardID: number }): unknown;
export function log(type: "CLOSED" | "CLOSED_RECONNECT" | "ERROR" | "HEARTBEATING" | "HEARTBEATING_CLOSED" | "HEARTBEATING_DETAILS" | "HEARTBEATING_STARTED" | "IDENTIFYING" | "INVALID_SESSION" | "RAW" | "RECONNECT" | "RESUMED" | "RESUMING", data: unknown) {
console.log(type, data);
export function log(
type:
| "CLOSED"
| "CLOSED_RECONNECT"
| "ERROR"
| "HEARTBEATING"
| "HEARTBEATING_CLOSED"
| "HEARTBEATING_DETAILS"
| "HEARTBEATING_STARTED"
| "IDENTIFYING"
| "INVALID_SESSION"
| "RAW"
| "RECONNECT"
| "RESUMED"
| "RESUMING",
data: unknown,
) {
console.log(type, data);
}
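
The doc comment on `log` says users can override it with custom handling per event. A minimal sketch of what such an override might look like is below. The event names are copied from the overloads in this file; `customLog`, `QUIET_EVENTS`, and the stand-in `controller` object are assumptions for illustration, and the real `ws` object is not imported here.

```typescript
// Event names copied from the overloads of `log`.
type LogEvent =
  | "CLOSED"
  | "CLOSED_RECONNECT"
  | "ERROR"
  | "HEARTBEATING"
  | "HEARTBEATING_CLOSED"
  | "HEARTBEATING_DETAILS"
  | "HEARTBEATING_STARTED"
  | "IDENTIFYING"
  | "INVALID_SESSION"
  | "RAW"
  | "RECONNECT"
  | "RESUMED"
  | "RESUMING";

// Hypothetical: events treated as too noisy to print.
const QUIET_EVENTS = new Set<LogEvent>([
  "RAW",
  "HEARTBEATING",
  "HEARTBEATING_DETAILS",
]);

/** Hypothetical replacement handler. Returns the printed line, or null when the event is skipped. */
function customLog(type: LogEvent, data: unknown): string | null {
  if (QUIET_EVENTS.has(type)) return null;
  const line = `[WS ${type}] ${JSON.stringify(data)}`;
  console.log(line);
  return line;
}

// Stand-in for the real controller object; the override is a plain property assignment.
const controller = { log: customLog };
controller.log("IDENTIFYING", { shardID: 0, maxShards: 1 });
```

Returning the printed line is only a convenience for testing; the overloads in the diff return `unknown`, so any return value would be compatible.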

@@ -17,6 +17,9 @@ export async function startGateway(options: StartGatewayOptions) {
if (options.compress) {
ws.identifyPayload.compress = options.compress;
}
if (options.reshard) ws.reshard = options.reshard;
// Once an hour check if resharding is necessary
setInterval(ws.resharder, 1000 * 60 * 60);
ws.identifyPayload.intents = options.intents.reduce(
(bits, next) => (bits |= typeof next === "string" ? Intents[next] : next),
@@ -39,21 +42,19 @@ export async function startGateway(options: StartGatewayOptions) {
ws.botGatewayData.shards = data.shards;
ws.botGatewayData.url = data.url;
// TODO: LOG THIS IS HAPPENING
ws.spawnShards(ws.firstShardID);
ws.cleanupLoadingShards();
}
/** Begin spawning shards.
* TODO: Put in a queue system and support clustering
*/
export function spawnShards(shardID: number) {
/** Begin spawning shards. */
export function spawnShards(firstShardID = 0) {
/** Stored as bucketID: [clusterID, [ShardIDs]] */
const buckets = new Collection<number, number[][]>();
const maxShards = ws.maxShards || ws.botGatewayData.shards;
let cluster = 0;
for (
let index = 0;
let index = firstShardID;
index < ws.botGatewayData.sessionStartLimit.maxConcurrency;
index++
) {
@@ -67,9 +68,6 @@ export function spawnShards(shardID: number) {
buckets.set(bucketID, [[cluster, i]]);
if (cluster + 1 <= ws.maxClusters) cluster++;
else {
// TODO: LOG THIS HAS HAPPENED
}
} else {
// FIND A QUEUE IN THIS BUCKET THAT HAS SPACE
const queue = bucket.find((q) => q.length < ws.shardsPerCluster + 1);
@@ -104,8 +102,29 @@ export async function tellClusterToIdentify(
shardID: number,
bucketID: number,
) {
// When resharding
const oldShard = ws.shards.get(shardID);
// TODO: resolve promise 5 sec after ready
await ws.identify(shardID, ws.maxShards);
if (oldShard) {
oldShard.ws.close(4009, "Resharded!");
}
}
/** The handler to clean up shards that identified but never received a READY. */
export function cleanupLoadingShards() {
while (ws.loadingShards.size) {
const now = Date.now();
ws.loadingShards.forEach((loadingShard) => {
// Not a minute yet. Max should be few seconds but do a minute to be safe.
if (loadingShard.startedAt + 60000 < now) return;
loadingShard.reject(
`[Identify Failure] Shard ${loadingShard.shardID} has not received READY event in over a minute.`,
);
});
}
}
export interface StartGatewayOptions {
@@ -127,4 +146,6 @@ export interface StartGatewayOptions {
shardsPerCluster?: number;
/** The maximum amount of clusters available. By default this is 4. Another way to think of cluster is how many CPU cores does your server/machine have. */
maxClusters?: number;
/** Whether or not you want to allow automated sharding. By default this is true. */
reshard?: boolean;
}
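
A note on `cleanupLoadingShards` as it appears in this diff: the `while (ws.loadingShards.size)` loop never removes entries, so it would keep spinning once any shard is loading, and the age check returns early for shards older than a minute, which is the opposite of what its comment describes. One way the sweep might be written instead is sketched below. It is not the author's implementation: `sweepLoadingShards` is a hypothetical name, and a plain `Map` stands in for `ws.loadingShards`.

```typescript
interface LoadingShard {
  shardID: number;
  resolve: (value: unknown) => void;
  reject: (reason?: unknown) => void;
  startedAt: number;
}

// Stand-in for `ws.loadingShards`; a plain Map is assumed instead of Collection.
const loadingShards = new Map<number, LoadingShard>();

/**
 * Rejects and removes every shard that has waited at least `timeoutMs`
 * for READY. Returns the IDs that were removed.
 */
function sweepLoadingShards(now = Date.now(), timeoutMs = 60_000): number[] {
  const expired: number[] = [];
  for (const [shardID, loading] of loadingShards) {
    // Still inside the grace period, so leave this shard alone.
    if (now - loading.startedAt < timeoutMs) continue;
    loading.reject(
      `[Identify Failure] Shard ${shardID} has not received READY within ${timeoutMs}ms.`,
    );
    // Deleting during Map iteration is safe in JavaScript.
    loadingShards.delete(shardID);
    expired.push(shardID);
  }
  return expired;
}
```

A single pass like this could be scheduled periodically, for example with `setInterval(() => sweepLoadingShards(), 1000)`, so the event loop is never blocked while shards are still loading.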

src/ws/proxy/resharder.ts

@@ -0,0 +1,30 @@
import { getGatewayBot } from "../../api/handlers/gateway.ts";
import { ws } from "./ws.ts";
/** The handler to automatically reshard when necessary. */
export async function resharder() {
const data = await getGatewayBot();
const percentage = (data.shards - ws.maxShards) / ws.maxShards * 100;
// Less than necessary% being used so do nothing
if (percentage < ws.reshardPercentage) return;
// Don't have enough identify rate limits to reshard
if (data.session_start_limit.remaining < data.shards) return;
// Begin resharding
ws.maxShards = data.shards;
// TODO: ALL THE FOLLOWING CAN BE REPLACED BY THIS 1 LINE
// ws.botGatewayData = snakeToCamel(await getGatewayBot())
ws.botGatewayData.sessionStartLimit.total = data.session_start_limit.total;
ws.botGatewayData.sessionStartLimit.resetAfter =
data.session_start_limit.reset_after;
ws.botGatewayData.sessionStartLimit.remaining =
data.session_start_limit.remaining;
ws.botGatewayData.sessionStartLimit.maxConcurrency =
data.session_start_limit.max_concurrency;
ws.botGatewayData.shards = data.shards;
ws.botGatewayData.url = data.url;
ws.spawnShards(ws.firstShardID);
}
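
The two guard checks in `resharder` can be isolated into a pure function, which makes the thresholds easier to reason about and test. The sketch below mirrors the logic shown in the diff; `shouldReshard` and its parameter names are hypothetical. Note that the percentage here measures how far Discord's recommended shard count exceeds the current count, which is a different quantity from the README's "80% of maximum servers".

```typescript
/** Hypothetical pure version of the two guard checks in `resharder`. */
function shouldReshard(
  recommendedShards: number,
  currentShards: number,
  remainingIdentifies: number,
  reshardPercentage = 80,
): boolean {
  // Growth of the recommended shard count over the current count, in percent.
  const percentage = (recommendedShards - currentShards) / currentShards * 100;
  // Below the configured threshold: nothing to do.
  if (percentage < reshardPercentage) return false;
  // Each new shard needs one identify, so enough must remain in the session start limit.
  if (remainingIdentifies < recommendedShards) return false;
  return true;
}

console.log(shouldReshard(180, 100, 1000)); // true
console.log(shouldReshard(150, 100, 1000)); // false
console.log(shouldReshard(200, 100, 150)); // false
```

Keeping the decision separate from the `getGatewayBot()` call means it can be checked without network access.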

@@ -16,7 +16,7 @@ export async function resume(shardID: number) {
// NOW WE HANDLE RESUMING THIS SHARD
// Get the old data for this shard necessary for resuming
const oldShard = ws.shards.get(shardID);
if (oldShard) {
// HOW TO CLOSE OLD SHARD SOCKET!!!
oldShard.ws.close(4009, "Resuming the shard, closing old shard.");
@@ -58,7 +58,7 @@ export async function resume(shardID: number) {
}
export async function identify(shardID: number, maxShards: number) {
ws.log("IDENTIFYING", { shardID, maxShards })
ws.log("IDENTIFYING", { shardID, maxShards });
// CREATE A SHARD
const socket = await ws.createShard(shardID);
@@ -89,6 +89,17 @@ export async function identify(shardID: number, maxShards: number) {
},
),
);
return new Promise((resolve, reject) => {
ws.loadingShards.set(shardID, {
shardID,
resolve,
reject,
startedAt: Date.now(),
});
ws.cleanupLoadingShards();
});
}
export function heartbeat(shardID: number, interval: number) {
@@ -181,7 +192,7 @@ export async function createShard(shardID: number) {
break;
case GatewayOpcode.InvalidSession:
ws.log("INVALID_SESSION", { shardID, payload: messageData });
// When d is false we need to reidentify
if (!messageData.d) {
identify(shardID, ws.maxShards);
@@ -210,6 +221,9 @@ export async function createShard(shardID: number) {
if (shard) {
shard.sessionID = (messageData.d as ReadyPayload).session_id;
}
ws.loadingShards.get(shardID)?.resolve(true);
ws.loadingShards.delete(shardID);
}
// Update the sequence number if it is present
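
The changes to `identify` and the READY handler together form a "pending promise" pattern: `identify` parks its `resolve`/`reject` callbacks in a map, and the READY handler later resolves and removes the entry. A stripped-down sketch of that pattern follows. `waitForReady` and `markReady` are hypothetical names, and a plain `Map` stands in for `ws.loadingShards`.

```typescript
interface PendingShard {
  resolve: (value: unknown) => void;
  reject: (reason?: unknown) => void;
  startedAt: number;
}

// Stand-in for `ws.loadingShards`.
const pendingShards = new Map<number, PendingShard>();

/** Returns a promise that settles once the shard is marked ready (or rejected elsewhere). */
function waitForReady(shardID: number): Promise<unknown> {
  return new Promise((resolve, reject) => {
    // The executor runs synchronously, so the entry exists as soon as this call returns.
    pendingShards.set(shardID, { resolve, reject, startedAt: Date.now() });
  });
}

/** Called when READY arrives. Returns false if nothing was waiting on this shard. */
function markReady(shardID: number): boolean {
  const entry = pendingShards.get(shardID);
  if (!entry) return false;
  entry.resolve(true);
  pendingShards.delete(shardID);
  return true;
}
```

A caller would `await waitForReady(shardID)` after sending the identify payload, which is what lets a caller such as `tellClusterToIdentify` wait for the new shard before closing the old one.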

@@ -1,5 +1,10 @@
import { Collection } from "../../util/collection.ts";
import { spawnShards, startGateway, tellClusterToIdentify } from "./manager.ts";
import {
cleanupLoadingShards,
spawnShards,
startGateway,
tellClusterToIdentify,
} from "./manager.ts";
import {
createShard,
handleDiscordPayload,
@@ -7,11 +12,16 @@ import {
identify,
} from "./shard.ts";
import { log } from "./events.ts";
import { resharder } from "./resharder.ts";
// CONTROLLER LIKE INTERFACE FOR WS HANDLING
export const ws = {
/** The url that all discord payloads for the dispatch type should be sent to. */
url: "",
/** Whether or not to automatically reshard. */
reshard: true,
/** The percentage at which resharding should occur. */
reshardPercentage: 80,
/** The maximum shard ID number. Useful for zero-downtime updates or resharding. */
maxShards: 1,
/** The amount of shards to load per cluster */
@@ -57,6 +67,15 @@ export const ws = {
},
},
shards: new Collection<number, DiscordenoShard>(),
loadingShards: new Collection<
number,
{
shardID: number;
resolve: (value: unknown) => void;
reject: (reason?: unknown) => void;
startedAt: number;
}
>(),
utf8decoder: new TextDecoder(),
// METHODS
@@ -70,7 +89,9 @@ export const ws = {
heartbeat,
handleDiscordPayload,
tellClusterToIdentify,
log
log,
resharder,
cleanupLoadingShards,
};
export interface DiscordenoShard {