big brains

This commit is contained in:
Skillz
2020-10-29 20:10:26 -04:00
parent b2424be013
commit bc29fa7d4d
2 changed files with 377 additions and 0 deletions

105
src/module/bigbrainbot.ts Normal file
View File

@@ -0,0 +1,105 @@
import { delay } from "../../deps.ts";
import { DiscordBotGatewayData, RequestManager } from "../../mod.ts";
import { endpoints } from "../constants/discord.ts";
import { ClientOptions, EventHandlers } from "../types/options.ts";
import { botGatewayData } from "./client.ts";
const botOptions = {
workers: new Map<number, Worker>(),
eventHandlers: {} as EventHandlers,
botGatewayData: {} as DiscordBotGatewayData,
customShards: [] as number[],
shardsPerWorker: 25,
identifyPayload: {
token: "",
compress: true,
properties: {
$os: "linux",
$browser: "Discordeno",
$device: "Discordeno",
},
intents: 0,
shard: [0, 0],
},
};
/**
* This function should be used only by bot developers whose bots are in over 25,000 servers.
* Please be aware if you are a beginner developer using this, things will not work as per the guides. This is for advanced developers only!
*
* Advanced Devs: This function will allow you to have an insane amount of customization potential as when you get to large bots you need to be able to optimize every tiny detail to make you bot work the way you need.
*/
export async function startBigBrainBot(data: BigBrainBotOptions) {
botOptions.identifyPayload.token = `Bot ${data.token}`;
if (data.eventHandlers) botOptions.eventHandlers = data.eventHandlers;
if (data.shards) botOptions.customShards = data.shards;
if (data.compress) botOptions.identifyPayload.compress = data.compress;
if (data.shardsPerWorker) botOptions.shardsPerWorker = data.shardsPerWorker;
// Initial API connection to get info about bots connection
botOptions.botGatewayData = await RequestManager.get(
endpoints.GATEWAY_BOT,
) as DiscordBotGatewayData;
botOptions.identifyPayload.intents = data.intents.reduce(
(bits, next) => (bits |= next),
0,
);
botOptions.identifyPayload.shard = [0, botGatewayData.shards];
spawnBigBrainBotShards();
}
async function spawnBigBrainBotShards(shardID = 0, skipChecks = 0) {
if (shardID >= botOptions.botGatewayData.shards) return;
// 25 shards but shards start at 0 so we use 24
const workerID = shardID % 24;
const worker = botOptions.workers.get(workerID)
// High max concurrency allows starting shards faster
if (skipChecks) {
// If the worker exists we just need to add
if (worker) {
addShardToWorker(workerID, shardID);
} else {
createShardWorker(workerID, shardID);
}
spawnBigBrainBotShards(shardID + 1, skipChecks - 1);
}
// Make sure we can create a shard or we are waiting for shards to connect still.
if (createNextShard) {
// !(shardid % botOptions.botGatewayData.session_start_limit.max_concurrency)
createNextShard = false;
if (botOptions.botGatewayData.shards >= 25) createShardWorker();
// Start the next few shards based on max concurrency
spawnBigBrainBotShards(shardID + 1, botOptions.botGatewayData.session_start_limit.max_concurrency);
return;
}
await delay(1000);
spawnBigBrainBotShards(shardID);
}
export function createShardWorker(workerID: number, shardID: number) {
const path = new URL("./shard.ts", import.meta.url).toString();
const shard = new Worker(path, { type: "module", deno: true });
// Add to worker map
botOptions.workers.set(workerID, shard)
}
export interface BigBrainBotOptions extends ClientOptions {
/** This can be used to distribute your bot across different servers. For example, if you wanted 1 million shards per server you could control it using this. */
shards?: [number, number];
/** This can be used to forward the ws handling to a proxy. */
wsURL?: string;
/** This can be used to forward the REST handling to a proxy. */
restURL?: string;
/** This allows you to control how many shards per worker. For the times where you can optimize with more shards per worker as your bot has less tasks per shard.
* @default 25
*/
shardsPerWorker?: number;
}

272
src/module/bigbrainshard.ts Normal file
View File

@@ -0,0 +1,272 @@
import type { WebSocket } from "../../deps.ts";
import { connectWebSocket, delay, isWebSocketCloseEvent } from "../../deps.ts";
import type {
DiscordBotGatewayData,
DiscordHeartbeatPayload,
ReadyPayload,
} from "../types/discord.ts";
import { GatewayOpcode } from "../types/discord.ts";
import type { FetchMembersOptions } from "../types/guild.ts";
import type { DebugArg } from "../types/options.ts";
let shardSocket: WebSocket;
/** The session id is needed for RESUME functionality when discord disconnects randomly. */
let sessionID = "";
// Discord requests null if no number has yet been sent by discord
let previousSequenceNumber: number | null = null;
let needToResume = false;
let shardID = 0;
const RequestMembersQueue: RequestMemberQueuedRequest[] = [];
let processQueue = false;
interface RequestMemberQueuedRequest {
guildID: string;
nonce: string;
options?: FetchMembersOptions;
}
async function processRequestMembersQueue() {
if (!RequestMembersQueue.length) {
processQueue = false;
return;
}
// 2 events per second is the rate limit.
const request = RequestMembersQueue.shift();
if (request) {
requestGuildMembers(request.guildID, request.nonce, request.options, true);
const secondRequest = RequestMembersQueue.shift();
if (secondRequest) {
requestGuildMembers(
secondRequest.guildID,
secondRequest.nonce,
secondRequest.options,
true,
);
}
}
await delay(1500);
postDebug(
{
type: "requestMembersProcessing",
data: { shardID, remaining: RequestMembersQueue.length },
},
);
processRequestMembersQueue();
}
// TODO: If a client does not receive a heartbeat ack between its attempts at sending heartbeats, it should immediately terminate the connection with a non-1000 close code, reconnect, and attempt to resume.
async function sendConstantHeartbeats(
interval: number,
) {
await delay(interval);
shardSocket.send(
JSON.stringify({ op: GatewayOpcode.Heartbeat, d: previousSequenceNumber }),
);
postDebug(
{ type: "heartbeat", data: { interval, previousSequenceNumber, shardID } },
);
sendConstantHeartbeats(interval);
}
async function resumeConnection(
botGatewayData: DiscordBotGatewayData,
identifyPayload: object,
) {
postDebug({ type: "resuming", data: { shardID } });
// Run it once
createShard(botGatewayData, identifyPayload, true);
// Then retry every 15 seconds
await delay(1000 * 15);
if (needToResume) resumeConnection(botGatewayData, identifyPayload);
}
const createShard = async (
botGatewayData: DiscordBotGatewayData,
identifyPayload: object,
resuming = false,
) => {
postDebug({ type: "createShard", data: { shardID } });
shardSocket = await connectWebSocket(botGatewayData.url);
let resumeInterval = 0;
if (!resuming) {
// Intial identify with the gateway
await shardSocket.send(
JSON.stringify({ op: GatewayOpcode.Identify, d: identifyPayload }),
);
} else {
await shardSocket.send(JSON.stringify({
op: GatewayOpcode.Resume,
d: {
...identifyPayload,
session_id: sessionID,
seq: previousSequenceNumber,
},
}));
}
for await (const message of shardSocket) {
if (typeof message === "string") {
const data = JSON.parse(message);
switch (data.op) {
case GatewayOpcode.Hello:
sendConstantHeartbeats(
(data.d as DiscordHeartbeatPayload).heartbeat_interval,
);
break;
case GatewayOpcode.Reconnect:
case GatewayOpcode.InvalidSession:
// When d is false we need to reidentify
if (!data.d) {
postDebug({ type: "invalidSession", data: { shardID } });
createShard(botGatewayData, identifyPayload);
break;
}
needToResume = true;
resumeConnection(botGatewayData, identifyPayload);
break;
default:
if (data.t === "RESUMED") {
postDebug({ type: "resumed", data: { shardID } });
needToResume = false;
break;
}
// Important for RESUME
if (data.t === "READY") {
sessionID = (data.d as ReadyPayload).session_id;
}
// Update the sequence number if it is present
if (data.s) previousSequenceNumber = data.s;
// @ts-ignore
postMessage(
{
type: "HANDLE_DISCORD_PAYLOAD",
payload: message,
resumeInterval,
shardID,
},
);
break;
}
} else if (isWebSocketCloseEvent(message)) {
postDebug({ type: "websocketClose", data: { shardID, message } });
// These error codes should just crash the projects
if ([4004, 4005, 4012, 4013, 4014].includes(message.code)) {
console.error(`Close :( ${JSON.stringify(message)}`);
postDebug({ type: "websocketErrored", data: { shardID, message } });
throw new Error(
"Shard.ts: Error occurred that is not resumeable or able to be reconnected.",
);
}
// These error codes can not be resumed but need to reconnect from start
if ([4003, 4007, 4008, 4009].includes(message.code)) {
postDebug(
{ type: "websocketReconnecting", data: { shardID, message } },
);
createShard(botGatewayData, identifyPayload);
} else {
needToResume = true;
resumeConnection(botGatewayData, identifyPayload);
}
}
}
};
function requestGuildMembers(
guildID: string,
nonce: string,
options?: FetchMembersOptions,
queuedRequest = false,
) {
// This request was not from this queue so we add it to queue first
if (!queuedRequest) {
RequestMembersQueue.push({
guildID,
nonce,
options,
});
if (!processQueue) {
processQueue = true;
processRequestMembersQueue();
}
return;
}
// If its closed add back to queue to redo on resume
if (shardSocket.isClosed) {
requestGuildMembers(guildID, nonce, options);
return;
}
shardSocket.send(JSON.stringify({
op: GatewayOpcode.RequestGuildMembers,
d: {
guild_id: guildID,
query: options?.query || "",
limit: options?.query || 0,
presences: options?.presences || false,
user_ids: options?.userIDs,
nonce,
},
}));
}
// TODO: Errors need to be fixed by VSC plugin
// @ts-ignore
postMessage({ type: "REQUEST_CLIENT_OPTIONS" });
// @ts-ignore
onmessage = (message: MessageEvent) => {
if (message.data.type === "CREATE_SHARD") {
createShard(
message.data.botGatewayData,
message.data.identifyPayload,
);
shardID = message.data.shardID;
}
if (message.data.type === "FETCH_MEMBERS") {
requestGuildMembers(
message.data.guildID,
message.data.nonce,
message.data.options,
);
}
if (message.data.type === "EDIT_BOTS_STATUS") {
shardSocket.send(JSON.stringify({
op: GatewayOpcode.StatusUpdate,
d: {
since: null,
game: message.data.game.name
? {
name: message.data.game.name,
type: message.data.game.type,
}
: null,
status: message.data.status,
afk: false,
},
}));
}
};
function postDebug(details: DebugArg) {
// TODO: Errors need to be fixed by VSC plugin
postMessage({ type: "DEBUG_LOG", details });
}