mirror of
https://github.com/discordeno/discordeno.git
synced 2026-06-02 17:00:08 +00:00
fix: shard
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import { Shard } from './types.js'
|
||||
|
||||
export function calculateSafeRequests (shard: Shard) {
|
||||
export function calculateSafeRequests (shard: Shard): number {
|
||||
// * 2 adds extra safety layer for discords OP 1 requests that we need to respond to
|
||||
const safeRequests = shard.maxRequestsPerRateLimitTick -
|
||||
Math.ceil(shard.rateLimitResetInterval / shard.heart.interval) * 2
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
import { StatusUpdate } from '../../helpers/misc/editShardStatus.js'
|
||||
import { DiscordGatewayPayload } from '../../types/discord.js'
|
||||
import { PickPartial } from '../../types/shared.js'
|
||||
import { createLeakyBucket, LeakyBucket } from '../../util/bucket.js'
|
||||
import { API_VERSION } from '../../util/constants.js'
|
||||
import { ActivityTypes, DiscordGatewayPayload, PickPartial, PresenceStatus } from '@discordeno/types'
|
||||
import { API_VERSION, createLeakyBucket, LeakyBucket } from '@discordeno/utils'
|
||||
import { calculateSafeRequests } from './calculateSafeRequests.js'
|
||||
import { close } from './close.js'
|
||||
import { connect } from './connect.js'
|
||||
@@ -32,6 +29,7 @@ import {
|
||||
// TODO: improve shard event resolving
|
||||
|
||||
/** */
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
|
||||
export function createShard (
|
||||
options: CreateShard
|
||||
) {
|
||||
@@ -55,11 +53,12 @@ export function createShard (
|
||||
// ----------
|
||||
|
||||
/** The gateway configuration which is used to connect to Discord. */
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
|
||||
gatewayConfig: {
|
||||
compress: options.gatewayConfig.compress ?? false,
|
||||
intents: options.gatewayConfig.intents ?? 0,
|
||||
properties: {
|
||||
os: options.gatewayConfig?.properties?.os ?? Deno.build.os,
|
||||
os: options.gatewayConfig?.properties?.os ?? process.platform,
|
||||
browser: options.gatewayConfig?.properties?.browser ?? 'Discordeno',
|
||||
device: options.gatewayConfig?.properties?.device ?? 'Discordeno'
|
||||
},
|
||||
@@ -68,6 +67,7 @@ export function createShard (
|
||||
version: options.gatewayConfig.version ?? API_VERSION
|
||||
} as ShardGatewayConfig,
|
||||
/** This contains all the heartbeat information */
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
|
||||
heart: {
|
||||
acknowledged: false,
|
||||
interval: DEFAULT_HEARTBEAT_INTERVAL
|
||||
@@ -79,7 +79,7 @@ export function createShard (
|
||||
*/
|
||||
maxRequestsPerRateLimitTick: MAX_GATEWAY_REQUESTS_PER_INTERVAL,
|
||||
/** The previous payload sequence number. */
|
||||
previousSequenceNumber: options.previousSequenceNumber || null,
|
||||
previousSequenceNumber: options.previousSequenceNumber ?? null,
|
||||
/** In which interval (in milliseconds) the gateway resets it's rate limit. */
|
||||
rateLimitResetInterval: GATEWAY_RATE_LIMIT_RESET_INTERVAL,
|
||||
/** Current session id of the shard if present. */
|
||||
@@ -97,6 +97,7 @@ export function createShard (
|
||||
// ----------
|
||||
|
||||
/** The shard related event handlers. */
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
|
||||
events: options.events ?? {} as ShardEvents,
|
||||
|
||||
/** Calculate the amount of requests which can safely be made per rate limit interval,
|
||||
@@ -224,6 +225,25 @@ export function createShard (
|
||||
}
|
||||
}
|
||||
|
||||
/** https://discord.com/developers/docs/topics/gateway-events#activity-object */
|
||||
export interface BotActivity {
|
||||
name: string
|
||||
type: ActivityTypes
|
||||
url?: string
|
||||
}
|
||||
|
||||
/** https://discord.com/developers/docs/topics/gateway-events#update-presence */
|
||||
export interface BotStatusUpdate {
|
||||
// /** Unix time (in milliseconds) of when the client went idle, or null if the client is not idle */
|
||||
since: number | null
|
||||
/** The user's activities */
|
||||
activities: BotActivity[]
|
||||
/** The user's new status */
|
||||
status: keyof typeof PresenceStatus
|
||||
// /** Whether or not the client is afk */
|
||||
// afk: boolean;
|
||||
}
|
||||
|
||||
export interface CreateShard {
|
||||
/** Id of the shard which should be created. */
|
||||
id: number
|
||||
@@ -275,7 +295,7 @@ export interface CreateShard {
|
||||
isOpen?: typeof isOpen
|
||||
|
||||
/** Function which can be overwritten in order to get the shards presence. */
|
||||
makePresence?: (shardId: number) => Promise<StatusUpdate> | StatusUpdate
|
||||
makePresence?: (shardId: number) => Promise<BotStatusUpdate> | BotStatusUpdate
|
||||
|
||||
/** The maximum of requests which can be send to discord per rate limit tick.
|
||||
* Typically this value should not be changed.
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
export { decompress_with as decompressWith } from 'https://unpkg.com/@evan/wasm@0.0.94/target/zlib/deno.js'
|
||||
@@ -1,4 +1,4 @@
|
||||
import { GatewayCloseEventCodes } from '../../types/shared.js'
|
||||
import { GatewayCloseEventCodes } from '@discordeno/types'
|
||||
import { Shard, ShardSocketCloseCodes, ShardState } from './types.js'
|
||||
|
||||
export async function handleClose (shard: Shard, close: CloseEvent): Promise<void> {
|
||||
|
||||
@@ -1,23 +1,16 @@
|
||||
import { DiscordGatewayPayload, DiscordHello, DiscordReady } from '../../types/discord.js'
|
||||
import { GatewayOpcodes } from '../../types/shared.js'
|
||||
import { createLeakyBucket } from '../../util/bucket.js'
|
||||
import { delay } from '../../util/utils.js'
|
||||
import { decompressWith } from './deps.js'
|
||||
import { DiscordGatewayPayload, DiscordHello, DiscordReady, GatewayOpcodes } from '@discordeno/types'
|
||||
import { createLeakyBucket, delay } from '@discordeno/utils'
|
||||
import { inflateSync } from 'node:zlib'
|
||||
import { GATEWAY_RATE_LIMIT_RESET_INTERVAL, Shard, ShardState } from './types.js'
|
||||
|
||||
const decoder = new TextDecoder()
|
||||
|
||||
export async function handleMessage (shard: Shard, message: MessageEvent<any>): Promise<void> {
|
||||
message = message.data
|
||||
|
||||
// If message compression is enabled,
|
||||
// Discord might send zlib compressed payloads.
|
||||
if (shard.gatewayConfig.compress && message instanceof Blob) {
|
||||
message = decompressWith(
|
||||
new Uint8Array(await message.arrayBuffer()),
|
||||
0,
|
||||
(slice: Uint8Array) => decoder.decode(slice)
|
||||
)
|
||||
// @ts-expect-error
|
||||
message = inflateSync(await message.arrayBuffer()).toString()
|
||||
}
|
||||
|
||||
// Safeguard incase decompression failed to make a string.
|
||||
@@ -126,8 +119,9 @@ export async function handleMessage (shard: Shard, message: MessageEvent<any>):
|
||||
|
||||
shard.resolves.get('RESUMED')?.(messageData)
|
||||
shard.resolves.delete('RESUMED')
|
||||
} // Important for future resumes.
|
||||
else if (messageData.t === 'READY') {
|
||||
} else if (messageData.t === 'READY') {
|
||||
// Important for future resumes.
|
||||
|
||||
const payload = messageData.d as DiscordReady
|
||||
|
||||
shard.resumeGatewayUrl = payload.resume_gateway_url
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import { GatewayOpcodes } from '../../types/shared.js'
|
||||
import { GatewayOpcodes } from '@discordeno/types'
|
||||
import { Shard, ShardSocketCloseCodes, ShardState } from './types.js'
|
||||
|
||||
export async function identify (shard: Shard): Promise<void> {
|
||||
// A new identify has been requested even though there is already a connection open.
|
||||
// Therefore we need to close the old connection and heartbeating before creating a new one.
|
||||
if (shard.isOpen()) {
|
||||
console.log('CLOSING EXISTING SHARD: #' + shard.id)
|
||||
console.log(`CLOSING EXISTING SHARD: #${shard.id}`)
|
||||
shard.close(ShardSocketCloseCodes.ReIdentifying, 'Re-identifying closure of old connection.')
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ export async function identify (shard: Shard): Promise<void> {
|
||||
// Wait until an identify is free for this shard.
|
||||
await shard.requestIdentify()
|
||||
|
||||
shard.send({
|
||||
void shard.send({
|
||||
op: GatewayOpcodes.Identify,
|
||||
d: {
|
||||
token: `Bot ${shard.gatewayConfig.token}`,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { GatewayOpcodes } from '../../types/shared.js'
|
||||
import { GatewayOpcodes } from '@discordeno/types'
|
||||
import { Shard, ShardSocketCloseCodes, ShardState } from './types.js'
|
||||
|
||||
export async function resume (shard: Shard): Promise<void> {
|
||||
@@ -26,7 +26,7 @@ export async function resume (shard: Shard): Promise<void> {
|
||||
// Before we can resume, we need to create a new connection with Discord's gateway.
|
||||
await shard.connect()
|
||||
|
||||
shard.send({
|
||||
void shard.send({
|
||||
op: GatewayOpcodes.Resume,
|
||||
d: {
|
||||
token: `Bot ${shard.gatewayConfig.token}`,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { GatewayOpcodes } from '../../types/shared.js'
|
||||
import { GatewayOpcodes } from '@discordeno/types'
|
||||
import { Shard, ShardSocketCloseCodes, ShardState } from './types.js'
|
||||
|
||||
export function startHeartbeating (shard: Shard, interval: number) {
|
||||
export function startHeartbeating (shard: Shard, interval: number): void {
|
||||
// gateway.debug("GW HEARTBEATING_STARTED", { shardId, interval });
|
||||
|
||||
shard.heart.interval = interval
|
||||
@@ -28,37 +28,39 @@ export function startHeartbeating (shard: Shard, interval: number) {
|
||||
shard.heart.acknowledged = false
|
||||
|
||||
// After the random heartbeat jitter we can start a normal interval.
|
||||
shard.heart.intervalId = setInterval(async () => {
|
||||
// gateway.debug("GW DEBUG", `Running setInterval in heartbeat file. Shard: ${shardId}`);
|
||||
shard.heart.intervalId = setInterval(() => {
|
||||
void (async () => {
|
||||
// gateway.debug("GW DEBUG", `Running setInterval in heartbeat file. Shard: ${shardId}`);
|
||||
|
||||
// gateway.debug("GW HEARTBEATING", { shardId, shard: currentShard });
|
||||
// gateway.debug("GW HEARTBEATING", { shardId, shard: currentShard });
|
||||
|
||||
// The Shard did not receive a heartbeat ACK from Discord in time,
|
||||
// therefore we have to assume that the connection has failed or got "zombied".
|
||||
// The Shard needs to start a re-identify action accordingly.
|
||||
// Reference: https://discord.com/developers/docs/topics/gateway#heartbeating-example-gateway-heartbeat-ack
|
||||
if (!shard.heart.acknowledged) {
|
||||
shard.close(
|
||||
ShardSocketCloseCodes.ZombiedConnection,
|
||||
'Zombied connection, did not receive an heartbeat ACK in time.'
|
||||
// The Shard did not receive a heartbeat ACK from Discord in time,
|
||||
// therefore we have to assume that the connection has failed or got "zombied".
|
||||
// The Shard needs to start a re-identify action accordingly.
|
||||
// Reference: https://discord.com/developers/docs/topics/gateway#heartbeating-example-gateway-heartbeat-ack
|
||||
if (!shard.heart.acknowledged) {
|
||||
shard.close(
|
||||
ShardSocketCloseCodes.ZombiedConnection,
|
||||
'Zombied connection, did not receive an heartbeat ACK in time.'
|
||||
)
|
||||
|
||||
return await shard.identify()
|
||||
}
|
||||
|
||||
shard.heart.acknowledged = false
|
||||
|
||||
// Using a direct socket.send call here because heartbeat requests are reserved by us.
|
||||
shard.socket?.send(
|
||||
JSON.stringify({
|
||||
op: GatewayOpcodes.Heartbeat,
|
||||
d: shard.previousSequenceNumber
|
||||
})
|
||||
)
|
||||
|
||||
return await shard.identify()
|
||||
}
|
||||
shard.heart.lastBeat = Date.now()
|
||||
|
||||
shard.heart.acknowledged = false
|
||||
|
||||
// Using a direct socket.send call here because heartbeat requests are reserved by us.
|
||||
shard.socket?.send(
|
||||
JSON.stringify({
|
||||
op: GatewayOpcodes.Heartbeat,
|
||||
d: shard.previousSequenceNumber
|
||||
})
|
||||
)
|
||||
|
||||
shard.heart.lastBeat = Date.now()
|
||||
|
||||
shard.events.heartbeat?.(shard)
|
||||
shard.events.heartbeat?.(shard)
|
||||
})()
|
||||
}, shard.heart.interval)
|
||||
}, jitter)
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import { DiscordGatewayPayload } from '../../types/discord.js'
|
||||
import { GatewayOpcodes } from '../../types/shared.js'
|
||||
import { DiscordGatewayPayload, GatewayOpcodes } from '@discordeno/types'
|
||||
import { createShard } from './createShard.js'
|
||||
|
||||
// TODO: think whether we also need an identifiedShard function
|
||||
@@ -80,7 +79,7 @@ export interface ShardHeart {
|
||||
/** Interval between heartbeats requested by Discord. */
|
||||
interval: number
|
||||
/** Id of the interval, which is used for sending the heartbeats. */
|
||||
intervalId?: number
|
||||
intervalId?: NodeJS.Timer
|
||||
/** Unix (in milliseconds) timestamp when the last heartbeat ACK was received from Discord. */
|
||||
lastAck?: number
|
||||
/** Unix timestamp (in milliseconds) when the last heartbeat was sent. */
|
||||
@@ -91,7 +90,7 @@ export interface ShardHeart {
|
||||
*/
|
||||
rtt?: number
|
||||
/** Id of the timeout which is used for sending the first heartbeat to Discord since it's "special". */
|
||||
timeoutId?: number
|
||||
timeoutId?: NodeJS.Timeout
|
||||
}
|
||||
|
||||
export interface ShardEvents {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { GatewayManager } from '../gateway/manager/gatewayManager.js'
|
||||
import { GatewayManager } from '../manager/gatewayManager.js'
|
||||
|
||||
export function calculateShardId (gateway: GatewayManager, guildId: bigint) {
|
||||
export function calculateShardId (gateway: GatewayManager, guildId: bigint): number {
|
||||
if (gateway.manager.totalShards === 1) return 0
|
||||
|
||||
return Number((guildId >> 22n) % BigInt(gateway.manager.totalShards))
|
||||
|
||||
Reference in New Issue
Block a user