From d6898f2b7b1d6eadc48ce48874cf55c837a5d1e8 Mon Sep 17 00:00:00 2001 From: Fleny Date: Wed, 7 Aug 2024 08:16:55 +0200 Subject: [PATCH] feat(gateway): Use the native WebSocket implementation when available (#3808) * Use the native WebSocket implementation when available * merge types.js import in shard * Apply suggestions from code review Co-authored-by: Awesome Stickz * Do not use the WebSocket global on Node --------- Co-authored-by: Awesome Stickz --- packages/gateway/src/Shard.ts | 78 +++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 35 deletions(-) diff --git a/packages/gateway/src/Shard.ts b/packages/gateway/src/Shard.ts index 8432ef1ae..af7b24e8a 100644 --- a/packages/gateway/src/Shard.ts +++ b/packages/gateway/src/Shard.ts @@ -3,10 +3,15 @@ import type { DiscordGatewayPayload, DiscordHello, DiscordReady } from '@discord import { GatewayCloseEventCodes, GatewayOpcodes } from '@discordeno/types' import { LeakyBucket, camelize, delay, logger } from '@discordeno/utils' import NodeWebSocket from 'ws' -import type { BotStatusUpdate, ShardEvents, ShardGatewayConfig, ShardHeart, ShardSocketRequest } from './types.js' -import { ShardSocketCloseCodes, ShardState } from './types.js' - -declare let WebSocket: any +import { + type BotStatusUpdate, + type ShardEvents, + type ShardGatewayConfig, + type ShardHeart, + ShardSocketCloseCodes, + type ShardSocketRequest, + ShardState, +} from './types.js' export class DiscordenoShard { /** The id of the shard */ @@ -24,7 +29,7 @@ export class DiscordenoShard { /** Current session id of the shard if present. */ sessionId?: string /** This contains the WebSocket connection to Discord, if currently connected. */ - socket?: NodeWebSocket + socket?: WebSocket /** Current internal state of the this. */ state = ShardState.Offline /** The url provided by discord to use when resuming a connection for this this. */ @@ -105,29 +110,31 @@ export class DiscordenoShard { if (![ShardState.Identifying, ShardState.Resuming].includes(this.state)) { this.state = ShardState.Connecting } + this.events.connecting?.(this) const url = new URL(this.connectionUrl) url.searchParams.set('v', this.gatewayConfig.version.toString()) url.searchParams.set('encoding', 'json') - const socket: NodeWebSocket = - // @ts-expect-error Deno - globalThis.Deno !== undefined && Reflect.has(globalThis, 'Deno') ? new WebSocket(url.toString()) : new NodeWebSocket(url.toString()) + // We check for built-in WebSocket implementations in Bun or Deno, NodeJS v22 has an implementation too but it seems to be less optimized so for now it is better to use the ws npm package + const shouldUseBuiltin = Reflect.has(globalThis, 'WebSocket') && (Reflect.has(globalThis, 'Bun') || Reflect.has(globalThis, 'Deno')) + + // @ts-expect-error NodeWebSocket doesn't support "dispatchEvent", and while we don't use it, it is required on the "WebSocket" type + const socket: WebSocket = shouldUseBuiltin ? new WebSocket(url) : new NodeWebSocket(url) this.socket = socket - // TODO: proper event handling - socket.onerror = (event: NodeWebSocket.ErrorEvent) => console.log({ error: event, shardId: this.id }) - socket.onclose = async (event: NodeWebSocket.CloseEvent) => await this.handleClose(event) - socket.onmessage = async (message: NodeWebSocket.MessageEvent) => await this.handleMessage(message) + socket.onerror = (event) => this.handleError(event) + socket.onclose = async (closeEvent) => await this.handleClose(closeEvent) + socket.onmessage = async (messageEvent) => await this.handleMessage(messageEvent) return await new Promise((resolve) => { socket.onopen = () => { - // Only set the shard to `Unidentified` state, - // if the connection request does not come from an identify or resume action. + // Only set the shard to `Unidentified` state if the connection request does not come from an identify or resume action. if (![ShardState.Identifying, ShardState.Resuming].includes(this.state)) { this.state = ShardState.Unidentified } + this.events.connected?.(this) resolve(this) @@ -262,12 +269,15 @@ export class DiscordenoShard { this.state = ShardState.Offline } - /** Handle a gateway connection close. */ - async handleClose(close: NodeWebSocket.CloseEvent): Promise { - // gateway.debug("GW CLOSED", { shardId, payload: event }); + /** Handle a gateway connection error */ + handleError(error: Event): void { + this.logger.error(`[Shard] There was an error connecting shard ${this.id}.`, error) + } + /** Handle a gateway connection close. */ + async handleClose(close: CloseEvent): Promise { this.stopHeartbeating() - this.logger.debug(`[Shard] Gateway connection closed with code ${close.code}.`) + this.logger.debug(`[Shard] Gateway connection closed with code ${close.code} (${close.reason || ''}).`) switch (close.code) { case ShardSocketCloseCodes.TestingFinished: { @@ -327,6 +337,21 @@ export class DiscordenoShard { } } + /** Handle an incoming gateway message. */ + async handleMessage(message: MessageEvent): Promise { + let preProcessMessage = message.data + + // If message compression is enabled, Discord might send zlib compressed payloads. + if (this.gatewayConfig.compress && preProcessMessage instanceof Blob) { + preProcessMessage = inflateSync(await preProcessMessage.arrayBuffer()).toString() + } + + // Safeguard incase decompression failed to make a string. + if (typeof preProcessMessage !== 'string') return + + return await this.handleDiscordPacket(JSON.parse(preProcessMessage) as DiscordGatewayPayload) + } + /** Handles a incoming gateway packet. */ async handleDiscordPacket(packet: DiscordGatewayPayload): Promise { // Edge case start: https://github.com/discordeno/discordeno/issues/2311 @@ -471,23 +496,6 @@ export class DiscordenoShard { // Now the event can be safely forwarded. this.events.message?.(this, camelize(packet)) } - - /** Handle an incoming gateway message. */ - async handleMessage(message: NodeWebSocket.MessageEvent): Promise { - let preProcessMessage = message.data - - // If message compression is enabled, - // Discord might send zlib compressed payloads. - if (this.gatewayConfig.compress && preProcessMessage instanceof Blob) { - preProcessMessage = inflateSync(await preProcessMessage.arrayBuffer()).toString() - } - - // Safeguard incase decompression failed to make a string. - if (typeof preProcessMessage !== 'string') return - - return await this.handleDiscordPacket(JSON.parse(preProcessMessage) as DiscordGatewayPayload) - } - /** * Override in order to make the shards presence. * async in case devs create the presence based on eg. database values.