diff --git a/packages/gateway/src/Shard.ts b/packages/gateway/src/Shard.ts index b9b69e953..50d80e12d 100644 --- a/packages/gateway/src/Shard.ts +++ b/packages/gateway/src/Shard.ts @@ -6,10 +6,10 @@ import type { DiscordHello, DiscordMember, DiscordReady, - RequestGuildMembers, + RequestGuildMembers } from '@discordeno/types' import { GatewayCloseEventCodes, GatewayIntents, GatewayOpcodes } from '@discordeno/types' -import { camelize, Collection, createLeakyBucket, delay, logger } from '@discordeno/utils' +import { camelize, Collection, delay, LeakyBucket, logger } from '@discordeno/utils' import { inflateSync } from 'node:zlib' import WebSocket from 'ws' import type { RequestMemberRequest } from './manager.js' @@ -44,11 +44,7 @@ export class DiscordenoShard { /** Resolve internal waiting states. Mapped by SelectedEvents => ResolveFunction */ resolves = new Map<'READY' | 'RESUMED' | 'INVALID_SESSION', (payload: DiscordGatewayPayload) => void>() /** Shard bucket. Only access this if you know what you are doing. Bucket for handling shard request rate limits. */ - bucket = createLeakyBucket({ - max: 120, - refillInterval: 60000, - refillAmount: 120, - }) + bucket: LeakyBucket /** This managers cache related settings. */ cache = { @@ -74,6 +70,12 @@ export class DiscordenoShard { } if (options.requestIdentify) this.requestIdentify = options.requestIdentify + + this.bucket = new LeakyBucket({ + max: this.calculateSafeRequests(), + refillAmount: this.calculateSafeRequests(), + refillInterval: 60000, + }) } /** The gateway configuration which is used to connect to Discord. */ @@ -265,7 +267,7 @@ export class DiscordenoShard { // Else bucket and token wait time just get wasted. await this.checkOffline(highPriority) - await this.bucket.acquire(1, highPriority) + await this.bucket.acquire(highPriority) // It's possible, that the shard went offline after a token has been acquired from the bucket. await this.checkOffline(highPriority) @@ -376,16 +378,18 @@ export class DiscordenoShard { this.startHeartbeating(interval) if (this.state !== ShardState.Resuming) { + const currentQueue = [...this.bucket.queue]; // HELLO has been send on a non resume action. // This means that the shard starts a new session, // therefore the rate limit interval has been reset too. - this.bucket = createLeakyBucket({ + this.bucket = new LeakyBucket({ max: this.calculateSafeRequests(), refillInterval: 60000, refillAmount: this.calculateSafeRequests(), - // Waiting acquires should not be lost on a re-identify. - waiting: this.bucket.waiting, }) + + // Queue should not be lost on a re-identify. + this.bucket.queue.unshift(...currentQueue); } this.events.hello?.(this) diff --git a/packages/gateway/src/manager.ts b/packages/gateway/src/manager.ts index c1f5921ec..259fc089c 100644 --- a/packages/gateway/src/manager.ts +++ b/packages/gateway/src/manager.ts @@ -1,6 +1,5 @@ import type { AtLeastOne, BigString, Camelize, DiscordGetGatewayBot, DiscordMember, RequestGuildMembers } from '@discordeno/types' -import type { LeakyBucket } from '@discordeno/utils' -import { Collection, createLeakyBucket, delay, logger } from '@discordeno/utils' +import { Collection, delay, LeakyBucket, logger } from '@discordeno/utils' import Shard from './Shard.js' import type { ShardEvents, StatusUpdate, UpdateVoiceState } from './types.js' @@ -83,10 +82,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate logger.debug(`[Gateway] Preparing buckets for concurrency: ${i}`) gateway.buckets.set(i, { workers: [], - leak: createLeakyBucket({ - max: 1, - refillAmount: 1, - // special number which is proven to be working dont change + leak: new LeakyBucket({ refillInterval: gateway.spawnShardDelay, }), }) @@ -163,7 +159,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate }, events: options.events, requestIdentify: async () => { - await gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)!.leak.acquire(1) + await gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)!.leak.acquire() }, }) @@ -188,7 +184,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate async requestIdentify(shardId: number) { logger.debug(`[Gateway] requesting identify`) - await gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)!.leak.acquire(1) + await gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)!.leak.acquire() }, // Helpers methods below this diff --git a/packages/utils/src/bucket.ts b/packages/utils/src/bucket.ts index 3326805b0..f8f153c4b 100644 --- a/packages/utils/src/bucket.ts +++ b/packages/utils/src/bucket.ts @@ -1,160 +1,96 @@ -import type { PickPartial } from '@discordeno/types' import { delay } from './utils.js' -/** A Leaky Bucket. - * Useful for rate limiting purposes. - * This uses `performance.now()` instead of `Date.now()` for higher accuracy. - * - * NOTE: This bucket is lazy, means it only updates when a related method is called. - */ -export interface LeakyBucket { - /** How many tokens this bucket can hold. */ +export class LeakyBucket implements LeakyBucketOptions { max: number - /** Amount of tokens gained per interval. - * If bigger than `max` it will be pressed to `max`. - */ - refillAmount: number - /** Interval at which the bucket gains tokens. */ refillInterval: number + refillAmount: number - /** Acquire tokens from the bucket. - * Resolves when the tokens are acquired and available. - * @param {boolean} [highPriority=false] Whether this acquire is should be done asap. - */ - acquire: (amount: number, highPriority?: boolean) => Promise + /** The amount of requests that have been used up already. */ + used: number = 0 + /** The queue of requests to acquire an available request. */ + queue: Array<(value: void | PromiseLike) => void> = [] + /** Whether or not the queue is already processing. */ + processing: boolean = false + /** The timeout id for the timer to reduce the used amount by the refill amount. */ + timeoutId?: NodeJS.Timeout + /** The timestamp in milliseconds when the next refill is scheduled. */ + refillsAt?: number - /** Returns the number of milliseconds until the next refill. */ - nextRefill: () => number - - /** Current tokens in the bucket. */ - tokens: () => number - - /** @private Internal track of when the last refill of tokens was. - * DO NOT TOUCH THIS! Unless you know what you are doing ofc :P - */ - lastRefill: number - - /** @private Internal state of whether currently it is allowed to acquire tokens. - * DO NOT TOUCH THIS! Unless you know what you are doing ofc :P - */ - allowAcquire: boolean - - /** @private Internal number of currently available tokens. - * DO NOT TOUCH THIS! Unless you know what you are doing ofc :P - */ - tokensState: number - - /** @private Internal array of promises necessary to guarantee no race conditions. - * DO NOT TOUCH THIS! Unless you know what you are doing ofc :P - */ - waiting: Array<(_?: unknown) => void> -} - -export function createLeakyBucket({ - max, - refillInterval, - refillAmount, - tokens, - waiting, - ...rest -}: Omit, 'tokens'> & { - /** Current tokens in the bucket. - * @default max - */ - tokens?: number -}): LeakyBucket { - return { - max, - refillInterval, - refillAmount: refillAmount > max ? max : refillAmount, - lastRefill: performance.now(), - allowAcquire: true, - - nextRefill: function () { - return nextRefill(this) - }, - - tokens: function () { - return updateTokens(this) - }, - - acquire: async function (amount, highPriority) { - return await acquire(this, amount, highPriority) - }, - - tokensState: tokens ?? max, - waiting: waiting ?? [], - - ...rest, + constructor(options?: LeakyBucketOptions) { + this.max = options?.max ?? 1 + this.refillAmount = options?.refillAmount ? options.refillAmount > this.max ? this.max : options.refillAmount : 1 + this.refillInterval = options?.refillInterval ?? 5000 } -} -/** Update the tokens of that bucket. - * @returns {number} The amount of current available tokens. - */ -export function updateTokens(bucket: LeakyBucket): number { - const timePassed = performance.now() - bucket.lastRefill - const missedRefills = Math.floor(timePassed / bucket.refillInterval) + /** The amount of requests that still remain. */ + get remaining(): number { + return this.max < this.used ? 0 : this.max - this.used + } - // The refill shall not exceed the max amount of tokens. - bucket.tokensState = Math.min(bucket.tokensState + bucket.refillAmount * missedRefills, bucket.max) - bucket.lastRefill += bucket.refillInterval * missedRefills + /** Begin processing the queue. */ + async processQueue(): Promise { + // There is already a queue that is processing + if (this.processing) return - return bucket.tokensState -} + // Begin going through the queue. + while (this.queue.length) { + if (this.remaining) { + // Resolves the promise allowing the paused execution of this request to resolve and continue. + this.queue.shift()?.() + // A request can be made + this.used++ -export function nextRefill(bucket: LeakyBucket): number { - // Since this bucket is lazy update the tokens before calculating the next refill. - updateTokens(bucket) - - return bucket.refillInterval - performance.now() + bucket.lastRefill -} - -export async function acquire(bucket: LeakyBucket, amount: number, highPriority = false): Promise { - // To prevent the race condition of 2 acquires happening at once, - // check whether its currently allowed to acquire. - if (!bucket.allowAcquire) { - // create, push, and wait until the current running acquiring is finished. - await new Promise((resolve) => { - if (highPriority) { - bucket.waiting.unshift(resolve) - } else { - bucket.waiting.push(resolve) + // Create a new timeout for this request if none exists. + if (!this.timeoutId) { + this.timeoutId = setTimeout(() => { + // Lower the used amount by the refill amount + this.used -= this.refillAmount + // Reset the refillsAt timestamp since it just got refilled + this.refillsAt = undefined + }, this.refillInterval) + // Set the time for when this refill will occur. + this.refillsAt = Date.now() + this.refillInterval + } } - }) - // Somehow another acquire has started, - // so need to wait again. - if (!bucket.allowAcquire) { - return await acquire(bucket, amount) + // Check if a refill is scheduled, since we have used up all available requests + if (this.refillsAt) { + const now = Date.now() + // If there is time left until next refill, just delay execution. + if (this.refillsAt > now) { + await delay(this.refillsAt - now) + } + } } } - bucket.allowAcquire = false - // Since the bucket is lazy update the tokens now, - // and also get the current amount of available tokens - const currentTokens = updateTokens(bucket) - - // It's possible that more than available tokens have been acquired, - // so calculate the amount of milliseconds to wait until this acquire is good to go. - if (currentTokens < amount) { - const tokensNeeded = amount - currentTokens - const refillsNeeded = Math.ceil(tokensNeeded / bucket.refillAmount) - - const waitTime = bucket.refillInterval * refillsNeeded - await delay(waitTime) - - // Update the tokens again to ensure nothing has been missed. - updateTokens(bucket) + /** Pauses the execution until the request is available to be made. */ + async acquire(highPriority?: boolean): Promise { + return await new Promise((resolve) => { + // High priority requests get added to the start of the queue + if (highPriority) this.queue.unshift(resolve) + // All other requests get pushed to the end. + else this.queue.push(resolve) + // Each request should trigger the queue to be processesd. + void this.processQueue() + }) } - - // In order to not subtract too much from the tokens, - // calculate what is actually needed to subtract. - const toSubtract = amount % bucket.refillAmount ?? amount - bucket.tokensState -= toSubtract - - // Allow the next acquire to happen. - bucket.allowAcquire = true - // If there is an acquire waiting, let it continue. - bucket.waiting.shift()?.() +} + +export interface LeakyBucketOptions { + /** + * Max requests allowed at once. + * @default 1 + */ + max?: number + /** + * Interval in milliseconds between refills. + * @default 5000 + */ + refillInterval?: number + /** + * Amount of requests to refill at each interval. + * @default 1 + */ + refillAmount?: number } diff --git a/packages/utils/tests/bucket.spec.ts b/packages/utils/tests/bucket.spec.ts index 1a833f1fc..6275306df 100644 --- a/packages/utils/tests/bucket.spec.ts +++ b/packages/utils/tests/bucket.spec.ts @@ -1,8 +1,7 @@ import { expect } from 'chai' import { afterEach, beforeEach, describe, it } from 'mocha' import sinon from 'sinon' -import * as bucketJs from '../src/bucket.js' -import { createLeakyBucket, updateTokens } from '../src/bucket.js' +import { LeakyBucket } from '../src/bucket.js' describe('bucket.ts', () => { let clock: sinon.SinonFakeTimers @@ -16,7 +15,7 @@ describe('bucket.ts', () => { clock.restore() }) - describe('createLeakyBucket function', () => { + describe('LeakyBucket function', () => { it('will return bucket with given options', () => { const options = { max: 6006, @@ -27,13 +26,10 @@ describe('bucket.ts', () => { thing: 'else', }, } - const bucket = createLeakyBucket(options) + const bucket = new LeakyBucket(options) expect(bucket.max).to.equal(options.max) expect(bucket.refillInterval).to.equal(options.refillInterval) expect(bucket.refillAmount).to.equal(options.refillAmount) - expect(bucket.tokensState).to.equal(options.tokens) - // @ts-expect-error - expect(bucket.someThingElse).to.equal(options.someThingElse) }) it('will return bucket with refillAmount within max', () => { @@ -43,7 +39,7 @@ describe('bucket.ts', () => { refillAmount: 3003, tokens: 4004, } - const bucket = createLeakyBucket(options) + const bucket = new LeakyBucket(options) expect(bucket.refillAmount).to.equal(options.max) }) @@ -54,88 +50,17 @@ describe('bucket.ts', () => { refillAmount: 3003, tokens: 4004, } - const bucket = createLeakyBucket(options) + const bucket = new LeakyBucket(options) expect(bucket.refillAmount).to.equal(options.max) }) it('will return bucket with default property', () => { - const bucket = createLeakyBucket({ + const bucket = new LeakyBucket({ max: 111, refillInterval: 2002, refillAmount: 3003, - tokens: 4004, }) - expect(bucket.lastRefill).to.equal(Date.now()) - expect(bucket.allowAcquire).to.equal(true) - expect(bucket.waiting).to.deep.equal([]) - }) - - it.skip('will call nextRefill with itself when called nextRefill', () => { - sinon.stub(bucketJs, 'nextRefill') - }) - - it.skip('will call updateTokens with itself when called tokens', () => { - sinon.stub(bucketJs, 'nextRefill') - }) - - it.skip('will call acquire with itself when called acquire', () => { - sinon.stub(bucketJs, 'nextRefill') + expect(bucket.queue).to.deep.equal([]) }) }) - describe('updateTokens function', () => { - it('will not increase bucket token after <1 refillInterval passed', () => { - const bucket = createLeakyBucket({ - max: 10, - refillInterval: 100, - refillAmount: 1, - tokens: 0, - }) - expect(bucket.tokens()).to.equal(0) - updateTokens(bucket) - expect(bucket.tokens()).to.equal(0) - clock.tick(99) - expect(bucket.tokens()).to.equal(0) - }) - - it('will increase 5 bucket token after 5 refillInterval passed', () => { - const bucket = createLeakyBucket({ - max: 10, - refillInterval: 100, - refillAmount: 1, - tokens: 1, - }) - expect(bucket.tokens()).to.equal(1) - updateTokens(bucket) - expect(bucket.tokens()).to.equal(1) - clock.tick(599) - expect(bucket.tokens()).to.equal(6) - }) - - it('will increate lastRefill according to number of refill', () => { - const bucket = createLeakyBucket({ - max: 10, - refillInterval: 100, - refillAmount: 1, - tokens: 1, - }) - expect(bucket.lastRefill).to.equal(Date.now()) - clock.tick(699) - updateTokens(bucket) - expect(bucket.lastRefill).to.equal(Date.now() - 99) - }) - - it('will return bucket token of the bucket', () => { - const bucket = createLeakyBucket({ - max: 10, - refillInterval: 100, - refillAmount: 1, - tokens: 1, - }) - expect(updateTokens(bucket)).to.equal(1) - clock.tick(500) - expect(updateTokens(bucket)).to.equal(6) - }) - }) - describe('nextRefill function', () => {}) - describe('acquire function', () => {}) })