refactor: merge both shard systems

This commit is contained in:
Skillz4Killz
2021-04-04 12:47:02 +00:00
committed by GitHub
parent 181bdffb41
commit 535f27c734
20 changed files with 488 additions and 574 deletions
+24
View File
@@ -0,0 +1,24 @@
import { delay } from "../util/utils.ts";
import { ws } from "./ws.ts";
/** The handler to clean up shards that identified but never received a READY. */
export async function cleanupLoadingShards() {
while (ws.loadingShards.size) {
const now = Date.now();
ws.loadingShards.forEach((loadingShard) => {
console.log(
now > loadingShard.startedAt + 60000,
now,
loadingShard.startedAt
);
// Not a minute yet. Max should be few seconds but do a minute to be safe.
if (now < loadingShard.startedAt + 60000) return;
loadingShard.reject(
`[Identify Failure] Shard ${loadingShard.shardID} has not received READY event in over a minute.`
);
});
await delay(1000);
}
}
+49
View File
@@ -0,0 +1,49 @@
import { identify } from "./identify.ts";
import { handleOnMessage } from "./proxy/shard.ts";
import { resume } from "./resume.ts";
import { ws } from "./ws.ts";
// deno-lint-ignore require-await
export async function createShard(shardID: number) {
const socket = new WebSocket(ws.botGatewayData.url);
socket.binaryType = "arraybuffer";
socket.onerror = (errorEvent) => {
ws.log("ERROR", { shardID, error: errorEvent });
};
socket.onmessage = ({ data: message }) => handleOnMessage(message, shardID);
socket.onclose = (event) => {
ws.log("CLOSED", { shardID, payload: event });
// TODO: ENUM FOR THESE CODES?
switch (event.code) {
case 4001:
case 4002:
case 4004:
case 4005:
case 4010:
case 4011:
case 4012:
case 4013:
case 4014:
throw new Error(
event.reason || "Discord gave no reason! GG! You broke Discord!"
);
// THESE ERRORS CAN NO BE RESUMED! THEY MUST RE-IDENTIFY!
case 4003:
case 4007:
case 4008:
case 4009:
ws.log("CLOSED_RECONNECT", { shardID, payload: event });
identify(shardID, ws.maxShards);
break;
default:
resume(shardID);
break;
}
};
return socket;
}
@@ -1,4 +1,3 @@
import { DiscordPayload } from "../../types/discord.ts";
import { DiscordenoShard } from "./ws.ts"; import { DiscordenoShard } from "./ws.ts";
/** The handler for logging different actions happening inside the ws. User can override and put custom handling per event. */ /** The handler for logging different actions happening inside the ws. User can override and put custom handling per event. */
+18
View File
@@ -0,0 +1,18 @@
import { ws } from "./ws.ts";
/** Handler for processing all dispatch payloads that should be sent/forwarded to another server/vps/process. */
export async function handleDiscordPayload(
data: DiscordPayload,
shardID: number
) {
await fetch(ws.url, {
headers: {
authorization: ws.secretKey,
},
method: "post",
body: JSON.stringify({
shardID,
data,
}),
}).catch(console.error);
}
+94
View File
@@ -0,0 +1,94 @@
import { identify } from "./identify.ts";
import { resume } from "./resume.ts";
import { ws } from "./ws.ts";
import { decompressWith } from "./deps.ts";
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { DiscordReady } from "../types/gateway/ready.ts";
/** Handler for handling every message event from websocket. */
// deno-lint-ignore no-explicit-any
export function handleOnMessage(message: any, shardID: number) {
if (message instanceof ArrayBuffer) {
message = new Uint8Array(message);
}
if (message instanceof Uint8Array) {
message = decompressWith(message, 0, (slice: Uint8Array) =>
ws.utf8decoder.decode(slice)
);
}
if (typeof message !== "string") return;
const messageData = JSON.parse(message);
ws.log("RAW", messageData);
switch (messageData.op) {
case DiscordGatewayOpcodes.Hello:
ws.heartbeat(
shardID,
(messageData.d as DiscordHeartbeat).heartbeat_interval
);
break;
case DiscordGatewayOpcodes.HeartbeatACK:
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.heartbeat.acknowledged = true;
}
break;
case DiscordGatewayOpcodes.Reconnect:
ws.log("RECONNECT", { shardID });
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = true;
}
resume(shardID);
break;
case DiscordGatewayOpcodes.InvalidSession:
ws.log("INVALID_SESSION", { shardID, payload: messageData });
// When d is false we need to reidentify
if (!messageData.d) {
identify(shardID, ws.maxShards);
break;
}
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = true;
}
resume(shardID);
break;
default:
if (messageData.t === "RESUMED") {
ws.log("RESUMED", { shardID });
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = false;
}
break;
}
// Important for RESUME
if (messageData.t === "READY") {
const shard = ws.shards.get(shardID);
if (shard) {
shard.sessionID = (messageData.d as DiscordReady).session_id;
}
ws.loadingShards.get(shardID)?.resolve(true);
ws.loadingShards.delete(shardID);
}
// Update the sequence number if it is present
if (messageData.s) {
const shard = ws.shards.get(shardID);
if (shard) {
shard.previousSequenceNumber = messageData.s;
}
}
ws.handleDiscordPayload(messageData, shardID);
break;
}
}
+40
View File
@@ -0,0 +1,40 @@
import { ws } from "./ws.ts";
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
export function heartbeat(shardID: number, interval: number) {
ws.log("HEARTBEATING_STARTED", { shardID, interval });
const shard = ws.shards.get(shardID);
if (!shard) return;
ws.log("HEARTBEATING_DETAILS", { shardID, interval, shard });
shard.heartbeat.keepAlive = true;
shard.heartbeat.acknowledged = false;
shard.heartbeat.lastSentAt = Date.now();
shard.heartbeat.interval = interval;
shard.heartbeat.intervalID = setInterval(() => {
const currentShard = ws.shards.get(shardID);
if (!currentShard) return;
ws.log("HEARTBEATING", { shardID, shard: currentShard });
if (
currentShard.ws.readyState === WebSocket.CLOSED ||
!currentShard.heartbeat.keepAlive
) {
ws.log("HEARTBEATING_CLOSED", { shardID, shard: currentShard });
// STOP THE HEARTBEAT
return clearInterval(currentShard.heartbeat.intervalID);
}
currentShard.ws.send(
JSON.stringify({
op: DiscordGatewayOpcodes.Heartbeat,
d: currentShard.previousSequenceNumber,
})
);
}, interval);
}
+47
View File
@@ -0,0 +1,47 @@
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { ws } from "./ws.ts";
export async function identify(shardID: number, maxShards: number) {
ws.log("IDENTIFYING", { shardID, maxShards });
// CREATE A SHARD
const socket = await ws.createShard(shardID);
// Identify can just set/reset the settings for the shard
ws.shards.set(shardID, {
id: shardID,
ws: socket,
resumeInterval: 0,
sessionID: "",
previousSequenceNumber: 0,
resuming: false,
heartbeat: {
lastSentAt: 0,
lastReceivedAt: 0,
acknowledged: false,
keepAlive: false,
interval: 0,
intervalID: 0,
},
});
socket.onopen = () => {
socket.send(
JSON.stringify({
op: DiscordGatewayOpcodes.Identify,
d: { ...ws.identifyPayload, shard: [shardID, maxShards] },
})
);
};
return new Promise((resolve, reject) => {
ws.loadingShards.set(shardID, {
shardID,
resolve,
reject,
startedAt: Date.now(),
});
ws.cleanupLoadingShards();
});
}
+14 -2
View File
@@ -1,2 +1,14 @@
export * from "./shard.ts"; export * from "./cleanup_loading_shards.ts";
export * from "./shard_manager.ts"; export * from "./create_shard.ts";
export * from "./events.ts";
export * from "./handle_discord_payload.ts";
export * from "./handle_on_message.ts";
export * from "./heartbeat.ts";
export * from "./identify.ts";
export * from "./resharder.ts";
export * from "./resume.ts";
export * from "./spawn_shards.ts";
export * from "./start_gateway_options.ts";
export * from "./start_gateway.ts";
export * from "./tell_cluster_to_identify.ts";
export * from "./ws.ts";
-1
View File
@@ -1 +0,0 @@
export { decompress_with as decompressWith } from "https://unpkg.com/@evan/wasm@0.0.40/target/zlib/deno.js";
-167
View File
@@ -1,167 +0,0 @@
import { DiscordBotGatewayData } from "../../types/discord.ts";
import { Intents } from "../../types/options.ts";
import { Collection } from "../../util/collection.ts";
import { ws } from "./ws.ts";
import { delay } from "../../util/utils.ts";
/** ADVANCED DEVS ONLY!!!!!!
* Starts the standalone gateway.
* This will require starting the bot separately.
*/
export async function startGateway(options: StartGatewayOptions) {
ws.identifyPayload.token = `Bot ${options.token}`;
ws.secretKey = options.secretKey;
ws.firstShardID = options.firstShardID;
ws.url = options.url;
if (options.shardsPerCluster) ws.shardsPerCluster = options.shardsPerCluster;
if (options.maxClusters) ws.maxClusters = options.maxClusters;
if (options.compress) {
ws.identifyPayload.compress = options.compress;
}
if (options.reshard) ws.reshard = options.reshard;
// Once an hour check if resharding is necessary
setInterval(ws.resharder, 1000 * 60 * 60);
ws.identifyPayload.intents = options.intents.reduce(
(bits, next) => (bits |= typeof next === "string" ? Intents[next] : next),
0,
);
const data = await fetch(
`https://discord.com/api/gateway/bot`,
{ headers: { Authorization: ws.identifyPayload.token } },
).then((res) => res.json()) as DiscordBotGatewayData;
ws.maxShards = options.maxShards || data.shards;
ws.lastShardID = options.lastShardID || data.shards - 1;
// TODO: ALL THE FOLLOWING CAN BE REPLACED BY THIS 1 LINE
// ws.botGatewayData = snakeToCamel(await getGatewayBot())
ws.botGatewayData.sessionStartLimit.total = data.session_start_limit.total;
ws.botGatewayData.sessionStartLimit.resetAfter =
data.session_start_limit.reset_after;
ws.botGatewayData.sessionStartLimit.remaining =
data.session_start_limit.remaining;
ws.botGatewayData.sessionStartLimit.maxConcurrency =
data.session_start_limit.max_concurrency;
ws.botGatewayData.shards = data.shards;
ws.botGatewayData.url = data.url;
ws.spawnShards(ws.firstShardID);
ws.cleanupLoadingShards();
}
/** Begin spawning shards. */
export function spawnShards(firstShardID = 0) {
/** Stored as bucketID: [clusterID, [ShardIDs]] */
const buckets = new Collection<number, number[][]>();
const maxShards = ws.maxShards || ws.botGatewayData.shards;
let cluster = 0;
for (
let index = firstShardID;
index < ws.botGatewayData.sessionStartLimit.maxConcurrency;
index++
) {
// ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS
for (let i = 0; i < maxShards; i++) {
const bucketID = i % ws.botGatewayData.sessionStartLimit.maxConcurrency;
const bucket = buckets.get(bucketID);
if (!bucket) {
// Create the bucket since it doesnt exist
buckets.set(bucketID, [[cluster, i]]);
if (cluster + 1 <= ws.maxClusters) cluster++;
} else {
// FIND A QUEUE IN THIS BUCKET THAT HAS SPACE
const queue = bucket.find((q) => q.length < ws.shardsPerCluster + 1);
if (queue) {
// IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE
queue.push(i);
} else {
if (cluster + 1 <= ws.maxClusters) cluster++;
// ADD A NEW QUEUE FOR THIS SHARD
bucket.push([cluster, i]);
}
}
}
}
// SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP
buckets.forEach(async (bucket, bucketID) => {
for (const [clusterID, ...queue] of bucket) {
let shardID = queue.shift();
while (shardID !== undefined) {
await ws.tellClusterToIdentify(clusterID as number, shardID, bucketID);
shardID = queue.shift();
}
}
});
}
/** Allows users to hook in and change to communicate to different clusters across different servers or anything they like. For example using redis pubsub to talk to other servers. */
export async function tellClusterToIdentify(
workerID: number,
shardID: number,
bucketID: number,
) {
// When resharding this may exist already
const oldShard = ws.shards.get(shardID);
// TODO: Use workers
await ws.identify(shardID, ws.maxShards);
if (oldShard) {
oldShard.ws.close(4009, "Resharded!");
}
}
/** The handler to clean up shards that identified but never received a READY. */
export async function cleanupLoadingShards() {
while (ws.loadingShards.size) {
const now = Date.now();
ws.loadingShards.forEach((loadingShard) => {
console.log(
now > loadingShard.startedAt + 60000,
now,
loadingShard.startedAt,
);
// Not a minute yet. Max should be few seconds but do a minute to be safe.
if (now < loadingShard.startedAt + 60000) return;
loadingShard.reject(
`[Identify Failure] Shard ${loadingShard.shardID} has not received READY event in over a minute.`,
);
});
await delay(1000);
}
}
export interface StartGatewayOptions {
/** The bot token. */
token: string;
/** Whether or not to use compression for gateway payloads. */
compress?: boolean;
/** The intents you would like to enable. */
intents: (Intents | keyof typeof Intents)[];
/** The max amount of shards used for identifying. This can be useful for zero-downtime updates or resharding. */
maxShards?: number;
/** The first shard ID for this group of shards. */
firstShardID: number;
/** The last shard ID for this group. If none is provided, it will default to loading all shards. */
lastShardID?: number;
/** The url to forward all payloads to. */
url: string;
/** The amount of shards per cluster. By default this is 25. Use this to spread the load from shards to different CPU cores. */
shardsPerCluster?: number;
/** The maximum amount of clusters available. By default this is 4. Another way to think of cluster is how many CPU cores does your server/machine have. */
maxClusters?: number;
/** Whether or not you want to allow automated sharding. By default this is true. */
reshard?: boolean;
/** The authorization key that the bot http server will expect. */
secretKey: string;
}
-5
View File
@@ -1,5 +0,0 @@
export * from "./events.ts";
export * from "./manager.ts";
export * from "./resharder.ts";
export * from "./shard.ts";
export * from "./ws.ts";
-297
View File
@@ -1,297 +0,0 @@
import {
DiscordHeartbeatPayload,
DiscordPayload,
GatewayOpcode,
ReadyPayload,
} from "../../types/discord.ts";
import { decompressWith } from "./deps.ts";
import { ws } from "./ws.ts";
export async function resume(shardID: number) {
ws.log("RESUMING", { shardID });
// CREATE A SHARD
const socket = await ws.createShard(shardID);
// NOW WE HANDLE RESUMING THIS SHARD
// Get the old data for this shard necessary for resuming
const oldShard = ws.shards.get(shardID);
if (oldShard) {
// HOW TO CLOSE OLD SHARD SOCKET!!!
oldShard.ws.close(4009, "Resuming the shard, closing old shard.");
// STOP OLD HEARTBEAT
clearInterval(oldShard.heartbeat.intervalID);
}
const sessionID = oldShard?.sessionID || "";
const previousSequenceNumber = oldShard?.previousSequenceNumber || 0;
ws.shards.set(shardID, {
id: shardID,
ws: socket,
resumeInterval: 0,
sessionID,
previousSequenceNumber,
resuming: false,
heartbeat: {
lastSentAt: 0,
lastReceivedAt: 0,
acknowledged: false,
keepAlive: false,
interval: 0,
intervalID: 0,
},
});
// Resume on open
socket.onopen = () => {
socket.send(JSON.stringify({
op: GatewayOpcode.Resume,
d: {
token: ws.identifyPayload.token,
session_id: sessionID,
seq: previousSequenceNumber,
},
}));
};
}
export async function identify(shardID: number, maxShards: number) {
ws.log("IDENTIFYING", { shardID, maxShards });
// CREATE A SHARD
const socket = await ws.createShard(shardID);
// Identify can just set/reset the settings for the shard
ws.shards.set(shardID, {
id: shardID,
ws: socket,
resumeInterval: 0,
sessionID: "",
previousSequenceNumber: 0,
resuming: false,
heartbeat: {
lastSentAt: 0,
lastReceivedAt: 0,
acknowledged: false,
keepAlive: false,
interval: 0,
intervalID: 0,
},
});
socket.onopen = () => {
socket.send(
JSON.stringify(
{
op: GatewayOpcode.Identify,
d: { ...ws.identifyPayload, shard: [shardID, maxShards] },
},
),
);
};
return new Promise((resolve, reject) => {
ws.loadingShards.set(shardID, {
shardID,
resolve,
reject,
startedAt: Date.now(),
});
ws.cleanupLoadingShards();
});
}
export function heartbeat(shardID: number, interval: number) {
ws.log("HEARTBEATING_STARTED", { shardID, interval });
const shard = ws.shards.get(shardID);
if (!shard) return;
ws.log("HEARTBEATING_DETAILS", { shardID, interval, shard });
shard.heartbeat.keepAlive = true;
shard.heartbeat.acknowledged = false;
shard.heartbeat.lastSentAt = Date.now();
shard.heartbeat.interval = interval;
shard.heartbeat.intervalID = setInterval(() => {
const currentShard = ws.shards.get(shardID);
if (!currentShard) return;
ws.log("HEARTBEATING", { shardID, shard: currentShard });
if (
currentShard.ws.readyState === WebSocket.CLOSED ||
!currentShard.heartbeat.keepAlive
) {
ws.log("HEARTBEATING_CLOSED", { shardID, shard: currentShard });
// STOP THE HEARTBEAT
return clearInterval(currentShard.heartbeat.intervalID);
}
currentShard.ws.send(
JSON.stringify(
{
op: GatewayOpcode.Heartbeat,
d: currentShard.previousSequenceNumber,
},
),
);
}, interval);
}
// deno-lint-ignore require-await
export async function createShard(shardID: number) {
const socket = new WebSocket(ws.botGatewayData.url);
socket.binaryType = "arraybuffer";
socket.onerror = (errorEvent) => {
ws.log("ERROR", { shardID, error: errorEvent });
};
socket.onmessage = ({ data: message }) => handleOnMessage(message, shardID);
socket.onclose = (event) => {
ws.log("CLOSED", { shardID, payload: event });
// TODO: ENUM FOR THESE CODES?
switch (event.code) {
case 4001:
case 4002:
case 4004:
case 4005:
case 4010:
case 4011:
case 4012:
case 4013:
case 4014:
throw new Error(
event.reason || "Discord gave no reason! GG! You broke Discord!",
);
// THESE ERRORS CAN NO BE RESUMED! THEY MUST RE-IDENTIFY!
case 4003:
case 4007:
case 4008:
case 4009:
ws.log("CLOSED_RECONNECT", { shardID, payload: event });
identify(shardID, ws.maxShards);
break;
default:
resume(shardID);
break;
}
};
return socket;
}
/** Handler for handling every message event from websocket. */
// deno-lint-ignore no-explicit-any
export function handleOnMessage(message: any, shardID: number) {
if (message instanceof ArrayBuffer) {
message = new Uint8Array(message);
}
if (message instanceof Uint8Array) {
message = decompressWith(
message,
0,
(slice: Uint8Array) => ws.utf8decoder.decode(slice),
);
}
if (typeof message !== "string") return;
const messageData = JSON.parse(message);
ws.log("RAW", messageData);
switch (messageData.op) {
case GatewayOpcode.Hello:
ws.heartbeat(
shardID,
(messageData.d as DiscordHeartbeatPayload).heartbeat_interval,
);
break;
case GatewayOpcode.HeartbeatACK:
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.heartbeat.acknowledged = true;
}
break;
case GatewayOpcode.Reconnect:
ws.log("RECONNECT", { shardID });
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = true;
}
resume(shardID);
break;
case GatewayOpcode.InvalidSession:
ws.log("INVALID_SESSION", { shardID, payload: messageData });
// When d is false we need to reidentify
if (!messageData.d) {
identify(shardID, ws.maxShards);
break;
}
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = true;
}
resume(shardID);
break;
default:
if (messageData.t === "RESUMED") {
ws.log("RESUMED", { shardID });
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = false;
}
break;
}
// Important for RESUME
if (messageData.t === "READY") {
const shard = ws.shards.get(shardID);
if (shard) {
shard.sessionID = (messageData.d as ReadyPayload).session_id;
}
ws.loadingShards.get(shardID)?.resolve(true);
ws.loadingShards.delete(shardID);
}
// Update the sequence number if it is present
if (messageData.s) {
const shard = ws.shards.get(shardID);
if (shard) {
shard.previousSequenceNumber = messageData.s;
}
}
ws.handleDiscordPayload(messageData, shardID);
break;
}
}
/** Handler for processing all dispatch payloads that should be sent/forwarded to another server/vps/process. */
export async function handleDiscordPayload(
data: DiscordPayload,
shardID: number,
) {
await fetch(ws.url, {
headers: {
authorization: ws.secretKey,
},
method: "post",
body: JSON.stringify({
shardID,
data,
}),
}).catch(console.error);
}
@@ -1,5 +1,5 @@
import { getGatewayBot } from "../../api/handlers/gateway.ts";
import { ws } from "./ws.ts"; import { ws } from "./ws.ts";
import { getGatewayBot } from "../helpers/misc/get_gateway_bot.ts";
/** The handler to automatically reshard when necessary. */ /** The handler to automatically reshard when necessary. */
export async function resharder() { export async function resharder() {
+54
View File
@@ -0,0 +1,54 @@
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { ws } from "./ws.ts";
export async function resume(shardID: number) {
ws.log("RESUMING", { shardID });
// CREATE A SHARD
const socket = await ws.createShard(shardID);
// NOW WE HANDLE RESUMING THIS SHARD
// Get the old data for this shard necessary for resuming
const oldShard = ws.shards.get(shardID);
if (oldShard) {
// HOW TO CLOSE OLD SHARD SOCKET!!!
oldShard.ws.close(4009, "Resuming the shard, closing old shard.");
// STOP OLD HEARTBEAT
clearInterval(oldShard.heartbeat.intervalID);
}
const sessionID = oldShard?.sessionID || "";
const previousSequenceNumber = oldShard?.previousSequenceNumber || 0;
ws.shards.set(shardID, {
id: shardID,
ws: socket,
resumeInterval: 0,
sessionID,
previousSequenceNumber,
resuming: false,
heartbeat: {
lastSentAt: 0,
lastReceivedAt: 0,
acknowledged: false,
keepAlive: false,
interval: 0,
intervalID: 0,
},
});
// Resume on open
socket.onopen = () => {
socket.send(
JSON.stringify({
op: DiscordGatewayOpcodes.Resume,
d: {
token: ws.identifyPayload.token,
session_id: sessionID,
seq: previousSequenceNumber,
},
})
);
};
}
-100
View File
@@ -1,100 +0,0 @@
import { eventHandlers } from "../bot.ts";
import { cache } from "../cache.ts";
import { handlers } from "../handlers/mod.ts";
import { Member } from "../structures/mod.ts";
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { Collection } from "../util/collection.ts";
import { delay } from "../util/utils.ts";
import { createShard, requestGuildMembers } from "./mod.ts";
let createNextShard = true;
/** This function is meant to be used on the ready event to alert the library to start the next shard. */
export function allowNextShard(enabled = true) {
createNextShard = enabled;
}
export async function spawnShards(
data: DiscordBotGatewayData,
payload: DiscordIdentify,
shardId: number,
lastShardId: number,
skipChecks?: number,
) {
// All shards on this worker have started! Cancel out.
if (shardId >= lastShardId) return;
if (skipChecks) {
payload.shard = [
shardId,
data.shards > lastShardId ? data.shards : lastShardId,
];
// Start The shard
createShard(data, payload, false, shardId);
// Spawn next shard
await spawnShards(
data,
payload,
shardId + 1,
lastShardId,
skipChecks - 1,
);
return;
}
// Make sure we can create a shard or we are waiting for shards to connect still.
if (createNextShard) {
createNextShard = false;
// Start the next few shards based on max concurrency
await spawnShards(
data,
payload,
shardId,
lastShardId,
data.session_start_limit.max_concurrency,
);
return;
}
await delay(1000);
await spawnShards(data, payload, shardId, lastShardId, skipChecks);
}
export async function handleDiscordPayload(
data: DiscordPayload,
shardId: number,
) {
eventHandlers.raw?.(data);
await eventHandlers.dispatchRequirements?.(data, shardId);
switch (data.op) {
case DiscordGatewayOpcodes.HeartbeatACK:
// In case the user wants to listen to heartbeat responses
return eventHandlers.heartbeat?.();
case DiscordGatewayOpcodes.Dispatch:
if (!data.t) return;
// Run the appropriate handler for this event.
return handlers[data.t]?.(data, shardId);
default:
return;
}
}
export async function requestAllMembers(
guildId: string,
shardId: number,
resolve: (
value: Collection<string, Member> | PromiseLike<Collection<string, Member>>,
) => void,
options?: FetchMembersOptions,
) {
const nonce = `${guildId}-${Date.now()}`;
cache.fetchAllMembersProcessingRequests.set(nonce, resolve);
await requestGuildMembers(
guildId,
shardId,
nonce,
options,
);
}
+52
View File
@@ -0,0 +1,52 @@
import { Collection } from "../util/collection.ts";
import { ws } from "./ws.ts";
/** Begin spawning shards. */
export function spawnShards(firstShardID = 0) {
/** Stored as bucketID: [clusterID, [ShardIDs]] */
const buckets = new Collection<number, number[][]>();
const maxShards = ws.maxShards || ws.botGatewayData.shards;
let cluster = 0;
for (
let index = firstShardID;
index < ws.botGatewayData.sessionStartLimit.maxConcurrency;
index++
) {
// ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS
for (let i = 0; i < maxShards; i++) {
const bucketID = i % ws.botGatewayData.sessionStartLimit.maxConcurrency;
const bucket = buckets.get(bucketID);
if (!bucket) {
// Create the bucket since it doesnt exist
buckets.set(bucketID, [[cluster, i]]);
if (cluster + 1 <= ws.maxClusters) cluster++;
} else {
// FIND A QUEUE IN THIS BUCKET THAT HAS SPACE
const queue = bucket.find((q) => q.length < ws.shardsPerCluster + 1);
if (queue) {
// IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE
queue.push(i);
} else {
if (cluster + 1 <= ws.maxClusters) cluster++;
// ADD A NEW QUEUE FOR THIS SHARD
bucket.push([cluster, i]);
}
}
}
}
// SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP
buckets.forEach(async (bucket, bucketID) => {
for (const [clusterID, ...queue] of bucket) {
let shardID = queue.shift();
while (shardID !== undefined) {
await ws.tellClusterToIdentify(clusterID as number, shardID, bucketID);
shardID = queue.shift();
}
}
});
}
+51
View File
@@ -0,0 +1,51 @@
import { StartGatewayOptions } from "./start_gateway_options.ts";
import { DiscordGatewayIntents } from "../types/gateway/gateway_intents.ts";
import { ws } from "./ws.ts";
/** ADVANCED DEVS ONLY!!!!!!
* Starts the standalone gateway.
* This will require starting the bot separately.
*/
export async function startGateway(options: StartGatewayOptions) {
ws.identifyPayload.token = `Bot ${options.token}`;
ws.secretKey = options.secretKey;
ws.firstShardID = options.firstShardID;
ws.url = options.url;
if (options.shardsPerCluster) ws.shardsPerCluster = options.shardsPerCluster;
if (options.maxClusters) ws.maxClusters = options.maxClusters;
if (options.compress) {
ws.identifyPayload.compress = options.compress;
}
if (options.reshard) ws.reshard = options.reshard;
// Once an hour check if resharding is necessary
setInterval(ws.resharder, 1000 * 60 * 60);
ws.identifyPayload.intents = options.intents.reduce(
(bits, next) =>
(bits |= typeof next === "string" ? DiscordGatewayIntents[next] : next),
0
);
const data = (await fetch(`https://discord.com/api/gateway/bot`, {
headers: { Authorization: ws.identifyPayload.token },
}).then((res) => res.json())) as DiscordBotGatewayData;
ws.maxShards = options.maxShards || data.shards;
ws.lastShardID = options.lastShardID || data.shards - 1;
// TODO: ALL THE FOLLOWING CAN BE REPLACED BY THIS 1 LINE
// ws.botGatewayData = snakeToCamel(await getGatewayBot())
ws.botGatewayData.sessionStartLimit.total = data.session_start_limit.total;
ws.botGatewayData.sessionStartLimit.resetAfter =
data.session_start_limit.reset_after;
ws.botGatewayData.sessionStartLimit.remaining =
data.session_start_limit.remaining;
ws.botGatewayData.sessionStartLimit.maxConcurrency =
data.session_start_limit.max_concurrency;
ws.botGatewayData.shards = data.shards;
ws.botGatewayData.url = data.url;
ws.spawnShards(ws.firstShardID);
ws.cleanupLoadingShards();
}
+26
View File
@@ -0,0 +1,26 @@
import { DiscordGatewayIntents } from "../types/gateway/gateway_intents.ts";
export interface StartGatewayOptions {
/** The bot token. */
token: string;
/** Whether or not to use compression for gateway payloads. */
compress?: boolean;
/** The intents you would like to enable. */
intents: (DiscordGatewayIntents | keyof typeof DiscordGatewayIntents)[];
/** The max amount of shards used for identifying. This can be useful for zero-downtime updates or resharding. */
maxShards?: number;
/** The first shard ID for this group of shards. */
firstShardID: number;
/** The last shard ID for this group. If none is provided, it will default to loading all shards. */
lastShardID?: number;
/** The url to forward all payloads to. */
url: string;
/** The amount of shards per cluster. By default this is 25. Use this to spread the load from shards to different CPU cores. */
shardsPerCluster?: number;
/** The maximum amount of clusters available. By default this is 4. Another way to think of cluster is how many CPU cores does your server/machine have. */
maxClusters?: number;
/** Whether or not you want to allow automated sharding. By default this is true. */
reshard?: boolean;
/** The authorization key that the bot http server will expect. */
secretKey: string;
}
+18
View File
@@ -0,0 +1,18 @@
import { ws } from "./ws.ts";
/** Allows users to hook in and change to communicate to different clusters across different servers or anything they like. For example using redis pubsub to talk to other servers. */
export async function tellClusterToIdentify(
workerID: number,
shardID: number,
bucketID: number
) {
// When resharding this may exist already
const oldShard = ws.shards.get(shardID);
// TODO: Use workers
await ws.identify(shardID, ws.maxShards);
if (oldShard) {
oldShard.ws.close(4009, "Resharded!");
}
}
View File