feat: allow for custom function to send the payload to shards (#3149)

* Initial work for sendPayload

* fix docs comment
This commit is contained in:
Fleny
2023-10-15 06:30:58 +02:00
committed by GitHub
parent 4a38f1a6e4
commit a8f9e5ec1b
3 changed files with 135 additions and 244 deletions

View File

@@ -1,21 +1,10 @@
/* eslint-disable @typescript-eslint/no-confusing-void-expression */
import type {
AtLeastOne,
BigString,
Camelize,
DiscordGatewayPayload,
DiscordGuildMembersChunk,
DiscordHello,
DiscordMember,
DiscordReady,
RequestGuildMembers,
} from '@discordeno/types'
import { GatewayCloseEventCodes, GatewayIntents, GatewayOpcodes } from '@discordeno/types'
import { Collection, LeakyBucket, camelize, delay, logger } from '@discordeno/utils'
import type { DiscordGatewayPayload, DiscordGuildMembersChunk, DiscordHello, DiscordReady } from '@discordeno/types'
import { GatewayCloseEventCodes, GatewayOpcodes } from '@discordeno/types'
import { camelize, delay, LeakyBucket, logger } from '@discordeno/utils'
import { inflateSync } from 'node:zlib'
import NodeWebSocket from 'ws'
import type { RequestMemberRequest } from './manager.js'
import type { BotStatusUpdate, ShardEvents, ShardGatewayConfig, ShardHeart, ShardSocketRequest, StatusUpdate, UpdateVoiceState } from './types.js'
import type { BotStatusUpdate, ShardEvents, ShardGatewayConfig, ShardHeart, ShardSocketRequest } from './types.js'
import { ShardSocketCloseCodes, ShardState } from './types.js'
declare let WebSocket: any
@@ -50,19 +39,6 @@ export class DiscordenoShard {
/** Shard bucket. Only access this if you know what you are doing. Bucket for handling shard request rate limits. */
bucket: LeakyBucket
/** This managers cache related settings. */
cache = {
requestMembers: {
/**
* Whether or not request member requests should be cached.
* @default false
*/
enabled: false,
/** The pending requests. */
pending: new Collection<string, RequestMemberRequest>(),
},
}
constructor(options: ShardCreateOptions) {
this.id = options.id
this.connection = options.connection
@@ -88,7 +64,7 @@ export class DiscordenoShard {
return this.connection
}
/** The url to connect to. Intially this is the discord gateway url, and then is switched to resume gateway url once a READY is received. */
/** The url to connect to. Initially this is the discord gateway url, and then is switched to resume gateway url once a READY is received. */
get connectionUrl(): string {
// Use || and not ?? here. ?? will cause a bug.
return this.resumeGatewayUrl || this.gatewayConfig.url
@@ -267,7 +243,7 @@ export class DiscordenoShard {
/** Send a message to Discord.
* @param {boolean} [highPriority=false] - Whether this message should be send asap.
*/
async send(message: ShardSocketRequest, highPriority = false): Promise<void> {
async send(message: ShardSocketRequest, highPriority: boolean = false): Promise<void> {
// Before acquiring a token from the bucket, check whether the shard is currently offline or not.
// Else bucket and token wait time just get wasted.
await this.checkOffline(highPriority)
@@ -473,27 +449,7 @@ export class DiscordenoShard {
break
}
case 'GUILD_MEMBERS_CHUNK': {
// If it's not enabled skip checks.
if (!this.cache.requestMembers.enabled) break
const payload = packet.d as DiscordGuildMembersChunk
// If this request has non nonce, skip checks.
if (!payload.nonce) break
const pending = this.cache.requestMembers.pending.get(payload.nonce)
if (!pending) break
// If this is not the final chunk, just save to cache.
if (payload.chunk_index + 1 < payload.chunk_count) {
pending.members.push(...payload.members)
break;
}
// Resolve the promise that all requests are done.
pending.resolve(camelize(pending.members))
// Delete the cache to clean up once its done.
this.cache.requestMembers.pending.delete(payload.nonce)
break
this.events.guildMemberChunk?.(packet.d as DiscordGuildMembersChunk)
}
}
@@ -630,166 +586,6 @@ export class DiscordenoShard {
// To go safe we should clear the related timeout too.
clearTimeout(this.heart.timeoutId)
}
/**
* Connects the bot user to a voice or stage channel.
*
* This function sends the _Update Voice State_ gateway command over the gateway behind the scenes.
*
* @param guildId - The ID of the guild the voice channel to leave is in.
* @param channelId - The ID of the channel you want to join.
*
* @remarks
* Requires the `CONNECT` permission.
*
* Fires a _Voice State Update_ gateway event.
*
* @see {@link https://discord.com/developers/docs/topics/gateway#update-voice-state}
*/
async joinVoiceChannel(
guildId: BigString,
channelId: BigString,
options?: AtLeastOne<Omit<UpdateVoiceState, 'guildId' | 'channelId'>>,
): Promise<void> {
logger.debug(`[Shard] joinVoiceChannel guildId: ${guildId} channelId: ${channelId}`)
await this.send({
op: GatewayOpcodes.VoiceStateUpdate,
d: {
guild_id: guildId.toString(),
channel_id: channelId.toString(),
self_mute: Boolean(options?.selfMute),
self_deaf: options?.selfDeaf ?? true,
},
})
}
/**
* Edits the bot status in all shards that this gateway manages.
*
* @param data The status data to set the bots status to.
* @returns Promise<void>
*/
async editBotStatus(data: StatusUpdate): Promise<void> {
logger.debug(`[Shard] editBotStatus data: ${JSON.stringify(data)}`)
await this.editShardStatus(data)
}
/**
* Edits the bot's status on one shard.
*
* @param shardId The shard id to edit the status for.
* @param data The status data to set the bots status to.
* @returns Promise<void>
*/
async editShardStatus(data: StatusUpdate): Promise<void> {
logger.debug(`[Shard] editShardStatus shardId: ${this.id} -> data: ${JSON.stringify(data)}`)
await this.send({
op: GatewayOpcodes.PresenceUpdate,
d: {
since: null,
afk: false,
activities: data.activities,
status: data.status,
},
})
}
/**
* Fetches the list of members for a guild over the gateway.
*
* @param guildId - The ID of the guild to get the list of members for.
* @param options - The parameters for the fetching of the members.
*
* @remarks
* If requesting the entire member list:
* - Requires the `GUILD_MEMBERS` intent.
*
* If requesting presences ({@link RequestGuildMembers.presences | presences} set to `true`):
* - Requires the `GUILD_PRESENCES` intent.
*
* If requesting a prefix ({@link RequestGuildMembers.query | query} non-`undefined`):
* - Returns a maximum of 100 members.
*
* If requesting a users by ID ({@link RequestGuildMembers.userIds | userIds} non-`undefined`):
* - Returns a maximum of 100 members.
*
* Fires a _Guild Members Chunk_ gateway event for every 1000 members fetched.
*
* @see {@link https://discord.com/developers/docs/topics/gateway#request-guild-members}
*/
async requestMembers(guildId: BigString, options?: Omit<RequestGuildMembers, 'guildId'>): Promise<Camelize<DiscordMember[]>> {
// You can request 1 member without the intent
// Check if intents is not 0 as proxy ws won't set intents in other instances
if (this.connection.intents && (!options?.limit || options.limit > 1) && !(this.connection.intents & GatewayIntents.GuildMembers)) {
throw new Error('MISSING_INTENT_GUILD_MEMBERS')
}
if (options?.userIds?.length) {
logger.debug(`[Shard] requestMembers guildId: ${guildId} -> setting user limit based on userIds length: ${options.userIds.length}`)
options.limit = options.userIds.length
}
// Gateway does not require caching these requests so directly send and return
if (!this.cache.requestMembers?.enabled) {
logger.debug(`[Shard] requestMembers guildId: ${guildId} -> skipping cache -> options ${JSON.stringify(options)}`)
await this.send({
op: GatewayOpcodes.RequestGuildMembers,
d: {
guild_id: guildId.toString(),
// If a query is provided use it, OR if a limit is NOT provided use ""
query: options?.query ?? (options?.limit ? undefined : ''),
limit: options?.limit ?? 0,
presences: options?.presences ?? false,
user_ids: options?.userIds?.map((id) => id.toString()),
nonce: options?.nonce,
},
})
return []
}
return await new Promise((resolve) => {
if (options?.nonce) this.cache.requestMembers?.pending.set(options.nonce, { nonce: options.nonce, resolve, members: [] })
logger.debug(`[Shard] requestMembers guildId: ${guildId} -> requesting members -> data: ${JSON.stringify(options)}`)
this.send({
op: GatewayOpcodes.RequestGuildMembers,
d: {
guild_id: guildId.toString(),
// If a query is provided use it, OR if a limit is NOT provided use ""
query: options?.query ?? (options?.limit ? undefined : ''),
limit: options?.limit ?? 0,
presences: options?.presences ?? false,
user_ids: options?.userIds?.map((id) => id.toString()),
nonce: options?.nonce,
},
})
})
}
/**
* Leaves the voice channel the bot user is currently in.
*
* This function sends the _Update Voice State_ gateway command over the gateway behind the scenes.
*
* @param guildId - The ID of the guild the voice channel to leave is in.
*
* @remarks
* Fires a _Voice State Update_ gateway event.
*
* @see {@link https://discord.com/developers/docs/topics/gateway#update-voice-state}
*/
async leaveVoiceChannel(guildId: BigString): Promise<void> {
logger.debug(`[Shard] leaveVoiceChannel guildId: ${guildId} Shard ${this.id}`)
await this.send({
op: GatewayOpcodes.VoiceStateUpdate,
d: {
guild_id: guildId.toString(),
channel_id: null,
self_mute: false,
self_deaf: false,
},
})
}
}
export interface ShardCreateOptions {

View File

@@ -1,16 +1,18 @@
/* eslint-disable @typescript-eslint/no-confusing-void-expression */
import type {
AtLeastOne,
BigString,
Camelize,
DiscordGetGatewayBot,
DiscordMember,
DiscordMemberWithUser,
RequestGuildMembers,
import {
GatewayIntents,
GatewayOpcodes,
type AtLeastOne,
type BigString,
type Camelize,
type DiscordGetGatewayBot,
type DiscordMember,
type DiscordMemberWithUser,
type RequestGuildMembers,
} from '@discordeno/types'
import { Collection, delay, logger } from '@discordeno/utils'
import { camelize, Collection, delay, logger } from '@discordeno/utils'
import Shard from './Shard.js'
import type { ShardEvents, StatusUpdate, UpdateVoiceState } from './types.js'
import type { ShardEvents, ShardSocketRequest, StatusUpdate, UpdateVoiceState } from './types.js'
export function createGatewayManager(options: CreateGatewayManagerOptions): GatewayManager {
const connectionOptions = options.connection ?? {
@@ -24,6 +26,29 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
},
}
options.events.guildMemberChunk ??= (payload) => {
// If it's not enabled skip checks.
if (!gateway.cache.requestMembers?.enabled) return
// If this request has no nonce, skip checks.
if (!payload.nonce) return
const pending = gateway.cache.requestMembers.pending.get(payload.nonce)
if (!pending) return
if (payload.chunk_count === 1) pending.members = payload.members
else pending.members.push(...payload.members)
// If this is not the final chunk, just save to cache.
if (payload.chunk_index + 1 < payload.chunk_count) return
// Resolve the promise that all requests are done.
pending.resolve(camelize(pending.members))
// Delete the cache to clean up once its done.
gateway.cache.requestMembers.pending.delete(payload.nonce)
}
const gateway: GatewayManager = {
events: options.events,
compress: options.compress ?? false,
@@ -142,6 +167,15 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
await delay(5000)
},
async sendPayload(shardId, payload) {
const shard = gateway.shards.get(shardId)
if (!shard) {
throw new Error(`Shard (id: ${shardId} not found`)
}
await shard.send(payload)
},
async tellWorkerToIdentify(workerId, shardId, bucketId) {
logger.debug(`[Gateway] tell worker to identify (${workerId}, ${shardId}, ${bucketId})`)
await gateway.identify(shardId)
@@ -204,7 +238,6 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
this.shards.delete(shardId)
await shard.shutdown()
},
async requestIdentify(shardId: number) {
logger.debug(`[Gateway] requesting identify`)
// const bucket = gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)
@@ -232,17 +265,23 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
async joinVoiceChannel(guildId, channelId, options) {
const shardId = gateway.calculateShardId(guildId)
const shard = gateway.shards.get(shardId)
if (!shard) {
throw new Error(`Shard (id: ${shardId} not found`)
}
logger.debug(`[Gateway] joinVoiceChannel guildId: ${guildId} channelId: ${channelId}`)
shard.joinVoiceChannel(guildId, channelId, options)
await gateway.sendPayload(shardId, {
op: GatewayOpcodes.VoiceStateUpdate,
d: {
guild_id: guildId.toString(),
channel_id: channelId.toString(),
self_mute: options?.selfMute ?? false,
self_deaf: options?.selfDeaf ?? true,
},
})
},
async editBotStatus(data) {
logger.debug(`[Gateway] editBotStatus data: ${JSON.stringify(data)}`)
await Promise.all(
[...gateway.shards.values()].map(async (shard) => {
gateway.editShardStatus(shard.id, data)
@@ -251,35 +290,80 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
},
async editShardStatus(shardId, data) {
const shard = gateway.shards.get(shardId)
if (!shard) {
throw new Error(`Shard (id: ${shardId}) not found.`)
}
logger.debug(`[Gateway] editShardStatus shardId: ${shardId} -> data: ${JSON.stringify(data)}`)
await shard.editShardStatus(data)
await gateway.sendPayload(shardId, {
op: GatewayOpcodes.PresenceUpdate,
d: {
since: null,
afk: false,
activities: data.activities,
status: data.status,
},
})
},
async requestMembers(guildId, options) {
const shardId = gateway.calculateShardId(guildId)
const shard = gateway.shards.get(shardId)
if (!shard) {
throw new Error(`Shard (id: ${shardId}) not found.`)
if (gateway.intents && (!options?.limit || options.limit > 1) && !(gateway.intents & GatewayIntents.GuildMembers))
throw new Error('Cannot fetch more then 1 member without the GUILD_MEMBERS intent')
logger.debug(`[Gateway] requestMembers guildId: ${guildId} -> data: ${JSON.stringify(options)}`)
if (options?.userIds?.length) {
logger.debug(`[Gateway] requestMembers guildId: ${guildId} -> setting user limit based on userIds length: ${options.userIds.length}`)
options.limit = options.userIds.length
}
logger.debug(`[Gateway] requestMembers guildId: ${guildId} -> options ${JSON.stringify(options)}`)
return await shard.requestMembers(guildId, options)
const members =
!gateway.cache.requestMembers?.enabled || !options?.nonce
? []
: new Promise<Camelize<DiscordMember[]>>((resolve, reject) => {
// Should never happen.
if (!gateway.cache.requestMembers?.enabled || !options?.nonce) {
reject(new Error("Can't request the members without the nonce or with the feature disabled."))
return
}
gateway.cache.requestMembers.pending.set(options.nonce, {
nonce: options.nonce,
resolve,
members: [],
})
})
await gateway.sendPayload(shardId, {
op: GatewayOpcodes.RequestGuildMembers,
d: {
guild_id: guildId.toString(),
// If a query is provided use it, OR if a limit is NOT provided use ""
query: options?.query ?? (options?.limit ? undefined : ''),
limit: options?.limit ?? 0,
presences: options?.presences ?? false,
user_ids: options?.userIds?.map((id) => id.toString()),
nonce: options?.nonce,
},
})
return await members
},
async leaveVoiceChannel(guildId) {
const shardId = gateway.calculateShardId(guildId)
const shard = gateway.shards.get(shardId)
if (!shard) {
throw new Error(`Shard (id: ${shardId} not found`)
}
logger.debug(`[Gateway] leaveVoiceChannel guildId: ${guildId} Shard ${shardId}`)
await shard.leaveVoiceChannel(guildId)
await gateway.sendPayload(shardId, {
op: GatewayOpcodes.VoiceStateUpdate,
d: {
guild_id: guildId.toString(),
channel_id: null,
self_mute: false,
self_deaf: false,
},
})
},
}
@@ -402,6 +486,7 @@ export interface GatewayManager extends Required<CreateGatewayManagerOptions> {
spawnShards: () => Promise<void>
/** Shutdown all shards. */
shutdown: (code: number, reason: string) => Promise<void>
sendPayload: (shardId: number, payload: ShardSocketRequest) => Promise<void>
/** Allows users to hook in and change to communicate to different workers across different servers or anything they like. For example using redis pubsub to talk to other servers. */
tellWorkerToIdentify: (workerId: number, shardId: number, bucketId: number) => Promise<void>
/** Tell the manager to identify a Shard. If this Shard is not already managed this will also add the Shard to the manager. */

View File

@@ -1,4 +1,12 @@
import type { ActivityTypes, Camelize, DiscordActivity, DiscordGatewayPayload, GatewayOpcodes, PresenceStatus } from '@discordeno/types'
import type {
ActivityTypes,
Camelize,
DiscordActivity,
DiscordGatewayPayload,
DiscordGuildMembersChunk,
GatewayOpcodes,
PresenceStatus,
} from '@discordeno/types'
import type Shard from './Shard.js'
export enum ShardState {
@@ -113,6 +121,8 @@ export interface ShardEvents {
identified?: (shard: Shard) => unknown
/** The shard has received a message from Discord. */
message?: (shard: Shard, payload: Camelize<DiscordGatewayPayload>) => unknown
/* The shard has received a GUILD_MEMBER_CHUNK from Discord and should be handled accordingly */
guildMemberChunk?: (payload: DiscordGuildMembersChunk) => unknown
}
export enum ShardSocketCloseCodes {