More conversion

This commit is contained in:
TriForMine
2021-10-08 08:05:04 +02:00
parent 101fd50acb
commit dcbf0fcaaa
17 changed files with 166 additions and 164 deletions

View File

@@ -6,7 +6,6 @@ import type { ModifyGuild } from "../../types/guilds/modify_guild.ts";
import { endpoints } from "../../util/constants.ts";
import { requireBotGuildPermissions } from "../../util/permissions.ts";
import { snakelize, urlToBase64 } from "../../util/utils.ts";
import { ws } from "../../ws/ws.ts";
/** Modify a guilds settings. Requires the MANAGE_GUILD permission. */
export async function editGuild(guildId: bigint, options: ModifyGuild) {

View File

@@ -5,7 +5,7 @@ import { Errors } from "../../types/discordeno/errors.ts";
import { DiscordGatewayIntents } from "../../types/gateway/gateway_intents.ts";
import type { RequestGuildMembers } from "../../types/members/request_guild_members.ts";
import { Collection } from "../../util/collection.ts";
import { ws } from "../../ws/ws.ts";
import {GatewayManager} from "../../bot.ts";
/**
* ⚠️ BEGINNER DEVS!! YOU SHOULD ALMOST NEVER NEED THIS AND YOU CAN GET FROM cache.members.get()
@@ -15,10 +15,10 @@ import { ws } from "../../ws/ws.ts";
* REST: 50/s global(across all shards) rate limit with ALL requests this included
* GW(this function): 120/m(PER shard) rate limit. Meaning if you have 8 shards your limit is now 960/m.
*/
export function fetchMembers(guildId: bigint, shardId: number, options?: Omit<RequestGuildMembers, "guildId">) {
export function fetchMembers(gateway: GatewayManager, guildId: bigint, shardId: number, options?: Omit<RequestGuildMembers, "guildId">) {
// You can request 1 member without the intent
// Check if intents is not 0 as proxy ws won't set intents in other instances
if (ws.identifyPayload.intents && (!options?.limit || options.limit > 1) && !(ws.identifyPayload.intents & DiscordGatewayIntents.GuildMembers)) {
// Check if intents is not 0 as proxy gateway won't set intents in other instances
if (gateway.identifyPayload.intents && (!options?.limit || options.limit > 1) && !(gateway.identifyPayload.intents & DiscordGatewayIntents.GuildMembers)) {
throw new Error(Errors.MISSING_INTENT_GUILD_MEMBERS);
}
@@ -30,7 +30,7 @@ export function fetchMembers(guildId: bigint, shardId: number, options?: Omit<Re
const nonce = `${guildId}-${Date.now()}`;
cache.fetchAllMembersProcessingRequests.set(nonce, resolve);
ws.sendShardMessage(shardId, {
gateway.sendShardMessage(gateway, shardId, {
op: DiscordGatewayOpcodes.RequestGuildMembers,
d: {
guild_id: guildId,

View File

@@ -1,14 +1,13 @@
import { eventHandlers } from "../../bot.ts";
import {eventHandlers, GatewayManager} from "../../bot.ts";
import { DiscordGatewayOpcodes } from "../../types/codes/gateway_opcodes.ts";
import type { StatusUpdate } from "../../types/gateway/status_update.ts";
import { snakelize } from "../../util/utils.ts";
import { ws } from "../../ws/ws.ts";
export function editBotStatus(data: Omit<StatusUpdate, "afk" | "since">) {
ws.shards.forEach((shard) => {
export function editBotStatus(gateway: GatewayManager, data: Omit<StatusUpdate, "afk" | "since">) {
gateway.shards.forEach((shard) => {
eventHandlers.debug?.("loop", `Running forEach loop in editBotStatus function.`);
ws.sendShardMessage(shard, {
gateway.sendShardMessage(gateway, shard, {
op: DiscordGatewayOpcodes.StatusUpdate,
d: {
since: null,

View File

@@ -3,18 +3,19 @@ import type { UpdateVoiceState } from "../../types/voice/update_voice_state.ts";
import { requireBotChannelPermissions } from "../../util/permissions.ts";
import { calculateShardId } from "../../util/calculate_shard_id.ts";
import { snakelize } from "../../util/utils.ts";
import { ws } from "../../ws/ws.ts";
import type { AtLeastOne } from "../../types/util.ts";
import {GatewayManager} from "../../bot.ts";
/** Connect or join a voice channel inside a guild. By default, the "selfDeaf" option is true. Requires `CONNECT` and `VIEW_CHANNEL` permissions. */
export async function connectToVoiceChannel(
gateway: GatewayManager,
guildId: bigint,
channelId: bigint,
options?: AtLeastOne<Omit<UpdateVoiceState, "guildId" | "channelId">>
) {
await requireBotChannelPermissions(channelId, ["CONNECT", "VIEW_CHANNEL"]);
ws.sendShardMessage(calculateShardId(guildId), {
gateway.sendShardMessage(gateway, calculateShardId(gateway, guildId), {
op: DiscordGatewayOpcodes.VoiceStateUpdate,
d: snakelize<UpdateVoiceState>({
guildId,

View File

@@ -1,7 +1,7 @@
import { ws } from "../ws/ws.ts";
import {GatewayManager} from "../bot.ts";
export function calculateShardId(guildId: bigint) {
if (ws.maxShards === 1) return 0;
export function calculateShardId(gateway: GatewayManager, guildId: bigint) {
if (gateway.maxShards === 1) return 0;
return Number((guildId >> 22n) % BigInt(ws.maxShards - 1));
return Number((guildId >> 22n) % BigInt(gateway.maxShards - 1));
}

View File

@@ -1,25 +1,25 @@
import { DiscordGatewayCloseEventCodes } from "../types/codes/gateway_close_event_codes.ts";
import { ws } from "./ws.ts";
import {GatewayManager} from "../bot.ts";
export function createShard(shardId: number) {
const socket = new WebSocket(ws.botGatewayData.url);
export function createShard(gateway: GatewayManager, shardId: number) {
const socket = new WebSocket(gateway.botGatewayData.url);
socket.binaryType = "arraybuffer";
socket.onerror = (errorEvent) => {
ws.log("ERROR", { shardId, error: errorEvent });
gateway.log("ERROR", { shardId, error: errorEvent });
};
socket.onmessage = ({ data: message }) => ws.handleOnMessage(message, shardId);
socket.onmessage = ({ data: message }) => gateway.handleOnMessage(gateway, message, shardId);
socket.onclose = async (event) => {
ws.log("CLOSED", { shardId, payload: event });
gateway.log("CLOSED", { shardId, payload: event });
if (event.code === 3064 || event.reason === "Discordeno Testing Finished! Do Not RESUME!") {
return;
}
if (event.code === 3065 || ["Resharded!", "Resuming the shard, closing old shard."].includes(event.reason)) {
return ws.log("CLOSED_RECONNECT", { shardId, payload: event });
return gateway.log("CLOSED_RECONNECT", { shardId, payload: event });
}
switch (event.code) {
@@ -31,7 +31,7 @@ export function createShard(shardId: number) {
case 3065: // Reidentifying
case 3066: // Missing ACK
// Will restart shard manually
return ws.log("CLOSED_RECONNECT", { shardId, payload: event });
return gateway.log("CLOSED_RECONNECT", { shardId, payload: event });
case DiscordGatewayCloseEventCodes.UnknownOpcode:
case DiscordGatewayCloseEventCodes.DecodeError:
case DiscordGatewayCloseEventCodes.AuthenticationFailed:
@@ -47,10 +47,10 @@ export function createShard(shardId: number) {
case DiscordGatewayCloseEventCodes.InvalidSeq:
case DiscordGatewayCloseEventCodes.RateLimited:
case DiscordGatewayCloseEventCodes.SessionTimedOut:
await ws.identify(shardId, ws.maxShards);
await gateway.identify(gateway, shardId, gateway.maxShards);
break;
default:
ws.resume(shardId);
gateway.resume(gateway, shardId);
break;
}
};

View File

@@ -1,11 +1,11 @@
import type { DiscordGatewayPayload } from "../types/gateway/gateway_payload.ts";
import { ws } from "./ws.ts";
import {GatewayManager} from "../bot.ts";
/** Handler for processing all dispatch payloads that should be sent/forwarded to another server/vps/process. */
export async function handleDiscordPayload(data: DiscordGatewayPayload, shardId: number) {
await fetch(ws.url, {
export async function handleDiscordPayload(gateway: GatewayManager, data: DiscordGatewayPayload, shardId: number) {
await fetch(gateway.url, {
headers: {
authorization: ws.secretKey,
authorization: gateway.secretKey,
},
method: "post",
body: JSON.stringify({

View File

@@ -1,4 +1,4 @@
import { eventHandlers } from "../bot.ts";
import {eventHandlers, GatewayManager} from "../bot.ts";
import { handlers } from "../handlers/mod.ts";
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import type { DiscordGatewayPayload } from "../types/gateway/gateway_payload.ts";
@@ -6,33 +6,33 @@ import type { DiscordHello } from "../types/gateway/hello.ts";
import type { DiscordReady } from "../types/gateway/ready.ts";
import { camelize, delay } from "../util/utils.ts";
import { decompressWith } from "./deps.ts";
import { ws } from "./ws.ts";
/** Handler for handling every message event from websocket. */
// deno-lint-ignore no-explicit-any
export async function handleOnMessage(message: any, shardId: number) {
export async function handleOnMessage(gateway: GatewayManager, message: any, shardId: number) {
if (message instanceof ArrayBuffer) {
message = new Uint8Array(message);
}
if (message instanceof Uint8Array) {
message = decompressWith(message, 0, (slice: Uint8Array) => ws.utf8decoder.decode(slice));
message = decompressWith(message, 0, (slice: Uint8Array) => gateway.utf8decoder.decode(slice));
}
if (typeof message !== "string") return;
const shard = ws.shards.get(shardId);
const shard = gateway.shards.get(shardId);
const messageData = JSON.parse(message) as DiscordGatewayPayload;
ws.log("RAW", { shardId, payload: messageData });
gateway.log("RAW", { shardId, payload: messageData });
switch (messageData.op) {
case DiscordGatewayOpcodes.Heartbeat:
if (shard?.ws.readyState !== WebSocket.OPEN) return;
if (shard?.gateway.readyState !== WebSocket.OPEN) return;
shard.heartbeat.lastSentAt = Date.now();
// Discord randomly sends this requiring an immediate heartbeat back
ws.sendShardMessage(
gateway.sendShardMessage(
gateway,
shard,
{
op: DiscordGatewayOpcodes.Heartbeat,
@@ -42,77 +42,77 @@ export async function handleOnMessage(message: any, shardId: number) {
);
break;
case DiscordGatewayOpcodes.Hello:
ws.heartbeat(shardId, (messageData.d as DiscordHello).heartbeat_interval);
gateway.heartbeat(gateway, shardId, (messageData.d as DiscordHello).heartbeat_interval);
break;
case DiscordGatewayOpcodes.HeartbeatACK:
if (ws.shards.has(shardId)) {
const shard = ws.shards.get(shardId)!;
if (gateway.shards.has(shardId)) {
const shard = gateway.shards.get(shardId)!;
shard.heartbeat.acknowledged = true;
shard.heartbeat.lastReceivedAt = Date.now();
}
break;
case DiscordGatewayOpcodes.Reconnect:
ws.log("RECONNECT", { shardId });
gateway.log("RECONNECT", { shardId });
if (ws.shards.has(shardId)) {
ws.shards.get(shardId)!.resuming = true;
if (gateway.shards.has(shardId)) {
gateway.shards.get(shardId)!.resuming = true;
}
ws.resume(shardId);
gateway.resume(gateway, shardId);
break;
case DiscordGatewayOpcodes.InvalidSession:
ws.log("INVALID_SESSION", { shardId, payload: messageData });
gateway.log("INVALID_SESSION", { shardId, payload: messageData });
// We need to wait for a random amount of time between 1 and 5: https://discord.com/developers/docs/topics/gateway#resuming
await delay(Math.floor((Math.random() * 4 + 1) * 1000));
// When d is false we need to reidentify
if (!messageData.d) {
await ws.identify(shardId, ws.maxShards);
await gateway.identify(gateway, shardId, gateway.maxShards);
break;
}
if (ws.shards.has(shardId)) {
ws.shards.get(shardId)!.resuming = true;
if (gateway.shards.has(shardId)) {
gateway.shards.get(shardId)!.resuming = true;
}
ws.resume(shardId);
gateway.resume(gateway, shardId);
break;
default:
if (messageData.t === "RESUMED") {
ws.log("RESUMED", { shardId });
gateway.log("RESUMED", { shardId });
if (ws.shards.has(shardId)) {
ws.shards.get(shardId)!.resuming = false;
if (gateway.shards.has(shardId)) {
gateway.shards.get(shardId)!.resuming = false;
}
break;
}
// Important for RESUME
if (messageData.t === "READY") {
const shard = ws.shards.get(shardId);
const shard = gateway.shards.get(shardId);
if (shard) {
shard.sessionId = (messageData.d as DiscordReady).session_id;
}
ws.loadingShards.get(shardId)?.resolve(true);
ws.loadingShards.delete(shardId);
gateway.loadingShards.get(shardId)?.resolve(true);
gateway.loadingShards.delete(shardId);
// Wait few seconds to spawn next shard
setTimeout(() => {
const bucket = ws.buckets.get(shardId % ws.botGatewayData.sessionStartLimit.maxConcurrency);
const bucket = gateway.buckets.get(shardId % gateway.botGatewayData.sessionStartLimit.maxConcurrency);
if (bucket) bucket.createNextShard.shift()?.();
}, ws.spawnShardDelay);
}, gateway.spawnShardDelay);
}
// Update the sequence number if it is present
if (messageData.s) {
const shard = ws.shards.get(shardId);
const shard = gateway.shards.get(shardId);
if (shard) {
shard.previousSequenceNumber = messageData.s;
}
}
if (ws.url) await ws.handleDiscordPayload(messageData, shardId);
if (gateway.url) await gateway.handleDiscordPayload(gateway, messageData, shardId);
else {
eventHandlers.raw?.(messageData);
await eventHandlers.dispatchRequirements?.(messageData, shardId);

View File

@@ -1,21 +1,21 @@
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { delay } from "../util/utils.ts";
import { ws } from "./ws.ts";
import {GatewayManager} from "../bot.ts";
export async function heartbeat(shardId: number, interval: number) {
ws.log("HEARTBEATING_STARTED", { shardId, interval });
export async function heartbeat(gateway: GatewayManager, shardId: number, interval: number) {
gateway.log("HEARTBEATING_STARTED", { shardId, interval });
const shard = ws.shards.get(shardId);
const shard = gateway.shards.get(shardId);
if (!shard) return;
ws.log("HEARTBEATING_DETAILS", { shardId, interval, shard });
gateway.log("HEARTBEATING_DETAILS", { shardId, interval, shard });
// The first heartbeat is special so we send it without setInterval: https://discord.com/developers/docs/topics/gateway#heartbeating
await delay(Math.floor(shard.heartbeat.interval * Math.random()));
if (shard.ws.readyState !== WebSocket.OPEN) return;
if (shard.gateway.readyState !== WebSocket.OPEN) return;
shard.ws.send(
shard.gateway.send(
JSON.stringify({
op: DiscordGatewayOpcodes.Heartbeat,
d: shard.previousSequenceNumber,
@@ -28,29 +28,29 @@ export async function heartbeat(shardId: number, interval: number) {
shard.heartbeat.interval = interval;
shard.heartbeat.intervalId = setInterval(async () => {
ws.log("DEBUG", `Running setInterval in heartbeat file. Shard: ${shardId}`);
const currentShard = ws.shards.get(shardId);
gateway.log("DEBUG", `Running setInterval in heartbeat file. Shard: ${shardId}`);
const currentShard = gateway.shards.get(shardId);
if (!currentShard) return;
ws.log("HEARTBEATING", { shardId, shard: currentShard });
gateway.log("HEARTBEATING", { shardId, shard: currentShard });
if (currentShard.ws.readyState === WebSocket.CLOSED || !currentShard.heartbeat.keepAlive) {
ws.log("HEARTBEATING_CLOSED", { shardId, shard: currentShard });
if (currentShard.gateway.readyState === WebSocket.CLOSED || !currentShard.heartbeat.keepAlive) {
gateway.log("HEARTBEATING_CLOSED", { shardId, shard: currentShard });
// STOP THE HEARTBEAT
return clearInterval(shard.heartbeat.intervalId);
}
if (!currentShard.heartbeat.acknowledged) {
ws.closeWS(currentShard.ws, 3066, "Did not receive an ACK in time.");
return await ws.identify(shardId, ws.maxShards);
gateway.closeWS(currentShard.gateway, 3066, "Did not receive an ACK in time.");
return await gateway.identify(gateway, shardId, gateway.maxShards);
}
if (currentShard.ws.readyState !== WebSocket.OPEN) return;
if (currentShard.gateway.readyState !== WebSocket.OPEN) return;
currentShard.heartbeat.acknowledged = false;
currentShard.ws.send(
currentShard.gateway.send(
JSON.stringify({
op: DiscordGatewayOpcodes.Heartbeat,
d: currentShard.previousSequenceNumber,

View File

@@ -1,23 +1,23 @@
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { ws } from "./ws.ts";
import {GatewayManager} from "../bot.ts";
export function identify(shardId: number, maxShards: number) {
ws.log("IDENTIFYING", { shardId, maxShards });
export function identify(gateway: GatewayManager, shardId: number, maxShards: number) {
gateway.log("IDENTIFYING", { shardId, maxShards });
// Need to clear the old heartbeat interval
const oldShard = ws.shards.get(shardId);
const oldShard = gateway.shards.get(shardId);
if (oldShard) {
ws.closeWS(oldShard.ws, 3065, "Reidentifying closure of old shard");
gateway.closeWS(oldShard.gateway, 3065, "Reidentifying closure of old shard");
clearInterval(oldShard.heartbeat.intervalId);
}
// CREATE A SHARD
const socket = ws.createShard(shardId);
const socket = gateway.createShard(gateway, shardId);
// Identify can just set/reset the settings for the shard
ws.shards.set(shardId, {
gateway.shards.set(shardId, {
id: shardId,
ws: socket,
gateway: socket,
resumeInterval: 0,
sessionId: "",
previousSequenceNumber: 0,
@@ -39,11 +39,12 @@ export function identify(shardId: number, maxShards: number) {
});
socket.onopen = () => {
ws.sendShardMessage(
gateway.sendShardMessage(
gateway,
shardId,
{
op: DiscordGatewayOpcodes.Identify,
d: { ...ws.identifyPayload, shard: [shardId, maxShards] },
d: { ...gateway.identifyPayload, shard: [shardId, maxShards] },
},
true
);
@@ -54,7 +55,7 @@ export function identify(shardId: number, maxShards: number) {
reject(`[Identify Failure] Shard ${shardId} has not received READY event in over a minute.`);
}, 600000);
ws.loadingShards.set(shardId, {
gateway.loadingShards.set(shardId, {
shardId,
resolve: (args) => {
clearTimeout(timeout);

View File

@@ -1,16 +1,16 @@
import { loopObject } from "../util/loop_object.ts";
import { delay } from "../util/utils.ts";
import { ws } from "./ws.ts";
import {GatewayManager} from "../bot.ts";
export async function processQueue(id: number) {
const shard = ws.shards.get(id);
export async function processQueue(gateway: GatewayManager, id: number) {
const shard = gateway.shards.get(id);
// If no items or its already processing then exit
if (!shard?.queue.length || shard.processingQueue) return;
shard.processingQueue = true;
while (shard.queue.length) {
if (shard.ws.readyState !== WebSocket.OPEN) {
if (shard.gateway.readyState !== WebSocket.OPEN) {
shard.processingQueue = false;
return;
}
@@ -34,20 +34,20 @@ export async function processQueue(id: number) {
: Array.isArray(value)
? value.map((v) => (typeof v === "bigint" ? v.toString() : v))
: value,
`Running forEach loop in ws.processQueue function for changing bigints to strings.`
`Running forEach loop in gateway.processQueue function for changing bigints to strings.`
);
}
ws.log("RAW_SEND", shard.id, request);
gateway.log("RAW_SEND", shard.id, request);
shard.ws.send(JSON.stringify(request));
shard.gateway.send(JSON.stringify(request));
// Counter is useful for preventing 120/m requests.
shard.queueCounter++;
// Handle if the requests have been maxed
if (shard.queueCounter >= 118) {
ws.log("DEBUG", {
gateway.log("DEBUG", {
message: "Max gateway requests per minute reached setting timeout for one minute",
shardId: shard.id,
});

View File

@@ -1,30 +1,30 @@
import { getGatewayBot } from "../helpers/misc/get_gateway_bot.ts";
import { ws } from "./ws.ts";
import { GatewayManager } from "../bot.ts";
/** The handler to automatically reshard when necessary. */
export async function resharder() {
ws.botGatewayData = await getGatewayBot();
export async function resharder(gateway: GatewayManager) {
gateway.botGatewayData = await getGatewayBot();
const percentage = ((ws.botGatewayData.shards - ws.maxShards) / ws.maxShards) * 100;
const percentage = ((gateway.botGatewayData.shards - gateway.maxShards) / gateway.maxShards) * 100;
// Less than necessary% being used so do nothing
if (percentage < ws.reshardPercentage) return;
if (percentage < gateway.reshardPercentage) return;
// Don't have enough identify rate limits to reshard
if (ws.botGatewayData.sessionStartLimit.remaining < ws.botGatewayData.shards) {
if (gateway.botGatewayData.sessionStartLimit.remaining < gateway.botGatewayData.shards) {
return;
}
// Begin resharding
ws.maxShards = ws.botGatewayData.shards;
gateway.maxShards = gateway.botGatewayData.shards;
// If more than 100K servers, begin switching to 16x sharding
if (ws.maxShards && ws.useOptimalLargeBotSharding) {
ws.maxShards = Math.ceil(
ws.maxShards /
(ws.botGatewayData.sessionStartLimit.maxConcurrency === 1
if (gateway.maxShards && gateway.useOptimalLargeBotSharding) {
gateway.maxShards = Math.ceil(
gateway.maxShards /
(gateway.botGatewayData.sessionStartLimit.maxConcurrency === 1
? 16
: ws.botGatewayData.sessionStartLimit.maxConcurrency)
: gateway.botGatewayData.sessionStartLimit.maxConcurrency)
);
}
ws.spawnShards(ws.firstShardId);
gateway.spawnShards(gateway, gateway.firstShardId);
}

View File

@@ -1,29 +1,29 @@
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { ws } from "./ws.ts";
import {GatewayManager} from "../bot.ts";
export function resume(shardId: number) {
ws.log("RESUMING", { shardId });
export function resume(gateway: GatewayManager, shardId: number) {
gateway.log("RESUMING", { shardId });
// NOW WE HANDLE RESUMING THIS SHARD
// Get the old data for this shard necessary for resuming
const oldShard = ws.shards.get(shardId);
const oldShard = gateway.shards.get(shardId);
if (oldShard) {
// HOW TO CLOSE OLD SHARD SOCKET!!!
ws.closeWS(oldShard.ws, 3064, "Resuming the shard, closing old shard.");
gateway.closeWS(oldShard.gateway, 3064, "Resuming the shard, closing old shard.");
// STOP OLD HEARTBEAT
clearInterval(oldShard.heartbeat.intervalId);
}
// CREATE A SHARD
const socket = ws.createShard(shardId);
const socket = gateway.createShard(gateway, shardId);
const sessionId = oldShard?.sessionId || "";
const previousSequenceNumber = oldShard?.previousSequenceNumber || 0;
ws.shards.set(shardId, {
gateway.shards.set(shardId, {
id: shardId,
ws: socket,
gateway: socket,
resumeInterval: 0,
sessionId: sessionId,
previousSequenceNumber: previousSequenceNumber,
@@ -46,12 +46,13 @@ export function resume(shardId: number) {
// Resume on open
socket.onopen = () => {
ws.sendShardMessage(
gateway.sendShardMessage(
gateway,
shardId,
{
op: DiscordGatewayOpcodes.Resume,
d: {
token: ws.identifyPayload.token,
token: gateway.identifyPayload.token,
session_id: sessionId,
seq: previousSequenceNumber,
},

View File

@@ -1,7 +1,8 @@
import { DiscordenoShard, WebSocketRequest, ws } from "./ws.ts";
import { DiscordenoShard, WebSocketRequest } from "./ws.ts";
import { GatewayManager } from "../bot.ts";
export function sendShardMessage(shard: number | DiscordenoShard, message: WebSocketRequest, highPriority = false) {
if (typeof shard === "number") shard = ws.shards.get(shard)!;
export function sendShardMessage(gateway: GatewayManager, shard: number | DiscordenoShard, message: WebSocketRequest, highPriority = false) {
if (typeof shard === "number") shard = gateway.shards.get(shard)!;
if (!shard) return;
if (!highPriority) {
@@ -10,5 +11,5 @@ export function sendShardMessage(shard: number | DiscordenoShard, message: WebSo
shard.queue.unshift(message);
}
ws.processQueue(shard.id);
gateway.processQueue(gateway, shard.id);
}

View File

@@ -1,35 +1,35 @@
/** Begin spawning shards. */
import {Bot} from "../bot.ts";
import { GatewayManager } from "../bot.ts";
export function spawnShards(bot: Bot, firstShardId = 0) {
export function spawnShards(gateway: GatewayManager, firstShardId = 0) {
/** Stored as bucketId: [clusterId, [ShardIds]] */
const maxShards = bot.gateway.maxShards || bot.gateway.botGatewayData.shards;
const maxShards = gateway.maxShards || gateway.botGatewayData.shards;
let cluster = 0;
for (let index = firstShardId; index < bot.gateway.botGatewayData.sessionStartLimit.maxConcurrency; index++) {
bot.gateway.log("DEBUG", `1. Running for loop in spawnShards function.`);
for (let index = firstShardId; index < gateway.botGatewayData.sessionStartLimit.maxConcurrency; index++) {
gateway.log("DEBUG", `1. Running for loop in spawnShards function.`);
// ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS
for (let i = 0; i < maxShards; i++) {
bot.gateway.log("DEBUG", `2. Running for loop in spawnShards function.`);
const bucketId = i % bot.gateway.botGatewayData.sessionStartLimit.maxConcurrency;
const bucket = bot.gateway.buckets.get(bucketId);
gateway.log("DEBUG", `2. Running for loop in spawnShards function.`);
const bucketId = i % gateway.botGatewayData.sessionStartLimit.maxConcurrency;
const bucket = gateway.buckets.get(bucketId);
if (!bucket) {
// Create the bucket since it doesnt exist
bot.gateway.buckets.set(bucketId, {
gateway.buckets.set(bucketId, {
clusters: [[cluster, i]],
createNextShard: [],
});
if (cluster + 1 <= bot.gateway.maxClusters) cluster++;
if (cluster + 1 <= gateway.maxClusters) cluster++;
} else {
// FIND A QUEUE IN THIS BUCKET THAT HAS SPACE
const queue = bucket.clusters.find((q) => q.length < bot.gateway.shardsPerCluster + 1);
const queue = bucket.clusters.find((q) => q.length < gateway.shardsPerCluster + 1);
if (queue) {
// IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE
queue.push(i);
} else {
if (cluster + 1 <= bot.gateway.maxClusters) cluster++;
if (cluster + 1 <= gateway.maxClusters) cluster++;
// ADD A NEW QUEUE FOR THIS SHARD
bucket.clusters.push([cluster, i]);
}
@@ -38,14 +38,14 @@ export function spawnShards(bot: Bot, firstShardId = 0) {
}
// SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP
bot.gateway.buckets.forEach((bucket, bucketId) => {
bot.gateway.log("DEBUG", `3. Running forEach loop in spawnShards function.`);
gateway.buckets.forEach((bucket, bucketId) => {
gateway.log("DEBUG", `3. Running forEach loop in spawnShards function.`);
for (const [clusterId, ...queue] of bucket.clusters) {
bot.gateway.log("DEBUG", `4. Running for of loop in spawnShards function.`);
gateway.log("DEBUG", `4. Running for of loop in spawnShards function.`);
queue.forEach((shardId) => {
bucket.createNextShard.push(async () => {
await bot.gateway.tellClusterToIdentify(clusterId, shardId, bucketId);
await gateway.tellClusterToIdentify(gateway, clusterId, shardId, bucketId);
});
});

View File

@@ -2,40 +2,40 @@ import { DiscordGatewayIntents } from "../types/gateway/gateway_intents.ts";
import type { GetGatewayBot } from "../types/gateway/get_gateway_bot.ts";
import { camelize } from "../util/utils.ts";
import { StartGatewayOptions } from "./start_gateway_options.ts";
import { ws } from "./ws.ts";
import { GatewayManager } from "../bot.ts";
/** ADVANCED DEVS ONLY!!!!!!
* Starts the standalone gateway.
* This will require starting the bot separately.
*/
export async function startGateway(options: StartGatewayOptions) {
ws.identifyPayload.token = `Bot ${options.token}`;
ws.secretKey = options.secretKey;
ws.firstShardId = options.firstShardId;
ws.url = options.url;
if (options.shardsPerCluster) ws.shardsPerCluster = options.shardsPerCluster;
if (options.maxClusters) ws.maxClusters = options.maxClusters;
export async function startGateway(gateway: GatewayManager, options: StartGatewayOptions) {
gateway.identifyPayload.token = `Bot ${options.token}`;
gateway.secretKey = options.secretKey;
gateway.firstShardId = options.firstShardId;
gateway.url = options.url;
if (options.shardsPerCluster) gateway.shardsPerCluster = options.shardsPerCluster;
if (options.maxClusters) gateway.maxClusters = options.maxClusters;
if (options.compress) {
ws.identifyPayload.compress = options.compress;
gateway.identifyPayload.compress = options.compress;
}
if (options.reshard) ws.reshard = options.reshard;
if (options.reshard) gateway.reshard = options.reshard;
// Once an hour check if resharding is necessary
setInterval(ws.resharder, 1000 * 60 * 60);
setInterval(() => gateway.resharder(gateway), 1000 * 60 * 60);
ws.identifyPayload.intents = options.intents.reduce(
(bits, next) => (bits |= typeof next === "string" ? DiscordGatewayIntents[next] : next),
0
gateway.identifyPayload.intents = options.intents.reduce(
(bits, next) => (bits |= typeof next === "string" ? DiscordGatewayIntents[next] : next),
0
);
ws.botGatewayData = camelize(
await fetch(`https://discord.com/api/gateway/bot`, {
headers: { Authorization: ws.identifyPayload.token },
}).then((res) => res.json())
gateway.botGatewayData = camelize(
await fetch(`https://discord.com/api/gateway/bot`, {
headers: { Authorization: gateway.identifyPayload.token },
}).then((res) => res.json())
) as GetGatewayBot;
ws.maxShards = options.maxShards || ws.botGatewayData.shards;
ws.lastShardId = options.lastShardId || ws.botGatewayData.shards - 1;
gateway.maxShards = options.maxShards || gateway.botGatewayData.shards;
gateway.lastShardId = options.lastShardId || gateway.botGatewayData.shards - 1;
ws.spawnShards(ws.firstShardId);
gateway.spawnShards(gateway, gateway.firstShardId);
}

View File

@@ -1,6 +1,6 @@
import { ws } from "./ws.ts";
import {GatewayManager} from "../bot.ts";
/** Allows users to hook in and change to communicate to different clusters across different servers or anything they like. For example using redis pubsub to talk to other servers. */
export async function tellClusterToIdentify(_workerId: number, shardId: number, _bucketId: number) {
await ws.identify(shardId, ws.maxShards);
export async function tellClusterToIdentify(gateway: GatewayManager, _workerId: number, shardId: number, _bucketId: number) {
await gateway.identify(gateway, shardId, gateway.maxShards);
}