fix: rate limit queues and headers processing

This commit is contained in:
Skillz
2023-01-02 13:06:37 -06:00
parent e28cc2a732
commit a096411b37
5 changed files with 610 additions and 46 deletions

View File

@@ -0,0 +1,150 @@
import { delay } from '@discordeno/utils'
/**
* A invalid request bucket is used in a similar manner as a leaky bucket but a invalid request bucket can be refilled as needed.
* It's purpose is to make sure the bot does not hit the limit to getting a 1 hr ban.
*
* @param options The options used to configure this bucket.
* @returns RefillingBucket
*/
export function createInvalidRequestBucket (
options: InvalidRequestBucketOptions
): InvalidRequestBucket {
const bucket: InvalidRequestBucket = {
current: options.current ?? 0,
max: options.max ?? 10000,
interval: options.interval ?? 600000,
timeoutId: options.timeoutId,
safety: options.safety ?? 1,
frozenAt: options.frozenAt ?? 0,
errorStatuses: options.errorStatuses ?? [401, 403, 429],
requested: options.requested ?? 0,
processing: false,
waiting: [],
requestsAllowed: function () {
return bucket.max - bucket.current - bucket.requested - bucket.safety
},
isRequestAllowed: function () {
return bucket.requestsAllowed() > 0
},
waitUntilRequestAvailable: async function () {
// eslint-disable-next-line no-async-promise-executor
return await new Promise(async (resolve) => {
// If whatever amount of requests is left is more than the safety margin, allow the request
if (bucket.isRequestAllowed()) {
bucket.requested++
resolve()
} else {
bucket.waiting.push(resolve)
await bucket.processWaiting()
}
})
},
processWaiting: async function () {
// If already processing, that loop will handle all waiting requests.
if (bucket.processing) {
return
}
// Mark as processing so other loops don't start
bucket.processing = true
while (bucket.waiting.length > 0) {
if (bucket.isRequestAllowed()) {
bucket.requested++
// Resolve the next item in the queue
bucket.waiting.shift()?.()
} else {
await delay(1000)
}
}
// Mark as false so next pending request can be triggered by new loop.
bucket.processing = false
},
handleCompletedRequest: function (code, sharedScope) {
// Since request is complete, we can remove one from requested.
bucket.requested--
// Since it is as a valid request, we don't need to do anything
if (!bucket.errorStatuses.includes(code)) return
// Shared scope is not considered invalid
if (code === 429 && sharedScope) return
// INVALID REQUEST WAS MADE
// If it was not frozen before, mark it frozen
if (bucket.frozenAt === 0) bucket.frozenAt = Date.now()
// Mark a request has been invalid
bucket.current++
// If a timeout was not started, start a timeout to reset this bucket
if (bucket.timeoutId === undefined) {
bucket.timeoutId = setTimeout(() => {
bucket.frozenAt = 0
bucket.current = 0
bucket.timeoutId = undefined
}, bucket.frozenAt + bucket.interval)
}
}
}
return bucket
}
export interface InvalidRequestBucketOptions {
/** current invalid amount */
current?: number
/** max invalid requests allowed until ban. Defaults to 10,000 */
max?: number
/** The time that discord allows to make the max number of invalid requests. Defaults to 10 minutes */
interval?: number
/** timer to reset to 0 */
timeoutId?: NodeJS.Timeout
/** how safe to be from max. Defaults to 1 */
safety?: number
/** when first request in this period was made */
frozenAt?: number
/** The request statuses that count as an invalid request. */
errorStatuses?: number[]
/** The amount of requests that were requested from this bucket. */
requested?: number
}
export interface InvalidRequestBucket {
/** current invalid amount */
current: number
/** max invalid requests allowed until ban. Defaults to 10,000 */
max: number
/** The time that discord allows to make the max number of invalid requests. Defaults to 10 minutes */
interval: number
/** timer to reset to 0 */
timeoutId: NodeJS.Timeout | undefined
/** how safe to be from max. Defaults to 1 */
safety: number
/** when first request in this period was made */
frozenAt: number
/** The request statuses that count as an invalid request. */
errorStatuses: number[]
/** The amount of requests that were requested from this bucket. */
requested: number
/** The requests that are currently pending. */
waiting: Array<(value: void | PromiseLike<void>) => void>
/** Whether or not the waiting queue is already processing. */
processing: boolean
/** Gives the number of requests that are currently allowed. */
requestsAllowed: () => number
/** Checks if a request is allowed at this time. */
isRequestAllowed: () => boolean
/** Waits until a request is available */
waitUntilRequestAvailable: () => Promise<void>
/** Begins processing the waiting queue of requests. */
processWaiting: () => Promise<void>
/** Handler for whenever a request is validated. This should update the requested values or trigger any other necessary stuff. */
handleCompletedRequest: (code: number, sharedScope: boolean) => void
}

View File

@@ -2,6 +2,9 @@
import TRANSFORMERS from '@discordeno/transformer'
import type { BigString, Camelize, CreateMessageOptions, DiscordCreateMessage, DiscordMessage, DiscordUser, GetMessagesOptions } from '@discordeno/types'
import { camelize, delay } from '@discordeno/utils'
import type { InvalidRequestBucket } from './invalidBucket.js'
import { createInvalidRequestBucket } from './invalidBucket.js'
import { Queue } from './queue.js'
// TODO: make dynamic based on package.json file
const version = '18.0.0-alpha.1'
@@ -11,6 +14,12 @@ export function createRestManager (options: CreateRestManagerOptions): RestManag
token: options.token,
version: options.version ?? 10,
baseUrl: options.baseUrl ?? 'https://discord.com/api',
maxRetryCount: Infinity,
globallyRateLimited: false,
processingRateLimitedPaths: false,
queues: new Map(),
rateLimitedPaths: new Map(),
invalidBucket: createInvalidRequestBucket({}),
routes: {
// Channel Endpoints
@@ -48,6 +57,22 @@ export function createRestManager (options: CreateRestManagerOptions): RestManag
},
checkRateLimits (url) {
const ratelimited = rest.rateLimitedPaths.get(url)
const global = rest.rateLimitedPaths.get('global')
const now = Date.now()
if (ratelimited && now < ratelimited.resetTimestamp) {
return ratelimited.resetTimestamp - now
}
if (global && now < global.resetTimestamp) {
return global.resetTimestamp - now
}
return false
},
createRequest (options) {
const headers: Record<string, string> = {
'user-agent': `DiscordBot (https://github.com/discordeno/discordeno, v${version})`
@@ -88,65 +113,203 @@ export function createRestManager (options: CreateRestManagerOptions): RestManag
}
},
processRateLimitedPaths (): void {
const now = Date.now()
for (const [key, value] of rest.rateLimitedPaths.entries()) {
// rest.debug(
// `[REST - processRateLimitedPaths] Running for of loop. ${
// value.resetTimestamp - now
// }`
// )
// If the time has not reached cancel
if (value.resetTimestamp > now) continue
// Rate limit is over, delete the rate limiter
rest.rateLimitedPaths.delete(key)
// If it was global, also mark the global value as false
if (key === 'global') rest.globallyRateLimited = false
}
// ALL PATHS ARE CLEARED CAN CANCEL OUT!
if (rest.rateLimitedPaths.size === 0) {
rest.processingRateLimitedPaths = false
} else {
rest.processingRateLimitedPaths = true
// RECHECK IN 1 SECOND
setTimeout(() => {
// rest.debug('[REST - processRateLimitedPaths] Running setTimeout.')
rest.processRateLimitedPaths()
}, 1000)
}
},
/** Processes the rate limit headers and determines if it needs to be rate limited and returns the bucket id if available */
processHeaders (url: string, headers: Headers): string | undefined {
let rateLimited = false
// GET ALL NECESSARY HEADERS
const remaining = headers.get('x-ratelimit-remaining')
const retryAfter = headers.get('x-ratelimit-reset-after')
const reset = Date.now() + Number(retryAfter) * 1000
const global = headers.get('x-ratelimit-global')
// undefined override null needed for typings
const bucketId = headers.get('x-ratelimit-bucket') ?? undefined
rest.queues.get(url)?.handleCompletedRequest({
remaining: Number(remaining),
interval: Number(retryAfter) * 1000,
max: Number(headers.get('x-ratelimit-limit'))
})
// IF THERE IS NO REMAINING RATE LIMIT, MARK IT AS RATE LIMITED
if (remaining === '0') {
rateLimited = true
// SAVE THE URL AS LIMITED, IMPORTANT FOR NEW REQUESTS BY USER WITHOUT BUCKET
rest.rateLimitedPaths.set(url, {
url,
resetTimestamp: reset,
bucketId
})
// SAVE THE BUCKET AS LIMITED SINCE DIFFERENT URLS MAY SHARE A BUCKET
if (bucketId) {
rest.rateLimitedPaths.set(bucketId, {
url,
resetTimestamp: reset,
bucketId
})
}
}
// IF THERE IS NO REMAINING GLOBAL LIMIT, MARK IT RATE LIMITED GLOBALLY
if (global) {
const retryAfter = headers.get('retry-after')
const globalReset = Date.now() + Number(retryAfter) * 1000
// rest.debug(
// `[REST = Globally Rate Limited] URL: ${url} | Global Rest: ${globalReset}`
// )
rest.globallyRateLimited = true
rateLimited = true
setTimeout(() => {
rest.globallyRateLimited = false
}, globalReset)
rest.rateLimitedPaths.set('global', {
url: 'global',
resetTimestamp: globalReset,
bucketId
})
if (bucketId) {
rest.rateLimitedPaths.set(bucketId, {
url: 'global',
resetTimestamp: globalReset,
bucketId
})
}
}
if (rateLimited && !rest.processingRateLimitedPaths) {
rest.processRateLimitedPaths()
}
return rateLimited ? bucketId : undefined
},
async sendRequest (options) {
// console.log('sending request', options.url, rest.createRequest({ method: options.method, url: options.url, body: options.body }))
const response = await fetch(
`${rest.baseUrl}/v${rest.version}/${options.url}`,
options.url,
rest.createRequest({ method: options.method, url: options.url, body: options.body })
)
// Set the bucket id if it was available on the headers
const bucketId = rest.processHeaders(rest.simplifyUrl(options.url, options.method), response.headers)
if (bucketId) options.bucketId = bucketId
if (response.status < 200 || response.status >= 400) {
// If NOT rate limited remove from queue
if (response.status === 429) {
// const json = await response.json()
// Too many attempts, get rid of request from queue.
if (options.retryCount++ >= rest.maxRetryCount) {
// rest.debug(`[REST - RetriesMaxed] ${JSON.stringify(options)}`)
// Remove item from queue to prevent retry
return options.reject?.({
ok: false,
status: response.status,
error:
'The options was rate limited and it maxed out the retries limit.'
})
}
// TOO MANY ATTEMPTS, GET RID OF REQUEST FROM QUEUE.
// if (
// options.retryCount !== undefined &&
// options.retryCount++ >= rest.maxRetryCount
// ) {
// rest.debug(`[REST - RetriesMaxed] ${JSON.stringify(options)}`)
// // REMOVE ITEM FROM QUEUE TO PREVENT RETRY
// options.reject?.({
// ok: false,
// status: response.status,
// error:
// 'The options was rate limited and it maxed out the retries limit.'
// })
// Rate limited, add back to queue
rest.invalidBucket.handleCompletedRequest(
response.status,
response.headers.get('X-RateLimit-Scope') === 'shared'
)
// // @ts-expect-error Code should never reach here
// return
// }
// RATE LIMITED, ADD BACK TO QUEUE
// rest.invalidBucket.handleCompletedRequest(
// response.status,
// response.headers.get('X-RateLimit-Scope') === 'shared'
// )
// console.log('rate limited', json.retry_after, response.headers)
const resetAfter = response.headers.get('x-ratelimit-reset-after')
if (resetAfter) await delay(Number(resetAfter) * 1000)
options.retryCount++
return await options.retryRequest?.(options)
}
// INVALID REQUEST
const body = JSON.stringify(await response.json())
return options.reject({
ok: false,
status: response.status,
body
})
}
options.resolve(await response.json())
},
// Credits: github.com/abalabahaha/eris lib/rest/RequestHandler.js#L397
// Modified for our use-case
simplifyUrl (url, method) {
let route = url
.replace(/\/([a-z-]+)\/(?:[0-9]{17,19})/g, function (match, p: string) {
return ['channels', 'guilds'].includes(p) ? match : `/${p}/x`
})
.replace(/\/reactions\/[^/]+/g, '/reactions/x')
// GENERAL /reactions and /reactions/emoji/@me share the buckets
if (route.includes('/reactions')) {
route = route.substring(0, route.indexOf('/reactions') + '/reactions'.length)
}
// Delete Message endpoint has its own rate limit
if (method === 'DELETE' && route.endsWith('/messages/x')) {
route = method + route
}
return route
},
processRequest (request: SendRequestOptions) {
const route = request.url.substring(request.url.indexOf('api/'))
const parts = route.split('/')
// Remove the api/
parts.shift()
// Removes the /v#/
if (parts[0]?.startsWith('v')) parts.shift()
// Set the full url to discord api in case it was recieved in a proxy rest
request.url = `${rest.baseUrl}/v${rest.version}/${parts.join('/')}`
const url = rest.simplifyUrl(request.url, request.method)
const queue = rest.queues.get(url)
if (queue !== undefined) {
queue.makeRequest(request)
} else {
// CREATES A NEW QUEUE
const bucketQueue = new Queue(rest, { url })
// Add request to queue
bucketQueue.makeRequest(request)
// Save queue
rest.queues.set(url, bucketQueue)
}
},
async makeRequest (method, url, body) {
return await new Promise((resolve, reject) => {
rest.sendRequest({
rest.processRequest({
url,
method,
body,
@@ -216,6 +379,18 @@ export interface RestManager {
* @default https://discord.com/api
*/
baseUrl: string
/** The maximum amount of times a request should be retried. Defaults to Infinity */
maxRetryCount: number
/** Whether or not the manager is rate limited globally across all requests. Defaults to false. */
globallyRateLimited: boolean
/** Whether or not the rate limited paths are being processed to allow requests to be made once time is up. Defaults to false. */
processingRateLimitedPaths: boolean
/** The queues that hold all the requests to be processed. */
queues: Map<string, Queue>
/** The paths that are currently rate limited. */
rateLimitedPaths: Map<string, RestRateLimitedPath>
/** The bucket for handling any invalid requests. */
invalidBucket: InvalidRequestBucket
/** The routes that are available for this manager. */
routes: {
/** A specific user route. */
@@ -228,12 +403,22 @@ export interface RestManager {
messages: (channelId: BigString, options?: GetMessagesOptions) => string
}
}
/** Check the rate limits for a url or a bucket. */
checkRateLimits: (url: string) => number | false
/** Creates the request body and headers that are necessary to send a request. Will handle different types of methods and everything necessary for discord. */
createRequest: (options: CreateRequestBodyOptions) => RequestBody
/** This will create a infinite loop running in 1 seconds using tail recursion to keep rate limits clean. When a rate limit resets, this will remove it so the queue can proceed. */
processRateLimitedPaths: () => void
/** Processes the rate limit headers and determines if it needs to be rate limited and returns the bucket id if available */
processHeaders: (url: string, headers: Headers) => string | undefined
/** Sends a request to the api. */
sendRequest: (options: SendRequestOptions) => Promise<void>
/** Split a url to separate rate limit buckets based on major/minor parameters. */
simplifyUrl: (url: string, method: RequestMethods) => string
/** Make a request to be sent to the api. */
makeRequest: <T = unknown>(method: RequestMethods, url: string, body?: Record<string, any>) => Promise<T>
/** Takes a request and processes it into a queue. */
processRequest: (request: SendRequestOptions) => void
/** Make a get request to the api */
get: <T = Record<string, unknown>>(url: string) => Promise<Camelize<T>>
/** Make a post request to the api. */
@@ -274,7 +459,7 @@ export interface RestManager {
sendMessage: (channelId: BigString, options: CreateMessageOptions) => Promise<Camelize<DiscordMessage>>
}
export type RequestMethods = 'GET' | 'POST'
export type RequestMethods = 'GET' | 'POST' | 'DELETE'
export type ApiVersions = 9 | 10
export interface CreateRequestBodyOptions {
@@ -306,4 +491,12 @@ export interface SendRequestOptions {
resolve: (value: any | PromiseLike<any>) => void
/** Reject handler when a request fails. */
reject: (reason?: any) => void
/** If this request has a bucket id which it falls under for rate limit */
bucketId?: string
}
export interface RestRateLimitedPath {
url: string
resetTimestamp: number
bucketId?: string
}

200
packages/rest/src/queue.ts Normal file
View File

@@ -0,0 +1,200 @@
import { delay } from '@discordeno/utils'
import type { RestManager, SendRequestOptions } from './manager'
export class Queue {
/** The rest manager */
rest: RestManager
/** Amount of requests that have are remaining. Defaults to 1. */
remaining: number = 1
/** Max requests for this this. Defaults to 1. */
max: number = 1
/** The time that discord allows to make the max number of requests. Defaults to 0 */
interval: number = 0
/** timer to reset to 0 */
timeoutId: NodeJS.Timeout | undefined
/** The requests that are currently pending. */
waiting: Array<(value: void | PromiseLike<void>) => void> = []
/** The requests that are currently pending. */
pending: SendRequestOptions[] = []
/** Whether or not the waiting queue is already processing. */
processing: boolean = false
/** Whether or not the pending queue is already processing. */
processingPending: boolean = false
/** Whether the first request is pending. */
firstRequest: boolean = false
/** The url that all the requests in this queue are sent to. */
url: string
constructor (rest: RestManager, options: QueueOptions) {
this.rest = rest
this.url = options.url
if (options?.interval) this.interval = options.interval
if (options?.max) this.max = options.max
if (options?.remaining) this.remaining = options.remaining
if (options?.timeoutId) this.timeoutId = options.timeoutId
}
/** Check if there is any remaining requests that are allowed. */
isRequestAllowed (): boolean {
return this.remaining > 0
}
/** Pauses the execution until a request is allowed to be made. */
async waitUntilRequestAvailable (): Promise<void> {
// eslint-disable-next-line no-async-promise-executor
return await new Promise(async (resolve) => {
// If whatever amount of requests is left is more than the safety margin, allow the request
if (this.isRequestAllowed()) {
// this.remaining++;
resolve()
} else {
this.waiting.push(resolve)
await this.processWaiting()
}
})
}
/** Process the queue of requests waiting to be handled. */
async processWaiting (): Promise<void> {
// If already processing, that loop will handle all waiting requests.
if (this.processing) return
// Mark as processing so other loops don't start
this.processing = true
while (this.waiting.length > 0) {
if (this.isRequestAllowed()) {
// Resolve the next item in the queue
this.waiting.shift()?.()
} else {
await delay(1000)
}
}
// Mark as false so next pending request can be triggered by new loop.
this.processing = false
}
/** Process the queue of all requests pending to be sent. */
async processPending (): Promise<void> {
// If already processing, that loop will handle all pending requests.
if (this.processingPending) return
// Mark as processing so other loops don't start
this.processingPending = true
while (this.pending.length > 0) {
if (!this.firstRequest && !this.isRequestAllowed()) {
await delay(1000)
continue
}
const request = this.pending[0]
if (request) {
const basicURL = this.rest.simplifyUrl(
request.url,
request.method
)
// IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
// If this url is still rate limited, try again
const urlResetIn = this.rest.checkRateLimits(basicURL)
if (urlResetIn) await delay(urlResetIn)
// IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
const bucketResetIn =
request.bucketId
? this.rest.checkRateLimits(request.bucketId)
: false
if (bucketResetIn) await delay(bucketResetIn)
this.firstRequest = false
this.remaining--
if (this.timeoutId && this.remaining === 0 && this.interval !== 0) {
this.timeoutId = setTimeout(() => {
this.remaining = this.max
this.timeoutId = undefined
}, this.interval)
}
// Remove from queue, we are executing it.
this.pending.shift()
// Check if this request is able to be made globally
await this.rest.invalidBucket.waitUntilRequestAvailable()
await this.rest.sendRequest(request)
// Should be handled in sendRequest, this catch just prevents bots from dying
.catch(() => null)
}
}
// Mark as false so next pending request can be triggered by new loop.
this.processingPending = false
this.cleanup()
}
handleCompletedRequest (headers: { max: number, interval: number, remaining: number }): void {
if (headers.max === 0) {
this.remaining++
return
}
this.interval = headers.interval
this.remaining = headers.remaining
if (this.remaining <= 1) {
this.timeoutId = setTimeout(() => {
this.remaining = this.max
this.timeoutId = undefined
}, headers.interval)
}
}
async makeRequest (options: SendRequestOptions): Promise<void> {
await this.waitUntilRequestAvailable()
this.pending.push(options)
this.processPending()
}
/** Cleans up the queue by checking if there is nothing left and removing it. */
cleanup (): void {
if (!this.isQueueClearable()) {
this.processPending()
return
}
// Delete in a minute giving a bit of time to allow new requests that may reuse this queue
setTimeout(() => {
if (!this.isQueueClearable()) {
this.processPending()
return
}
// No requests have been requested for this queue so we nuke this queue
this.rest.queues.delete(this.url)
}, 60000)
}
isQueueClearable (): boolean {
if (this.firstRequest) return false
if (this.waiting.length > 0) return false
if (this.pending.length > 0) return false
if (this.interval === 0) return false
if (this.processing) return false
if (this.processingPending) return false
return true
}
}
export interface QueueOptions {
/** How many requests are remaining. Defaults to 1 */
remaining?: number
/** Max number of requests allowed in this this. Defaults to 1. */
max?: number
/** The time in milliseconds that discord allows to make the max number of invalid requests. Defaults to 0 */
interval?: number
/** timer to reset to 0 */
timeoutId?: NodeJS.Timeout
/** The url this queue will be handling. */
url: string
}

View File

@@ -5,7 +5,7 @@ import { rest } from './utils.js'
describe('[rest] Message related tests', () => {
describe('Send a message', () => {
it('With content', async () => {
const message = await rest.sendMessage('1057524844712964146', { content: 'testing rate limit manager' })
const message = await rest.sendMessage('1041029705790402611', { content: 'testing rate limit manager' })
expect(message.content).to.be.equal('testing rate limit manager')
})
})
@@ -13,7 +13,34 @@ describe('[rest] Message related tests', () => {
describe('Rate limit manager testing', () => {
it('Send 10 messages to 1 channel', async () => {
await Promise.all([0, 1, 2, 3, 4, 5, 6, 7, 8, 9].map(async (i) => {
await rest.sendMessage('1057524844712964146', { content: `testing rate limit manager ${i}` })
await rest.sendMessage('1041029705790402611', { content: `10 messages to 1 channel testing rate limit manager ${i}` })
}))
})
// TODO: Make this dynamic when we can create channels
const spamChannelIds = [
'1041029705790402611', '1041029706838966393',
'1041029707459731586', '1041029708004995199',
'1041029708453789766', '1041029709049385010',
'1041029709632377003', '1041029710227976313',
'1041029710764834856', '1041029711414956202',
'1041029712153149524', '1041029712933306459',
'1041029713566646313', '1041029714254508042',
'1041029714921406555', '1041029716334870629',
'1041029717127614636', '1041029717689647114',
'1041029718603997214', '1041029719925215302',
'1041029721179308082', '1041029721988812860',
'1041029722466943037', '1041029723217743964',
'1041029723872034826', '1041029724492804156',
'1041029725117743144', '1041029725818212474',
'1041029726531227741', '1041029727231684638'
]
it('Send 10 messages to 10 channels', async () => {
await Promise.all(spamChannelIds.map(async (channelId) => {
await Promise.all([...Array(10).keys()].map(async (_, index) => {
await rest.sendMessage(channelId, { content: `testing rate limit manager ${index}` })
}))
}))
})
})

View File

@@ -23,13 +23,7 @@ describe('[rest] User related tests', () => {
it('Has been camelized', () => {
const keys = Object.keys(user)
expect(keys.includes('mfa_enabled')).to.be.false
expect(keys.includes('accent_color')).to.be.false
expect(keys.includes('premium_type')).to.be.false
expect(keys.includes('public_flags')).to.be.false
expect(keys.includes('mfaEnabled')).to.be.true
expect(keys.includes('accentColor')).to.be.true
expect(keys.includes('premiumType')).to.be.true
expect(keys.includes('publicFlags')).to.be.true
})
})