From 6d32d283b9ec6caa5edc529c423fdb782759c35f Mon Sep 17 00:00:00 2001 From: Skillz4Killz <23035000+Skillz4Killz@users.noreply.github.com> Date: Mon, 15 Feb 2021 19:08:08 +0000 Subject: [PATCH] add log event --- src/ws/proxy/events.ts | 21 ++++++++++ src/ws/proxy/shard.ts | 87 +++++++++++++++++------------------------- src/ws/proxy/ws.ts | 2 + 3 files changed, 59 insertions(+), 51 deletions(-) create mode 100644 src/ws/proxy/events.ts diff --git a/src/ws/proxy/events.ts b/src/ws/proxy/events.ts new file mode 100644 index 000000000..76efa8ec9 --- /dev/null +++ b/src/ws/proxy/events.ts @@ -0,0 +1,21 @@ +import { DiscordPayload } from "../../types/discord.ts"; +import { DiscordenoShard } from "./ws.ts"; + +/** The handler for logging different actions happening inside the ws. User can override and put custom handling per event. */ +export function log(type: "CLOSED", data: { shardID: number, payload: CloseEvent }): unknown; +export function log(type: "CLOSED_RECONNECT", data: { shardID: number, payload: CloseEvent }): unknown; +export function log(type: "ERROR", data: Record & { shardID: number }): unknown; +export function log(type: "HEARTBEATING", data: { shardID: number, shard: DiscordenoShard }): unknown; +export function log(type: "HEARTBEATING_CLOSED", data: { shardID: number, shard: DiscordenoShard }): unknown; +export function log(type: "HEARTBEATING_DETAILS", data: { shardID: number, interval: number, shard: DiscordenoShard }): unknown; +export function log(type: "HEARTBEATING_STARTED", data: { shardID: number, interval: number }): unknown; +export function log(type: "IDENTIFYING", data: { shardID: number, maxShards: number }): unknown; +export function log(type: "INVALID_SESSION", data: { shardID: number, payload: DiscordPayload }): unknown; +export function log(type: "RAW", data: Record): unknown; +export function log(type: "RECONNECT", data: { shardID: number }): unknown; +export function log(type: "RESUMED", data: { shardID: number }): unknown; +export function log(type: "RESUMING", data: { shardID: number }): unknown; +export function log(type: "CLOSED" | "CLOSED_RECONNECT" | "ERROR" | "HEARTBEATING" | "HEARTBEATING_CLOSED" | "HEARTBEATING_DETAILS" | "HEARTBEATING_STARTED" | "IDENTIFYING" | "INVALID_SESSION" | "RAW" | "RECONNECT" | "RESUMED" | "RESUMING", data: unknown) { + console.log(type, data); +} + diff --git a/src/ws/proxy/shard.ts b/src/ws/proxy/shard.ts index 41b8da7db..15a735584 100644 --- a/src/ws/proxy/shard.ts +++ b/src/ws/proxy/shard.ts @@ -7,17 +7,23 @@ import { import { decompressWith } from "./deps.ts"; import { ws } from "./ws.ts"; -export function resume(shardID: number) { - // TODO: Log that this is happening +export async function resume(shardID: number) { + ws.log("RESUMING", { shardID }); // CREATE A SHARD - const socket = ws.createShard(shardID); + const socket = await ws.createShard(shardID); // NOW WE HANDLE RESUMING THIS SHARD // Get the old data for this shard necessary for resuming const oldShard = ws.shards.get(shardID); - // TODO: HOW TO CLOSE OLD SHARD SOCKET!!! - // TODO: STOP OLD HEARTBEAT + + if (oldShard) { + // HOW TO CLOSE OLD SHARD SOCKET!!! + oldShard.ws.close(4009, "Resuming the shard, closing old shard."); + // STOP OLD HEARTBEAT + clearInterval(oldShard.heartbeat.intervalID); + } + const sessionID = oldShard?.sessionID || ""; const previousSequenceNumber = oldShard?.previousSequenceNumber || 0; @@ -51,11 +57,11 @@ export function resume(shardID: number) { }; } -export function identify(shardID: number, maxShards: number) { - // TODO: Log that this is happening +export async function identify(shardID: number, maxShards: number) { + ws.log("IDENTIFYING", { shardID, maxShards }) // CREATE A SHARD - const socket = ws.createShard(shardID); + const socket = await ws.createShard(shardID); // Identify can just set/reset the settings for the shard ws.shards.set(shardID, { @@ -86,27 +92,29 @@ export function identify(shardID: number, maxShards: number) { } export function heartbeat(shardID: number, interval: number) { - // TODO: Log that this is happening + ws.log("HEARTBEATING_STARTED", { shardID, interval }); const shard = ws.shards.get(shardID); if (!shard) return; + ws.log("HEARTBEATING_DETAILS", { shardID, interval, shard }); + shard.heartbeat.keepAlive = true; shard.heartbeat.acknowledged = false; shard.heartbeat.lastSentAt = Date.now(); shard.heartbeat.interval = interval; shard.heartbeat.intervalID = setInterval(() => { - // TODO: Log that this is happening - const currentShard = ws.shards.get(shardID); if (!currentShard) return; + ws.log("HEARTBEATING", { shardID, shard: currentShard }); + if ( currentShard.ws.readyState === WebSocket.CLOSED || !currentShard.heartbeat.keepAlive ) { - // TODO: Log that this is happening + ws.log("HEARTBEATING_CLOSED", { shardID, shard: currentShard }); // STOP THE HEARTBEAT return clearInterval(currentShard.heartbeat.intervalID); @@ -123,17 +131,13 @@ export function heartbeat(shardID: number, interval: number) { }, interval); } -export function createShard(shardID: number) { +// deno-lint-ignore require-await +export async function createShard(shardID: number) { const socket = new WebSocket(ws.botGatewayData.url); socket.binaryType = "arraybuffer"; socket.onerror = (errorEvent) => { - // TODO: Log that this is happening - - // eventHandlers.debug?.({ - // type: "wsError", - // data: { shardID, ...errorEvent }, - // }); + ws.log("ERROR", { shardID, error: errorEvent }); }; socket.onmessage = ({ data: message }) => { @@ -152,8 +156,8 @@ export function createShard(shardID: number) { if (typeof message !== "string") return; const messageData = JSON.parse(message); - // TODO: Log that this is happening - // if (!messageData.t) eventHandlers.rawGateway?.(messageData); + ws.log("RAW", messageData); + switch (messageData.op) { case GatewayOpcode.Hello: ws.heartbeat( @@ -167,10 +171,8 @@ export function createShard(shardID: number) { } break; case GatewayOpcode.Reconnect: - // TODO: Log that this is happening - // eventHandlers.debug?.( - // { type: "gatewayReconnect", data: { shardID } }, - // ); + ws.log("RECONNECT", { shardID }); + if (ws.shards.has(shardID)) { ws.shards.get(shardID)!.resuming = true; } @@ -178,13 +180,8 @@ export function createShard(shardID: number) { resume(shardID); break; case GatewayOpcode.InvalidSession: - // TODO: Log that this is happening - // eventHandlers.debug?.( - // { - // type: "gatewayInvalidSession", - // data: { shardID, data }, - // }, - // ); + ws.log("INVALID_SESSION", { shardID, payload: messageData }); + // When d is false we need to reidentify if (!messageData.d) { identify(shardID, ws.maxShards); @@ -199,10 +196,7 @@ export function createShard(shardID: number) { break; default: if (messageData.t === "RESUMED") { - // TODO: Log that this is happening - // eventHandlers.debug?.( - // { type: "gatewayResumed", data: { shardID } }, - // ); + ws.log("RESUMED", { shardID }); if (ws.shards.has(shardID)) { ws.shards.get(shardID)!.resuming = false; @@ -231,17 +225,11 @@ export function createShard(shardID: number) { } }; - socket.onclose = ({ reason, code, wasClean }) => { - // TODO: Log that this is happening - // eventHandlers.debug?.( - // { - // type: "wsClose", - // data: { shardID, code, reason, wasClean }, - // }, - // ); + socket.onclose = (event) => { + ws.log("CLOSED", { shardID, payload: event }); // TODO: ENUM FOR THESE CODES? - switch (code) { + switch (event.code) { case 4001: case 4002: case 4004: @@ -252,18 +240,14 @@ export function createShard(shardID: number) { case 4013: case 4014: throw new Error( - reason || "Discord gave no reason! GG! You broke Discord!", + event.reason || "Discord gave no reason! GG! You broke Discord!", ); // THESE ERRORS CAN NO BE RESUMED! THEY MUST RE-IDENTIFY! case 4003: case 4007: case 4008: case 4009: - // TODO: Log that this is happening - // eventHandlers.debug?.({ - // type: "wsReconnect", - // data: { shardID, code, reason, wasClean }, - // }); + ws.log("CLOSED_RECONNECT", { shardID, payload: event }); identify(shardID, ws.maxShards); break; default: @@ -275,6 +259,7 @@ export function createShard(shardID: number) { return socket; } +/** Handler for processing all dispatch payloads that should be sent/forwarded to another server/vps/process. */ export async function handleDiscordPayload( data: DiscordPayload, shardID: number, diff --git a/src/ws/proxy/ws.ts b/src/ws/proxy/ws.ts index b2c225e08..c1de500cd 100644 --- a/src/ws/proxy/ws.ts +++ b/src/ws/proxy/ws.ts @@ -6,6 +6,7 @@ import { heartbeat, identify, } from "./shard.ts"; +import { log } from "./events.ts"; // CONTROLLER LIKE INTERFACE FOR WS HANDLING export const ws = { @@ -69,6 +70,7 @@ export const ws = { heartbeat, handleDiscordPayload, tellClusterToIdentify, + log }; export interface DiscordenoShard {