feat(bot, rest, gateway, utils): Add support for custom loggers (#3383)

* Add support for custom loggers

* update docs for loggerFactory

* Use union for names in the bot logger factory

* Fix formaating errors
This commit is contained in:
Fleny
2024-03-17 12:22:45 +01:00
committed by GitHub
parent c5337508fe
commit efe56fbe06
9 changed files with 126 additions and 75 deletions

View File

@@ -3,7 +3,7 @@ import { ShardSocketCloseCodes, createGatewayManager } from '@discordeno/gateway
import type { CreateRestManagerOptions, RestManager } from '@discordeno/rest'
import { createRestManager } from '@discordeno/rest'
import type { BigString, DiscordEmoji, DiscordGatewayPayload, DiscordReady, GatewayIntents } from '@discordeno/types'
import { createLogger, getBotIdFromToken, type Collection } from '@discordeno/utils'
import { createLogger, getBotIdFromToken, type Collection, type logger } from '@discordeno/utils'
import { createBotGatewayHandlers } from './handlers.js'
import { createBotHelpers, type BotHelpers } from './helpers.js'
import { createTransformers, type Transformers } from './transformers.js'
@@ -38,9 +38,11 @@ import type { BotGatewayHandlerOptions } from './typings.js'
export function createBot(options: CreateBotOptions): Bot {
if (!options.rest) options.rest = { token: options.token, applicationId: options.applicationId }
if (!options.rest.token) options.rest.token = options.token
if (!options.rest.logger && options.loggerFactory) options.rest.logger = options.loggerFactory('REST')
if (!options.gateway) options.gateway = { token: options.token }
if (!options.gateway.token) options.gateway.token = options.token
if (!options.gateway.events) options.gateway.events = {}
if (!options.gateway.logger && options.loggerFactory) options.gateway.logger = options.loggerFactory('GATEWAY')
if (!options.gateway.events.message) {
options.gateway.events.message = async (shard, data) => {
// TRIGGER RAW EVENT
@@ -67,7 +69,7 @@ export function createBot(options: CreateBotOptions): Bot {
rest: createRestManager(options.rest),
gateway: createGatewayManager(options.gateway),
events: options.events ?? {},
logger: createLogger({ name: 'BOT' }),
logger: options.loggerFactory ? options.loggerFactory('BOT') : createLogger({ name: 'BOT' }),
// Set up helpers below.
helpers: {} as BotHelpers,
async start() {
@@ -122,6 +124,15 @@ export interface CreateBotOptions {
* @default false
*/
defaultDesiredPropertiesValue?: boolean
/**
* This factory will be invoked to create the logger for gateway, rest and bot
*
* @remarks
* If not provided the default logger will be used with rest and gateway sharing the same logger
*
* This function will be invoked 3 times, one with the name of `REST`, one with `GATEWAY` and the third one with name `BOT`
*/
loggerFactory?: (name: 'REST' | 'GATEWAY' | 'BOT') => Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
}
export interface Bot {
@@ -136,7 +147,7 @@ export interface Bot {
/** The event handlers. */
events: Partial<EventHandlers>
/** A logger utility to make it easy to log nice and useful things in the bot code. */
logger: ReturnType<typeof createLogger>
logger: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
/** The functions that should transform discord objects to discordeno shaped objects. */
transformers: Transformers
/** The handler functions that should handle incoming discord payloads from gateway and call an event. */

View File

@@ -573,16 +573,19 @@ export interface Transformers {
export interface CreateTransformerOptions {
defaultDesiredPropertiesValue: boolean
logger?: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
}
export function createTransformers(options: Partial<Transformers>, opts?: CreateTransformerOptions): Transformers {
if (opts?.defaultDesiredPropertiesValue) {
logger.warn('[Transformers] WARNING WARNING WARNING!')
logger.warn(
const log = opts.logger ?? logger
log.warn('[Transformers] WARNING WARNING WARNING!')
log.warn(
'[Transformers] The defaultDesiredPropertiesValue property is being used and it is NOT RECOMMENDED. In fact it was WARNED AGAINST. It is extremely bad practice.',
)
logger.warn('[Transformers] It is a bit painful to work with and get started, but it has massive long term benefits.')
logger.warn('[Transformers] ----------------------------------------------------------------')
log.warn('[Transformers] It is a bit painful to work with and get started, but it has massive long term benefits.')
log.warn('[Transformers] ----------------------------------------------------------------')
}
return {

View File

@@ -38,11 +38,14 @@ export class DiscordenoShard {
resolves = new Map<'READY' | 'RESUMED' | 'INVALID_SESSION', (payload: DiscordGatewayPayload) => void>()
/** Shard bucket. Only access this if you know what you are doing. Bucket for handling shard request rate limits. */
bucket: LeakyBucket
/** Logger for the bucket */
logger: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
constructor(options: ShardCreateOptions) {
this.id = options.id
this.connection = options.connection
this.events = options.events
this.logger = options.logger ?? logger
this.heart = {
acknowledged: false,
@@ -56,6 +59,7 @@ export class DiscordenoShard {
max: this.calculateSafeRequests(),
refillAmount: this.calculateSafeRequests(),
refillInterval: 60000,
logger: this.logger,
})
}
@@ -137,7 +141,7 @@ export class DiscordenoShard {
// A new identify has been requested even though there is already a connection open.
// Therefore we need to close the old connection and heartbeating before creating a new one.
if (this.isOpen()) {
logger.debug(`CLOSING EXISTING SHARD: #${this.id}`)
this.logger.debug(`CLOSING EXISTING SHARD: #${this.id}`)
this.close(ShardSocketCloseCodes.ReIdentifying, 'Re-identifying closure of old connection.')
}
@@ -190,27 +194,27 @@ export class DiscordenoShard {
/** Attempt to resume the previous shards session with the gateway. */
async resume(): Promise<void> {
logger.debug(`[Gateway] Resuming Shard #${this.id}`)
this.logger.debug(`[Gateway] Resuming Shard #${this.id}`)
// It has been requested to resume the Shards session.
// It's possible that the shard is still connected with Discord's gateway therefore we need to forcefully close it.
if (this.isOpen()) {
logger.debug(`[Gateway] Resuming Shard #${this.id} in isOpen`)
this.logger.debug(`[Gateway] Resuming Shard #${this.id} in isOpen`)
this.close(ShardSocketCloseCodes.ResumeClosingOldConnection, 'Reconnecting the shard, closing old connection.')
}
// Shard has never identified, so we cannot resume.
if (!this.sessionId) {
logger.debug(`[Shard] Trying to resume a shard #${this.id} that was NOT first identified. (No session id found)`)
this.logger.debug(`[Shard] Trying to resume a shard #${this.id} that was NOT first identified. (No session id found)`)
return await this.identify()
}
this.state = ShardState.Resuming
logger.debug(`[Gateway] Resuming Shard #${this.id}, before connecting`)
this.logger.debug(`[Gateway] Resuming Shard #${this.id}, before connecting`)
// Before we can resume, we need to create a new connection with Discord's gateway.
await this.connect()
logger.debug(
this.logger.debug(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`[Gateway] Resuming Shard #${this.id}, after connecting. ${this.gatewayConfig.token} | ${this.sessionId} | ${this.previousSequenceNumber}`,
)
@@ -226,7 +230,7 @@ export class DiscordenoShard {
},
true,
)
logger.debug(`[Gateway] Resuming Shard #${this.id} after send resumg`)
this.logger.debug(`[Gateway] Resuming Shard #${this.id} after send resumg`)
return await new Promise((resolve) => {
this.resolves.set('RESUMED', () => resolve())
@@ -293,7 +297,7 @@ export class DiscordenoShard {
case GatewayCloseEventCodes.InvalidSeq:
case GatewayCloseEventCodes.RateLimited:
case GatewayCloseEventCodes.SessionTimedOut: {
logger.debug(`[Shard] Gateway connection closing requiring re-identify. Code: ${close.code}`)
this.logger.debug(`[Shard] Gateway connection closing requiring re-identify. Code: ${close.code}`)
this.state = ShardState.Identifying
this.events.disconnected?.(this)
@@ -317,7 +321,7 @@ export class DiscordenoShard {
case GatewayCloseEventCodes.DecodeError:
case GatewayCloseEventCodes.AlreadyAuthenticated:
default: {
logger.info(`[Shard] closed shard #${this.id}. Resuming...`)
this.logger.info(`[Shard] closed shard #${this.id}. Resuming...`)
this.state = ShardState.Resuming
this.events.disconnected?.(this)
@@ -353,7 +357,7 @@ export class DiscordenoShard {
}
case GatewayOpcodes.Hello: {
const interval = (packet.d as DiscordHello).heartbeat_interval
logger.debug(`[Gateway] Hello on Shard #${this.id}`)
this.logger.debug(`[Gateway] Hello on Shard #${this.id}`)
this.startHeartbeating(interval)
if (this.state !== ShardState.Resuming) {
@@ -396,7 +400,7 @@ export class DiscordenoShard {
}
case GatewayOpcodes.InvalidSession: {
const resumable = packet.d as boolean
logger.debug(`[Shard] Received Invalid Session for Shard #${this.id} with resumeable as ${resumable.toString()}`)
this.logger.debug(`[Shard] Received Invalid Session for Shard #${this.id} with resumeable as ${resumable.toString()}`)
this.events.invalidSession?.(this, resumable)
@@ -501,7 +505,7 @@ export class DiscordenoShard {
/** Start sending heartbeat payloads to Discord in the provided interval. */
startHeartbeating(interval: number): void {
logger.debug(`[Gateway] Start Heartbeating Shard #${this.id}`)
this.logger.debug(`[Gateway] Start Heartbeating Shard #${this.id}`)
// If old heartbeast exist like after resume, clear the old ones.
if (this.heart.intervalId) clearInterval(this.heart.intervalId)
if (this.heart.timeoutId) clearTimeout(this.heart.timeoutId)
@@ -511,7 +515,7 @@ export class DiscordenoShard {
// Only set the shard's state to `Unidentified`
// if heartbeating has not been started due to an identify or resume action.
if ([ShardState.Disconnected, ShardState.Offline].includes(this.state)) {
logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} a`)
this.logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} a`)
this.state = ShardState.Unidentified
}
@@ -521,9 +525,9 @@ export class DiscordenoShard {
// Reference: https://discord.com/developers/docs/topics/gateway#heartbeating
const jitter = Math.ceil(this.heart.interval * (Math.random() || 0.5))
this.heart.timeoutId = setTimeout(() => {
logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} b`)
this.logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} b`)
if (!this.isOpen()) return
logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} c ${this.previousSequenceNumber!}`)
this.logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} c ${this.previousSequenceNumber!}`)
// Using a direct socket.send call here because heartbeat requests are reserved by us.
this.socket?.send(
@@ -533,15 +537,15 @@ export class DiscordenoShard {
}),
)
logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} d`)
this.logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} d`)
this.heart.lastBeat = Date.now()
this.heart.acknowledged = false
// After the random heartbeat jitter we can start a normal interval.
this.heart.intervalId = setInterval(async () => {
logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} e`)
this.logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} e`)
if (!this.isOpen()) return
logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} f`)
this.logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} f`)
// gateway.debug("GW DEBUG", `Running setInterval in heartbeat file. Shard: ${shardId}`);
// gateway.debug("GW HEARTBEATING", { shardId, shard: currentShard });
@@ -551,7 +555,7 @@ export class DiscordenoShard {
// The Shard needs to start a re-identify action accordingly.
// Reference: https://discord.com/developers/docs/topics/gateway#heartbeating-example-gateway-heartbeat-ack
if (!this.heart.acknowledged) {
logger.debug(`[Shard] Heartbeat not acknowledged for shard #${this.id}.`)
this.logger.debug(`[Shard] Heartbeat not acknowledged for shard #${this.id}.`)
this.close(ShardSocketCloseCodes.ZombiedConnection, 'Zombied connection, did not receive an heartbeat ACK in time.')
return await this.identify()
@@ -559,7 +563,7 @@ export class DiscordenoShard {
this.heart.acknowledged = false
logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} g`)
this.logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} g`)
// Using a direct socket.send call here because heartbeat requests are reserved by us.
this.socket?.send(
JSON.stringify({
@@ -567,7 +571,7 @@ export class DiscordenoShard {
d: this.previousSequenceNumber,
}),
)
logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} h`)
this.logger.debug(`[Gateway] Start Heartbeating Shard #${this.id} h`)
this.heart.lastBeat = Date.now()
@@ -593,6 +597,8 @@ export interface ShardCreateOptions {
connection: ShardGatewayConfig
/** The event handlers for events on the shard. */
events: ShardEvents
/** The logger for the shard */
logger?: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
/** The handler to request a space to make an identify request. */
requestIdentify?: () => Promise<void>
/** The handler to alert the gateway manager that this shard has received a READY event. */

View File

@@ -53,15 +53,16 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
pending: new Collection(),
},
},
logger: options.logger ?? logger,
calculateTotalShards() {
// Bots under 100k servers do not have access to total shards.
if (gateway.totalShards < 100) {
logger.debug(`[Gateway] Calculating total shards: ${gateway.totalShards}`)
gateway.logger.debug(`[Gateway] Calculating total shards: ${gateway.totalShards}`)
return gateway.totalShards
}
logger.debug(`[Gateway] Calculating total shards`, gateway.totalShards, gateway.connection.sessionStartLimit.maxConcurrency)
gateway.logger.debug(`[Gateway] Calculating total shards`, gateway.totalShards, gateway.connection.sessionStartLimit.maxConcurrency)
// Calculate a multiple of `maxConcurrency` which can be used to connect to the gateway.
return (
Math.ceil(
@@ -73,14 +74,14 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
},
calculateWorkerId(shardId) {
const workerId = Math.min(shardId % gateway.shardsPerWorker, gateway.totalWorkers - 1)
logger.debug(
gateway.logger.debug(
`[Gateway] Calculating workerId: Shard: ${shardId} -> Worker: ${workerId} -> Per Worker: ${gateway.shardsPerWorker} -> Total: ${gateway.totalWorkers}`,
)
return workerId
},
prepareBuckets() {
for (let i = 0; i < gateway.connection.sessionStartLimit.maxConcurrency; ++i) {
logger.debug(`[Gateway] Preparing buckets for concurrency: ${i}`)
gateway.logger.debug(`[Gateway] Preparing buckets for concurrency: ${i}`)
gateway.buckets.set(i, {
workers: [],
identifyRequests: [],
@@ -89,7 +90,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
// ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS
for (let shardId = gateway.firstShardId; shardId <= gateway.lastShardId; ++shardId) {
logger.debug(`[Gateway] Preparing buckets for shard: ${shardId}`)
gateway.logger.debug(`[Gateway] Preparing buckets for shard: ${shardId}`)
if (shardId >= gateway.totalShards) {
throw new Error(`Shard (id: ${shardId}) is bigger or equal to the used amount of used shards which is ${gateway.totalShards}`)
}
@@ -153,12 +154,12 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
await shard.send(payload)
},
async tellWorkerToIdentify(workerId, shardId, bucketId) {
logger.debug(`[Gateway] tell worker to identify (${workerId}, ${shardId}, ${bucketId})`)
gateway.logger.debug(`[Gateway] tell worker to identify (${workerId}, ${shardId}, ${bucketId})`)
await gateway.identify(shardId)
},
async identify(shardId: number) {
let shard = this.shards.get(shardId)
logger.debug(`[Gateway] identifying ${shard ? 'existing' : 'new'} shard (${shardId})`)
gateway.logger.debug(`[Gateway] identifying ${shard ? 'existing' : 'new'} shard (${shardId})`)
if (!shard) {
shard = new Shard({
@@ -173,13 +174,14 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
version: this.version,
},
events: options.events ?? {},
logger: this.logger,
requestIdentify: async () => {
await gateway.identify(shardId)
},
shardIsReady: async () => {
logger.debug(`[Shard] Shard #${shardId} is ready`)
gateway.logger.debug(`[Shard] Shard #${shardId} is ready`)
await delay(gateway.spawnShardDelay)
logger.debug(`[Shard] Resolving shard identify request`)
gateway.logger.debug(`[Shard] Resolving shard identify request`)
gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)!.identifyRequests.shift()?.()
},
})
@@ -199,7 +201,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
return await new Promise((resolve) => {
// Mark that we are making an identify request so another is not made.
bucket.identifyRequests.push(resolve)
logger.debug(`[Gateway] identifying shard #(${shardId}).`)
gateway.logger.debug(`[Gateway] identifying shard #(${shardId}).`)
// This will trigger identify and when READY is received it will resolve the above request.
shard?.identify()
})
@@ -207,15 +209,15 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
async kill(shardId: number) {
const shard = this.shards.get(shardId)
if (!shard) {
return logger.debug(`[Gateway] kill shard but not found (${shardId})`)
return gateway.logger.debug(`[Gateway] kill shard but not found (${shardId})`)
}
logger.debug(`[Gateway] kill shard (${shardId})`)
gateway.logger.debug(`[Gateway] kill shard (${shardId})`)
this.shards.delete(shardId)
await shard.shutdown()
},
async requestIdentify(shardId: number) {
logger.debug(`[Gateway] requesting identify`)
gateway.logger.debug(`[Gateway] requesting identify`)
// const bucket = gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)
// if (!bucket) return
@@ -231,18 +233,18 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
if (!totalShards) totalShards = gateway.totalShards
// If it is only 1 shard, it will always be shard id 0
if (totalShards === 1) {
logger.debug(`[Gateway] calculateShardId (1 shard)`)
gateway.logger.debug(`[Gateway] calculateShardId (1 shard)`)
return 0
}
logger.debug(`[Gateway] calculateShardId (guildId: ${guildId}, totalShards: ${totalShards})`)
gateway.logger.debug(`[Gateway] calculateShardId (guildId: ${guildId}, totalShards: ${totalShards})`)
return Number((BigInt(guildId) >> 22n) % BigInt(totalShards))
},
async joinVoiceChannel(guildId, channelId, options) {
const shardId = gateway.calculateShardId(guildId)
logger.debug(`[Gateway] joinVoiceChannel guildId: ${guildId} channelId: ${channelId}`)
gateway.logger.debug(`[Gateway] joinVoiceChannel guildId: ${guildId} channelId: ${channelId}`)
await gateway.sendPayload(shardId, {
op: GatewayOpcodes.VoiceStateUpdate,
@@ -256,7 +258,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
},
async editBotStatus(data) {
logger.debug(`[Gateway] editBotStatus data: ${JSON.stringify(data)}`)
gateway.logger.debug(`[Gateway] editBotStatus data: ${JSON.stringify(data)}`)
await Promise.all(
[...gateway.shards.values()].map(async (shard) => {
@@ -266,7 +268,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
},
async editShardStatus(shardId, data) {
logger.debug(`[Gateway] editShardStatus shardId: ${shardId} -> data: ${JSON.stringify(data)}`)
gateway.logger.debug(`[Gateway] editShardStatus shardId: ${shardId} -> data: ${JSON.stringify(data)}`)
await gateway.sendPayload(shardId, {
op: GatewayOpcodes.PresenceUpdate,
@@ -285,10 +287,10 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
if (gateway.intents && (!options?.limit || options.limit > 1) && !(gateway.intents & GatewayIntents.GuildMembers))
throw new Error('Cannot fetch more then 1 member without the GUILD_MEMBERS intent')
logger.debug(`[Gateway] requestMembers guildId: ${guildId} -> data: ${JSON.stringify(options)}`)
gateway.logger.debug(`[Gateway] requestMembers guildId: ${guildId} -> data: ${JSON.stringify(options)}`)
if (options?.userIds?.length) {
logger.debug(`[Gateway] requestMembers guildId: ${guildId} -> setting user limit based on userIds length: ${options.userIds.length}`)
gateway.logger.debug(`[Gateway] requestMembers guildId: ${guildId} -> setting user limit based on userIds length: ${options.userIds.length}`)
options.limit = options.userIds.length
}
@@ -329,7 +331,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
async leaveVoiceChannel(guildId) {
const shardId = gateway.calculateShardId(guildId)
logger.debug(`[Gateway] leaveVoiceChannel guildId: ${guildId} Shard ${shardId}`)
gateway.logger.debug(`[Gateway] leaveVoiceChannel guildId: ${guildId} Shard ${shardId}`)
await gateway.sendPayload(shardId, {
op: GatewayOpcodes.VoiceStateUpdate,
@@ -436,6 +438,11 @@ export interface CreateGatewayManagerOptions {
enabled?: boolean
}
}
/**
* The logger that the gateway manager will use
* @default logger // The logger exported by `@discordeno/utils`
*/
logger?: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
}
export interface GatewayManager extends Required<CreateGatewayManagerOptions> {
@@ -450,6 +457,8 @@ export interface GatewayManager extends Required<CreateGatewayManagerOptions> {
>
/** The shards that are created. */
shards: Map<number, Shard>
/** The logger for the gateway manager */
logger: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
/** Determine max number of shards to use based upon the max concurrency. */
calculateTotalShards: () => number
/** Determine the id of the worker which is handling a shard. */

View File

@@ -17,6 +17,7 @@ export function createInvalidRequestBucket(options: InvalidRequestBucketOptions)
errorStatuses: options.errorStatuses ?? [401, 403, 429],
activeRequests: options.requested ?? 0,
processing: false,
logger: options.logger ?? logger,
waiting: [],
@@ -55,14 +56,14 @@ export function createInvalidRequestBucket(options: InvalidRequestBucketOptions)
bucket.processing = true
while (bucket.waiting.length > 0) {
logger.info(
bucket.logger.info(
`[InvalidBucket] processing waiting queue while loop ran with ${bucket.waiting.length} pending requests to be made. ${JSON.stringify(
bucket,
)}`,
)
if (!bucket.isRequestAllowed() && bucket.resetAt !== undefined) {
logger.warn(
bucket.logger.warn(
`[InvalidBucket] processing waiting queue is now paused until more requests are available. ${
bucket.waiting.length
} pending requests. ${JSON.stringify(bucket)}`,
@@ -93,7 +94,7 @@ export function createInvalidRequestBucket(options: InvalidRequestBucketOptions)
}
bucket.invalidRequests += 1
logger.warn(`[InvalidBucket] an invalid request was made. Increasing invalidRequests count to ${bucket.invalidRequests}`)
bucket.logger.warn(`[InvalidBucket] an invalid request was made. Increasing invalidRequests count to ${bucket.invalidRequests}`)
},
}
@@ -115,6 +116,8 @@ export interface InvalidRequestBucketOptions {
errorStatuses?: number[]
/** The amount of requests that were requested from this bucket. */
requested?: number
/** The logger that will be used for the bucket */
logger?: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
}
export interface InvalidRequestBucket {
@@ -136,6 +139,8 @@ export interface InvalidRequestBucket {
waiting: Array<(value: void | PromiseLike<void>) => void>
/** Whether or not the waiting queue is already processing. */
processing: boolean
/** The logger that will be used for the bucket */
logger: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
/** Gives the number of requests that are currently allowed. */
requestsAllowed: () => number

View File

@@ -85,7 +85,7 @@ export function createRestManager(options: CreateRestManagerOptions): RestManage
baseUrl,
deleteQueueDelay: 60000,
globallyRateLimited: false,
invalidBucket: createInvalidRequestBucket({}),
invalidBucket: createInvalidRequestBucket({ logger: options.logger }),
isProxied: !baseUrl.startsWith(DISCORD_API_URL),
maxRetryCount: Infinity,
processingRateLimitedPaths: false,
@@ -93,6 +93,7 @@ export function createRestManager(options: CreateRestManagerOptions): RestManage
rateLimitedPaths: new Map(),
token: options.token,
version: options.version ?? DISCORD_API_VERSION,
logger: options.logger ?? logger,
routes: createRoutes(),
@@ -343,9 +344,9 @@ export function createRestManager(options: CreateRestManagerOptions): RestManage
loggingHeaders.authorization = `${authenticationScheme} tokenhere`
}
logger.debug(`sending request to ${url}`, 'with payload:', { ...payload, headers: loggingHeaders })
rest.logger.debug(`sending request to ${url}`, 'with payload:', { ...payload, headers: loggingHeaders })
const response = await fetch(url, payload).catch(async (error) => {
logger.error(error)
rest.logger.error(error)
// Mark request and completed
rest.invalidBucket.handleCompletedRequest(999, false)
options.reject({
@@ -355,7 +356,7 @@ export function createRestManager(options: CreateRestManagerOptions): RestManage
})
throw error
})
logger.debug(`request fetched from ${url} with status ${response.status} & ${response.statusText}`)
rest.logger.debug(`request fetched from ${url} with status ${response.status} & ${response.statusText}`)
// Mark request and completed
rest.invalidBucket.handleCompletedRequest(response.status, response.headers.get(RATE_LIMIT_SCOPE_HEADER) === 'shared')
@@ -369,17 +370,17 @@ export function createRestManager(options: CreateRestManagerOptions): RestManage
if (bucketId) options.bucketId = bucketId
if (response.status < HttpResponseCode.Success || response.status >= HttpResponseCode.Error) {
logger.debug(`Request to ${url} failed.`)
rest.logger.debug(`Request to ${url} failed.`)
if (response.status !== HttpResponseCode.TooManyRequests) {
options.reject({ ok: false, status: response.status, body: await response.text() })
return
}
logger.debug(`Request to ${url} was ratelimited.`)
rest.logger.debug(`Request to ${url} was ratelimited.`)
// Too many attempts, get rid of request from queue.
if (options.retryCount >= rest.maxRetryCount) {
logger.debug(`Request to ${url} exceeded the maximum allowed retries.`, 'with payload:', payload)
rest.logger.debug(`Request to ${url} exceeded the maximum allowed retries.`, 'with payload:', payload)
// rest.debug(`[REST - RetriesMaxed] ${JSON.stringify(options)}`)
options.reject({
ok: false,

View File

@@ -1,4 +1,4 @@
import { delay, logger } from '@discordeno/utils'
import { delay } from '@discordeno/utils'
import type { RestManager, SendRequestOptions } from './types.js'
export class Queue {
@@ -71,7 +71,7 @@ export class Queue {
this.processing = true
while (this.waiting.length > 0) {
logger.debug(`[Queue] ${this.isOauth2Queue() ? '' : 'Bearer '}${this.url} process waiting while loop ran.`)
this.rest.logger.debug(`[Queue] ${this.isOauth2Queue() ? '' : 'Bearer '}${this.url} process waiting while loop ran.`)
if (this.isRequestAllowed()) {
// Resolve the next item in the queue
this.waiting.shift()?.()
@@ -93,7 +93,7 @@ export class Queue {
this.processingPending = true
while (this.pending.length > 0) {
logger.debug(`Queue ${this.isOauth2Queue() ? '' : 'Bearer '}${this.url} process pending while loop ran with ${this.pending.length}.`)
this.rest.logger.debug(`Queue ${this.isOauth2Queue() ? '' : 'Bearer '}${this.url} process pending while loop ran with ${this.pending.length}.`)
if (!this.firstRequest && !this.isRequestAllowed()) {
const now = Date.now()
const future = this.frozenAt + this.interval
@@ -135,7 +135,7 @@ export class Queue {
}
}
logger.debug(`Queue ${this.isOauth2Queue() ? '' : 'Bearer '}${this.url} process pending while loop exited with ${this.pending.length}.`)
this.rest.logger.debug(`Queue ${this.isOauth2Queue() ? '' : 'Bearer '}${this.url} process pending while loop exited with ${this.pending.length}.`)
// Mark as false so next pending request can be triggered by new loop.
this.processingPending = false
@@ -174,20 +174,20 @@ export class Queue {
return
}
logger.debug(`[Queue] ${this.isOauth2Queue() ? '' : 'Bearer '}${this.url}. Delaying delete for ${this.deleteQueueDelay}ms`)
this.rest.logger.debug(`[Queue] ${this.isOauth2Queue() ? '' : 'Bearer '}${this.url}. Delaying delete for ${this.deleteQueueDelay}ms`)
// Delete in a minute giving a bit of time to allow new requests that may reuse this queue
setTimeout(async () => {
if (!this.isQueueClearable()) {
logger.debug(`[Queue] ${this.isOauth2Queue() ? '' : 'Bearer '}${this.url}. is not clearable. Restarting processing of queue.`)
this.rest.logger.debug(`[Queue] ${this.isOauth2Queue() ? '' : 'Bearer '}${this.url}. is not clearable. Restarting processing of queue.`)
this.processPending()
return
}
logger.debug(`[Queue] ${this.url}. Deleting`)
this.rest.logger.debug(`[Queue] ${this.url}. Deleting`)
if (this.timeoutId) clearTimeout(this.timeoutId)
// No requests have been requested for this queue so we nuke this queue
this.rest.queues.delete(`${this.authentication}${this.url}`)
logger.debug(
this.rest.logger.debug(
`[Queue] ${this.url}. Deleted! Remaining: (${this.rest.queues.size})`,
[...this.rest.queues.values()].map((queue) => `${queue.isOauth2Queue() ? '' : 'Bearer '}${queue.url}`),
)

View File

@@ -128,6 +128,7 @@ import type {
UpsertGlobalApplicationCommandOptions,
UpsertGuildApplicationCommandOptions,
} from '@discordeno/types'
import type { logger } from '@discordeno/utils'
import type { InvalidRequestBucket } from './invalidBucket.js'
import type { Queue } from './queue.js'
import type { RestRoutes } from './typings/routes.js'
@@ -166,6 +167,11 @@ export interface CreateRestManagerOptions {
* @default 10
*/
version?: ApiVersions
/**
* The logger that the rest manager will use
* @default logger // The logger exported by `@discordeno/utils`
*/
logger?: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
}
export interface RestManager {
@@ -207,6 +213,8 @@ export interface RestManager {
invalidBucket: InvalidRequestBucket
/** The routes that are available for this manager. */
routes: RestRoutes
/** The logger to use for the rest manager */
logger: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
/** Allows the user to inject custom headers that will be sent with every request. */
createBaseHeaders: () => Record<string, string>
/** Whether or not the rest manager should keep objects in raw snake case from discord. */

View File

@@ -16,11 +16,14 @@ export class LeakyBucket implements LeakyBucketOptions {
timeoutId?: NodeJS.Timeout
/** The timestamp in milliseconds when the next refill is scheduled. */
refillsAt?: number
/** Logger used in the leaky bucket */
logger: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
constructor(options?: LeakyBucketOptions) {
this.max = options?.max ?? 1
this.refillAmount = options?.refillAmount ? (options.refillAmount > this.max ? this.max : options.refillAmount) : 1
this.refillInterval = options?.refillInterval ?? 5000
this.logger = options?.logger ?? logger
}
/** The amount of requests that still remain. */
@@ -30,7 +33,7 @@ export class LeakyBucket implements LeakyBucketOptions {
/** Refills the bucket as needed. */
refillBucket(): void {
logger.debug(`[LeakyBucket] Timeout for leaky bucket requests executed. Refilling bucket.`)
this.logger.debug(`[LeakyBucket] Timeout for leaky bucket requests executed. Refilling bucket.`)
// Lower the used amount by the refill amount
this.used = this.refillAmount > this.used ? 0 : this.used - this.refillAmount
// Reset the refillsAt timestamp since it just got refilled
@@ -49,17 +52,17 @@ export class LeakyBucket implements LeakyBucketOptions {
/** Begin processing the queue. */
async processQueue(): Promise<void> {
logger.debug('[LeakyBucket] Processing queue')
this.logger.debug('[LeakyBucket] Processing queue')
// There is already a queue that is processing
if (this.processing) return logger.debug('[LeakyBucket] Queue is already processing.')
if (this.processing) return this.logger.debug('[LeakyBucket] Queue is already processing.')
this.processing = true
// Begin going through the queue.
while (this.queue.length) {
if (this.remaining) {
logger.debug(`[LeakyBucket] Processing queue. Remaining: ${this.remaining} Length: ${this.queue.length}`)
this.logger.debug(`[LeakyBucket] Processing queue. Remaining: ${this.remaining} Length: ${this.queue.length}`)
// Resolves the promise allowing the paused execution of this request to resolve and continue.
this.queue.shift()?.()
// A request can be made
@@ -67,7 +70,7 @@ export class LeakyBucket implements LeakyBucketOptions {
// Create a new timeout for this request if none exists.
if (!this.timeoutId) {
logger.debug(`[LeakyBucket] Creating new timeout for leaky bucket requests.`)
this.logger.debug(`[LeakyBucket] Creating new timeout for leaky bucket requests.`)
this.timeoutId = setTimeout(() => {
this.refillBucket()
@@ -82,14 +85,14 @@ export class LeakyBucket implements LeakyBucketOptions {
const now = Date.now()
// If there is time left until next refill, just delay execution.
if (this.refillsAt > now) {
logger.debug(`[LeakyBucket] Delaying execution of leaky bucket requests for ${this.refillsAt - now}ms`)
this.logger.debug(`[LeakyBucket] Delaying execution of leaky bucket requests for ${this.refillsAt - now}ms`)
await delay(this.refillsAt - now)
logger.debug(`[LeakyBucket] Resuming execution`)
this.logger.debug(`[LeakyBucket] Resuming execution`)
}
// If the refillsAt has passed but the timeout didn't yet execute delay the execution
else {
logger.debug(`[LeakyBucket] Delaying execution of leaky bucket requests for 1000ms`)
this.logger.debug(`[LeakyBucket] Delaying execution of leaky bucket requests for 1000ms`)
await delay(1000)
}
}
@@ -129,4 +132,9 @@ export interface LeakyBucketOptions {
* @default 1
*/
refillAmount?: number
/**
* The logger that the leaky bucket will use
* @default logger // The logger exported by `@discordeno/utils`
*/
logger?: Pick<typeof logger, 'debug' | 'info' | 'warn' | 'error' | 'fatal'>
}