diff --git a/src/ws/heartbeat.ts b/src/ws/heartbeat.ts index 1b9842917..a32d964ee 100644 --- a/src/ws/heartbeat.ts +++ b/src/ws/heartbeat.ts @@ -1,8 +1,9 @@ import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts"; +import { delay } from "../util/utils.ts"; import { identify } from "./identify.ts"; import { ws } from "./ws.ts"; -export function heartbeat(shardId: number, interval: number) { +export async function heartbeat(shardId: number, interval: number) { ws.log("HEARTBEATING_STARTED", { shardId, interval }); const shard = ws.shards.get(shardId); @@ -11,56 +12,48 @@ export function heartbeat(shardId: number, interval: number) { ws.log("HEARTBEATING_DETAILS", { shardId, interval, shard }); shard.heartbeat.keepAlive = true; - shard.heartbeat.acknowledged = true; + shard.heartbeat.acknowledged = false; shard.heartbeat.lastSentAt = Date.now(); shard.heartbeat.interval = interval; - // The first heartbeat should not wait for the entire interval to pass: https://discord.com/developers/docs/topics/gateway#heartbeating - shard.heartbeat.timeoutId = setTimeout( - () => sendHeartbeat(shardId), - Math.floor(shard.heartbeat.interval * Math.random()), - ); -} + // The first heartbeat is special so we send it without setInterval: https://discord.com/developers/docs/topics/gateway#heartbeating + await delay(Math.floor(shard.heartbeat.interval * Math.random())); -function sendHeartbeat(shardId: number) { - ws.log("DEBUG", `Running setInterval in heartbeat file.`); - const currentShard = ws.shards.get(shardId); - if (!currentShard) return; - - ws.log("HEARTBEATING", { shardId, shard: currentShard }); - - if ( - currentShard.ws.readyState === WebSocket.CLOSED || - !currentShard.heartbeat.keepAlive - ) { - ws.log("HEARTBEATING_CLOSED", { shardId, shard: currentShard }); - - // STOP THE HEARTBEAT - return; - } - - if (!currentShard.heartbeat.acknowledged) { - currentShard.ws.close(3065, "Did not receive an ACK in time."); - return identify(shardId, ws.maxShards); - } - - if (currentShard.ws.readyState !== WebSocket.OPEN) { - currentShard.heartbeat.timeoutId = setTimeout( - () => sendHeartbeat(shardId), - currentShard.heartbeat.interval, - ); - return; - } - - currentShard.ws.send(JSON.stringify({ + shard.queue.unshift({ op: DiscordGatewayOpcodes.Heartbeat, - d: currentShard.previousSequenceNumber, - })); + d: shard.previousSequenceNumber, + }); + ws.processQueue(shard.id); - currentShard.heartbeat.acknowledged = false; + shard.heartbeat.intervalId = setInterval(() => { + ws.log("DEBUG", `Running setInterval in heartbeat file.`); + const currentShard = ws.shards.get(shardId); + if (!currentShard) return; - currentShard.heartbeat.timeoutId = setTimeout( - () => sendHeartbeat(shardId), - currentShard.heartbeat.interval, - ); + ws.log("HEARTBEATING", { shardId, shard: currentShard }); + + if ( + currentShard.ws.readyState === WebSocket.CLOSED || + !currentShard.heartbeat.keepAlive + ) { + ws.log("HEARTBEATING_CLOSED", { shardId, shard: currentShard }); + + // STOP THE HEARTBEAT + return clearInterval(shard.heartbeat.intervalId); + } + + if (!currentShard.heartbeat.acknowledged) { + currentShard.ws.close(3065, "Did not receive an ACK in time."); + return identify(shardId, ws.maxShards); + } + + if (currentShard.ws.readyState !== WebSocket.OPEN) return; + + currentShard.ws.send(JSON.stringify({ + op: DiscordGatewayOpcodes.Heartbeat, + d: currentShard.previousSequenceNumber, + })); + + currentShard.heartbeat.acknowledged = false; + }, shard.heartbeat.interval); } diff --git a/src/ws/identify.ts b/src/ws/identify.ts index 5ca156e02..19e5a92ee 100644 --- a/src/ws/identify.ts +++ b/src/ws/identify.ts @@ -7,7 +7,10 @@ export async function identify(shardId: number, maxShards: number) { // Need to clear the old heartbeat interval const oldShard = ws.shards.get(shardId); if (oldShard) { - clearTimeout(oldShard.heartbeat.timeoutId); + if (oldShard.ws.readyState === WebSocket.OPEN) { + oldShard.ws.close(3065, "Reidentifying closure of old shard"); + } + clearInterval(oldShard.heartbeat.intervalId); } // CREATE A SHARD @@ -29,7 +32,7 @@ export async function identify(shardId: number, maxShards: number) { acknowledged: false, keepAlive: false, interval: 0, - timeoutId: 0, + intervalId: 0, }, queue: [], processingQueue: false, diff --git a/src/ws/resume.ts b/src/ws/resume.ts index 46c345f24..f54439905 100644 --- a/src/ws/resume.ts +++ b/src/ws/resume.ts @@ -19,7 +19,7 @@ export async function resume(shardId: number) { oldShard.ws.close(3065, "Resuming the shard, closing old shard."); } // STOP OLD HEARTBEAT - clearTimeout(oldShard.heartbeat.timeoutId); + clearInterval(oldShard.heartbeat.intervalId); ws.shards.set(shardId, { id: shardId, @@ -36,7 +36,7 @@ export async function resume(shardId: number) { acknowledged: false, keepAlive: false, interval: 0, - timeoutId: 0, + intervalId: 0, }, queue: oldShard.queue || [], processingQueue: false, diff --git a/src/ws/ws.ts b/src/ws/ws.ts index d08f8940f..164a38e0f 100644 --- a/src/ws/ws.ts +++ b/src/ws/ws.ts @@ -143,7 +143,7 @@ export interface DiscordenoShard { /** The interval between heartbeats requested by discord. */ interval: number; /** The id of the interval, useful for stopping the interval if ws closed. */ - timeoutId: number; + intervalId: number; }; /** The items/requestst that are in queue to be sent to this shard websocket. */ queue: WebSocketRequest[]; diff --git a/tests/ws/ws_close.ts b/tests/ws/ws_close.ts index 7ffcffcbb..3b313fc99 100644 --- a/tests/ws/ws_close.ts +++ b/tests/ws/ws_close.ts @@ -7,7 +7,7 @@ Deno.test({ name: "[ws] Close all shards manually.", async fn() { ws.shards.forEach((shard) => { - clearTimeout(shard.heartbeat.timeoutId); + clearInterval(shard.heartbeat.intervalId); shard.ws.close(3064, "Discordeno Testing Finished! Do Not RESUME!"); });