This commit is contained in:
Skillz4Killz
2021-01-29 21:56:44 +00:00
committed by GitHub
parent 412fb71720
commit 5734e2cc64
6 changed files with 470 additions and 1 deletions
+1
View File
@@ -0,0 +1 @@
export { decompress_with as decompressWith } from "https://unpkg.com/@evan/wasm@0.0.40/target/zlib/deno.js";
+80
View File
@@ -0,0 +1,80 @@
import { getGatewayBot } from "../../api/handlers/gateway.ts";
import { Intents } from "../../types/options.ts";
import { ws } from "./ws.ts";
/** ADVANCED DEVS ONLY!!!!!!
* Starts the standalone gateway.
* This will require starting the bot separately.
*/
export async function startGateway(options: StartGatewayOptions) {
ws.identifyPayload.token = `Bot ${options.token}`;
ws.firstShardID = options.firstShardID;
ws.url = options.url;
if (options.compress) {
ws.identifyPayload.compress = options.compress;
}
ws.identifyPayload.intents = options.intents.reduce(
(bits, next) => (bits |= typeof next === "string" ? Intents[next] : next),
0,
);
const data = await getGatewayBot();
ws.maxShards = options.maxShards || data.shards;
ws.lastShardID = options.lastShardID || data.shards - 1;
// TODO: ALL THE FOLLOWING CAN BE REPLACED BY THIS 1 LINE
// ws.botGatewayData = snakeToCamel(await getGatewayBot())
ws.botGatewayData.sessionStartLimit.total = data.session_start_limit.total;
ws.botGatewayData.sessionStartLimit.resetAfter =
data.session_start_limit.reset_after;
ws.botGatewayData.sessionStartLimit.remaining =
data.session_start_limit.remaining;
ws.botGatewayData.sessionStartLimit.maxConcurrency =
data.session_start_limit.max_concurrency;
ws.botGatewayData.shards = data.shards;
ws.botGatewayData.url = data.url;
ws.spawnShards(ws.firstShardID);
}
export function spawnShards(shardID: number) {
let skipChecks = 0;
while (shardID <= ws.lastShardID) {
if (skipChecks) {
// Start The shard
ws.identify(shardID, ws.maxShards);
shardID++;
skipChecks--;
continue;
}
// Previous shards is still not fully ready.
if (!ws.createNextShard) continue;
// Allows next iteration to create shard
ws.createNextShard = false;
// Set the amount of shards to start up be the bots max concurrency limit
skipChecks = ws.botGatewayData.sessionStartLimit.maxConcurrency;
}
}
export interface StartGatewayOptions {
/** The bot token. */
token: string;
/** Whether or not to use compression for gateway payloads. */
compress?: boolean;
/** The intents you would like to enable. */
intents: (Intents | keyof typeof Intents)[];
/** The max amount of shards used for identifying. This can be useful for zero-downtime updates or resharding. */
maxShards?: number;
/** The first shard ID for this group of shards. */
firstShardID: number;
/** The last shard ID for this group. If none is provided, it will default to loading all shards. */
lastShardID?: number;
/** The url to forward all payloads to. */
url: string;
}
View File
+293
View File
@@ -0,0 +1,293 @@
import {
DiscordHeartbeatPayload,
DiscordPayload,
GatewayOpcode,
ReadyPayload,
} from "../../types/discord.ts";
import { decompressWith } from "./deps.ts";
import { ws } from "./ws.ts";
export function resume(shardID: number) {
// TODO: Log that this is happening
// CREATE A SHARD
const socket = ws.createShard(shardID);
// NOW WE HANDLE RESUMING THIS SHARD
// Get the old data for this shard necessary for resuming
const oldShard = ws.shards.get(shardID);
// TODO: HOW TO CLOSE OLD SHARD SOCKET!!!
// TODO: STOP OLD HEARTBEAT
const sessionID = oldShard?.sessionID || "";
const previousSequenceNumber = oldShard?.previousSequenceNumber || 0;
ws.shards.set(shardID, {
id: shardID,
ws: socket,
resumeInterval: 0,
sessionID,
previousSequenceNumber,
resuming: false,
heartbeat: {
lastSentAt: 0,
lastReceivedAt: 0,
acknowledged: false,
keepAlive: false,
interval: 0,
intervalID: 0,
},
});
// Resume on open
socket.onopen = () => {
socket.send(JSON.stringify({
op: GatewayOpcode.Resume,
d: {
token: ws.identifyPayload.token,
session_id: sessionID,
seq: previousSequenceNumber,
},
}));
};
}
export function identify(shardID: number, maxShards: number) {
// TODO: Log that this is happening
// CREATE A SHARD
const socket = ws.createShard(shardID);
// Identify can just set/reset the settings for the shard
ws.shards.set(shardID, {
id: shardID,
ws: socket,
resumeInterval: 0,
sessionID: "",
previousSequenceNumber: 0,
resuming: false,
heartbeat: {
lastSentAt: 0,
lastReceivedAt: 0,
acknowledged: false,
keepAlive: false,
interval: 0,
intervalID: 0,
},
});
socket.send(
JSON.stringify(
{
op: GatewayOpcode.Identify,
d: { ...ws.identifyPayload, shard: [shardID, maxShards] },
},
),
);
}
export function heartbeat(shardID: number, interval: number) {
// TODO: Log that this is happening
const shard = ws.shards.get(shardID);
if (!shard) return;
shard.heartbeat.keepAlive = true;
shard.heartbeat.acknowledged = false;
shard.heartbeat.lastSentAt = Date.now();
shard.heartbeat.interval = interval;
shard.heartbeat.intervalID = setInterval(() => {
// TODO: Log that this is happening
const currentShard = ws.shards.get(shardID);
if (!currentShard) return;
if (
currentShard.ws.readyState === WebSocket.CLOSED ||
!currentShard.heartbeat.keepAlive
) {
// TODO: Log that this is happening
// STOP THE HEARTBEAT
return clearInterval(currentShard.heartbeat.intervalID);
}
currentShard.ws.send(
JSON.stringify(
{
op: GatewayOpcode.Heartbeat,
d: currentShard.previousSequenceNumber,
},
),
);
}, interval);
}
export function createShard(shardID: number) {
const socket = new WebSocket(ws.botGatewayData.url);
socket.binaryType = "arraybuffer";
socket.onerror = (errorEvent) => {
// TODO: Log that this is happening
// eventHandlers.debug?.({
// type: "wsError",
// data: { shardID, ...errorEvent },
// });
};
socket.onmessage = ({ data: message }) => {
if (message instanceof ArrayBuffer) {
message = new Uint8Array(message);
}
if (message instanceof Uint8Array) {
message = decompressWith(
message,
0,
(slice: Uint8Array) => ws.utf8decoder.decode(slice),
);
}
if (typeof message !== "string") return;
const messageData = JSON.parse(message);
// TODO: Log that this is happening
// if (!messageData.t) eventHandlers.rawGateway?.(messageData);
switch (messageData.op) {
case GatewayOpcode.Hello:
ws.heartbeat(
shardID,
(messageData.d as DiscordHeartbeatPayload).heartbeat_interval,
);
break;
case GatewayOpcode.HeartbeatACK:
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.heartbeat.acknowledged = true;
}
break;
case GatewayOpcode.Reconnect:
// TODO: Log that this is happening
// eventHandlers.debug?.(
// { type: "gatewayReconnect", data: { shardID } },
// );
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = true;
}
resume(shardID);
break;
case GatewayOpcode.InvalidSession:
// TODO: Log that this is happening
// eventHandlers.debug?.(
// {
// type: "gatewayInvalidSession",
// data: { shardID, data },
// },
// );
// When d is false we need to reidentify
if (!messageData.d) {
identify(shardID, ws.maxShards);
break;
}
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = true;
}
resume(shardID);
break;
default:
if (messageData.t === "RESUMED") {
// TODO: Log that this is happening
// eventHandlers.debug?.(
// { type: "gatewayResumed", data: { shardID } },
// );
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = false;
}
break;
}
// Important for RESUME
if (messageData.t === "READY") {
const shard = ws.shards.get(shardID);
if (shard) {
shard.sessionID = (messageData.d as ReadyPayload).session_id;
}
}
// Update the sequence number if it is present
if (messageData.s) {
const shard = ws.shards.get(shardID);
if (shard) {
shard.previousSequenceNumber = messageData.s;
}
}
ws.handleDiscordPayload(messageData, shardID);
break;
}
};
socket.onclose = ({ reason, code, wasClean }) => {
// TODO: Log that this is happening
// eventHandlers.debug?.(
// {
// type: "wsClose",
// data: { shardID, code, reason, wasClean },
// },
// );
// TODO: ENUM FOR THESE CODES?
switch (code) {
case 4001:
case 4002:
case 4004:
case 4005:
case 4010:
case 4011:
case 4012:
case 4013:
case 4014:
throw new Error(
reason || "Discord gave no reason! GG! You broke Discord!",
);
// THESE ERRORS CAN NO BE RESUMED! THEY MUST RE-IDENTIFY!
case 4003:
case 4007:
case 4008:
case 4009:
// TODO: Log that this is happening
// eventHandlers.debug?.({
// type: "wsReconnect",
// data: { shardID, code, reason, wasClean },
// });
identify(shardID, ws.maxShards);
break;
default:
resume(shardID);
break;
}
};
return socket;
}
export async function handleDiscordPayload(
data: DiscordPayload,
shardID: number,
) {
// TODO: Log that this is happening
// eventHandlers.raw?.(data);
// await eventHandlers.dispatchRequirements?.(data, shardID);
await fetch(ws.url, {
method: "post",
body: JSON.stringify({
shardID,
data,
}),
}).catch(console.error);
}
+95
View File
@@ -0,0 +1,95 @@
import { Collection } from "../../util/collection.ts";
import { spawnShards, startGateway } from "./manager.ts";
import {
createShard,
handleDiscordPayload,
heartbeat,
identify,
} from "./shard.ts";
// CONTROLLER LIKE INTERFACE FOR WS HANDLING
export const ws = {
/** The url that all discord payloads for the dispatch type should be sent to. */
url: "",
/** The maximum shard ID number. Useful for zero-downtime updates or resharding. */
maxShards: 1,
/** The first shard ID to start spawning. */
firstShardID: 0,
/** The last shard ID for this cluster. */
lastShardID: 1,
/** This prop decides whether Discord allows our next shard to be started. When 1 starts, this is set to false until it is ready for the next one. */
createNextShard: true,
/** The identify payload holds the necessary data to connect and stay connected with Discords WSS. */
identifyPayload: {
token: "",
compress: false,
properties: {
$os: "linux",
$browser: "Discordeno",
$device: "Discordeno",
},
intents: 0,
shard: [0, 0],
},
botGatewayData: {
/** The WSS URL that can be used for connecting to the gateway. */
url: "wss://gateway.discord.gg/?v=8&encoding=json",
/** The recommended number of shards to use when connecting. */
shards: 1,
/** Info on the current start limit. */
sessionStartLimit: {
/** The total number of session starts the current user is allowed. */
total: 1000,
/** The remaining number of session starts the current user is allowed. */
remaining: 1000,
/** Milliseconds left until limit is reset. */
resetAfter: 0,
/** The number of identify requests allowed per 5 seconds.
* So, if you had a max concurrency of 16, and 16 shards for example, you could start them all up at the same time.
* Whereas if you had 32 shards, if you tried to start up shard 0 and 16 at the same time for example, it would not work. You can start shards 0-15 concurrently, then 16-31...
* */
maxConcurrency: 1,
},
},
shards: new Collection<number, DiscordenoShard>(),
utf8decoder: new TextDecoder(),
// METHODS
/** The handler function that starts the gateway. */
startGateway,
/** The handler for spawning ALL the shards. */
spawnShards,
createShard,
identify,
heartbeat,
handleDiscordPayload,
};
export interface DiscordenoShard {
/** The shard id number */
id: number;
/** The websocket for this shard */
ws: WebSocket;
resumeInterval: number;
/** The session id important for resuming connections. */
sessionID: string;
/** The previous sequence number, important for resuming connections. */
previousSequenceNumber: number | null;
/** Whether the shard is currently resuming. */
resuming: boolean;
heartbeat: {
/** The exact timestamp the last heartbeat was sent */
lastSentAt: number;
/** The timestamp the last heartbeat ACK was received from discord. */
lastReceivedAt: number;
/** Whether or not the heartbeat was acknowledged by discord in time. */
acknowledged: boolean;
/** Whether or not to keep heartbeating. Useful for when needing to stop heartbeating. */
keepAlive: boolean;
/** The interval between heartbeats requested by discord. */
interval: number;
/** The id of the interval, useful for stopping the interval if ws closed. */
intervalID: number;
};
}
+1 -1
View File
@@ -39,7 +39,7 @@ export async function spawnShards(
shardID,
data.shards > lastShardID ? data.shards : lastShardID,
];
// Start The shard
// Startx The shard
await createShard(data, payload, false, shardID);
// Spawn next shard
await spawnShards(