fix: new leaky bucket system

This commit is contained in:
Skillz4Killz
2023-03-12 16:14:08 +00:00
parent b0c482b880
commit bb85624842
4 changed files with 104 additions and 243 deletions

View File

@@ -6,10 +6,10 @@ import type {
DiscordHello,
DiscordMember,
DiscordReady,
RequestGuildMembers,
RequestGuildMembers
} from '@discordeno/types'
import { GatewayCloseEventCodes, GatewayIntents, GatewayOpcodes } from '@discordeno/types'
import { camelize, Collection, createLeakyBucket, delay, logger } from '@discordeno/utils'
import { camelize, Collection, delay, LeakyBucket, logger } from '@discordeno/utils'
import { inflateSync } from 'node:zlib'
import WebSocket from 'ws'
import type { RequestMemberRequest } from './manager.js'
@@ -44,11 +44,7 @@ export class DiscordenoShard {
/** Resolve internal waiting states. Mapped by SelectedEvents => ResolveFunction */
resolves = new Map<'READY' | 'RESUMED' | 'INVALID_SESSION', (payload: DiscordGatewayPayload) => void>()
/** Shard bucket. Only access this if you know what you are doing. Bucket for handling shard request rate limits. */
bucket = createLeakyBucket({
max: 120,
refillInterval: 60000,
refillAmount: 120,
})
bucket: LeakyBucket
/** This managers cache related settings. */
cache = {
@@ -74,6 +70,12 @@ export class DiscordenoShard {
}
if (options.requestIdentify) this.requestIdentify = options.requestIdentify
this.bucket = new LeakyBucket({
max: this.calculateSafeRequests(),
refillAmount: this.calculateSafeRequests(),
refillInterval: 60000,
})
}
/** The gateway configuration which is used to connect to Discord. */
@@ -265,7 +267,7 @@ export class DiscordenoShard {
// Else bucket and token wait time just get wasted.
await this.checkOffline(highPriority)
await this.bucket.acquire(1, highPriority)
await this.bucket.acquire(highPriority)
// It's possible, that the shard went offline after a token has been acquired from the bucket.
await this.checkOffline(highPriority)
@@ -376,16 +378,18 @@ export class DiscordenoShard {
this.startHeartbeating(interval)
if (this.state !== ShardState.Resuming) {
const currentQueue = [...this.bucket.queue];
// HELLO has been send on a non resume action.
// This means that the shard starts a new session,
// therefore the rate limit interval has been reset too.
this.bucket = createLeakyBucket({
this.bucket = new LeakyBucket({
max: this.calculateSafeRequests(),
refillInterval: 60000,
refillAmount: this.calculateSafeRequests(),
// Waiting acquires should not be lost on a re-identify.
waiting: this.bucket.waiting,
})
// Queue should not be lost on a re-identify.
this.bucket.queue.unshift(...currentQueue);
}
this.events.hello?.(this)

View File

@@ -1,6 +1,5 @@
import type { AtLeastOne, BigString, Camelize, DiscordGetGatewayBot, DiscordMember, RequestGuildMembers } from '@discordeno/types'
import type { LeakyBucket } from '@discordeno/utils'
import { Collection, createLeakyBucket, delay, logger } from '@discordeno/utils'
import { Collection, delay, LeakyBucket, logger } from '@discordeno/utils'
import Shard from './Shard.js'
import type { ShardEvents, StatusUpdate, UpdateVoiceState } from './types.js'
@@ -83,10 +82,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
logger.debug(`[Gateway] Preparing buckets for concurrency: ${i}`)
gateway.buckets.set(i, {
workers: [],
leak: createLeakyBucket({
max: 1,
refillAmount: 1,
// special number which is proven to be working dont change
leak: new LeakyBucket({
refillInterval: gateway.spawnShardDelay,
}),
})
@@ -163,7 +159,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
},
events: options.events,
requestIdentify: async () => {
await gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)!.leak.acquire(1)
await gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)!.leak.acquire()
},
})
@@ -188,7 +184,7 @@ export function createGatewayManager(options: CreateGatewayManagerOptions): Gate
async requestIdentify(shardId: number) {
logger.debug(`[Gateway] requesting identify`)
await gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)!.leak.acquire(1)
await gateway.buckets.get(shardId % gateway.connection.sessionStartLimit.maxConcurrency)!.leak.acquire()
},
// Helpers methods below this

View File

@@ -1,160 +1,96 @@
import type { PickPartial } from '@discordeno/types'
import { delay } from './utils.js'
/** A Leaky Bucket.
* Useful for rate limiting purposes.
* This uses `performance.now()` instead of `Date.now()` for higher accuracy.
*
* NOTE: This bucket is lazy, means it only updates when a related method is called.
*/
export interface LeakyBucket {
/** How many tokens this bucket can hold. */
export class LeakyBucket implements LeakyBucketOptions {
max: number
/** Amount of tokens gained per interval.
* If bigger than `max` it will be pressed to `max`.
*/
refillAmount: number
/** Interval at which the bucket gains tokens. */
refillInterval: number
refillAmount: number
/** Acquire tokens from the bucket.
* Resolves when the tokens are acquired and available.
* @param {boolean} [highPriority=false] Whether this acquire is should be done asap.
*/
acquire: (amount: number, highPriority?: boolean) => Promise<void>
/** The amount of requests that have been used up already. */
used: number = 0
/** The queue of requests to acquire an available request. */
queue: Array<(value: void | PromiseLike<void>) => void> = []
/** Whether or not the queue is already processing. */
processing: boolean = false
/** The timeout id for the timer to reduce the used amount by the refill amount. */
timeoutId?: NodeJS.Timeout
/** The timestamp in milliseconds when the next refill is scheduled. */
refillsAt?: number
/** Returns the number of milliseconds until the next refill. */
nextRefill: () => number
/** Current tokens in the bucket. */
tokens: () => number
/** @private Internal track of when the last refill of tokens was.
* DO NOT TOUCH THIS! Unless you know what you are doing ofc :P
*/
lastRefill: number
/** @private Internal state of whether currently it is allowed to acquire tokens.
* DO NOT TOUCH THIS! Unless you know what you are doing ofc :P
*/
allowAcquire: boolean
/** @private Internal number of currently available tokens.
* DO NOT TOUCH THIS! Unless you know what you are doing ofc :P
*/
tokensState: number
/** @private Internal array of promises necessary to guarantee no race conditions.
* DO NOT TOUCH THIS! Unless you know what you are doing ofc :P
*/
waiting: Array<(_?: unknown) => void>
}
export function createLeakyBucket({
max,
refillInterval,
refillAmount,
tokens,
waiting,
...rest
}: Omit<PickPartial<LeakyBucket, 'max' | 'refillInterval' | 'refillAmount'>, 'tokens'> & {
/** Current tokens in the bucket.
* @default max
*/
tokens?: number
}): LeakyBucket {
return {
max,
refillInterval,
refillAmount: refillAmount > max ? max : refillAmount,
lastRefill: performance.now(),
allowAcquire: true,
nextRefill: function () {
return nextRefill(this)
},
tokens: function () {
return updateTokens(this)
},
acquire: async function (amount, highPriority) {
return await acquire(this, amount, highPriority)
},
tokensState: tokens ?? max,
waiting: waiting ?? [],
...rest,
constructor(options?: LeakyBucketOptions) {
this.max = options?.max ?? 1
this.refillAmount = options?.refillAmount ? options.refillAmount > this.max ? this.max : options.refillAmount : 1
this.refillInterval = options?.refillInterval ?? 5000
}
}
/** Update the tokens of that bucket.
* @returns {number} The amount of current available tokens.
*/
export function updateTokens(bucket: LeakyBucket): number {
const timePassed = performance.now() - bucket.lastRefill
const missedRefills = Math.floor(timePassed / bucket.refillInterval)
/** The amount of requests that still remain. */
get remaining(): number {
return this.max < this.used ? 0 : this.max - this.used
}
// The refill shall not exceed the max amount of tokens.
bucket.tokensState = Math.min(bucket.tokensState + bucket.refillAmount * missedRefills, bucket.max)
bucket.lastRefill += bucket.refillInterval * missedRefills
/** Begin processing the queue. */
async processQueue(): Promise<void> {
// There is already a queue that is processing
if (this.processing) return
return bucket.tokensState
}
// Begin going through the queue.
while (this.queue.length) {
if (this.remaining) {
// Resolves the promise allowing the paused execution of this request to resolve and continue.
this.queue.shift()?.()
// A request can be made
this.used++
export function nextRefill(bucket: LeakyBucket): number {
// Since this bucket is lazy update the tokens before calculating the next refill.
updateTokens(bucket)
return bucket.refillInterval - performance.now() + bucket.lastRefill
}
export async function acquire(bucket: LeakyBucket, amount: number, highPriority = false): Promise<void> {
// To prevent the race condition of 2 acquires happening at once,
// check whether its currently allowed to acquire.
if (!bucket.allowAcquire) {
// create, push, and wait until the current running acquiring is finished.
await new Promise((resolve) => {
if (highPriority) {
bucket.waiting.unshift(resolve)
} else {
bucket.waiting.push(resolve)
// Create a new timeout for this request if none exists.
if (!this.timeoutId) {
this.timeoutId = setTimeout(() => {
// Lower the used amount by the refill amount
this.used -= this.refillAmount
// Reset the refillsAt timestamp since it just got refilled
this.refillsAt = undefined
}, this.refillInterval)
// Set the time for when this refill will occur.
this.refillsAt = Date.now() + this.refillInterval
}
}
})
// Somehow another acquire has started,
// so need to wait again.
if (!bucket.allowAcquire) {
return await acquire(bucket, amount)
// Check if a refill is scheduled, since we have used up all available requests
if (this.refillsAt) {
const now = Date.now()
// If there is time left until next refill, just delay execution.
if (this.refillsAt > now) {
await delay(this.refillsAt - now)
}
}
}
}
bucket.allowAcquire = false
// Since the bucket is lazy update the tokens now,
// and also get the current amount of available tokens
const currentTokens = updateTokens(bucket)
// It's possible that more than available tokens have been acquired,
// so calculate the amount of milliseconds to wait until this acquire is good to go.
if (currentTokens < amount) {
const tokensNeeded = amount - currentTokens
const refillsNeeded = Math.ceil(tokensNeeded / bucket.refillAmount)
const waitTime = bucket.refillInterval * refillsNeeded
await delay(waitTime)
// Update the tokens again to ensure nothing has been missed.
updateTokens(bucket)
/** Pauses the execution until the request is available to be made. */
async acquire(highPriority?: boolean): Promise<void> {
return await new Promise((resolve) => {
// High priority requests get added to the start of the queue
if (highPriority) this.queue.unshift(resolve)
// All other requests get pushed to the end.
else this.queue.push(resolve)
// Each request should trigger the queue to be processesd.
void this.processQueue()
})
}
// In order to not subtract too much from the tokens,
// calculate what is actually needed to subtract.
const toSubtract = amount % bucket.refillAmount ?? amount
bucket.tokensState -= toSubtract
// Allow the next acquire to happen.
bucket.allowAcquire = true
// If there is an acquire waiting, let it continue.
bucket.waiting.shift()?.()
}
export interface LeakyBucketOptions {
/**
* Max requests allowed at once.
* @default 1
*/
max?: number
/**
* Interval in milliseconds between refills.
* @default 5000
*/
refillInterval?: number
/**
* Amount of requests to refill at each interval.
* @default 1
*/
refillAmount?: number
}

View File

@@ -1,8 +1,7 @@
import { expect } from 'chai'
import { afterEach, beforeEach, describe, it } from 'mocha'
import sinon from 'sinon'
import * as bucketJs from '../src/bucket.js'
import { createLeakyBucket, updateTokens } from '../src/bucket.js'
import { LeakyBucket } from '../src/bucket.js'
describe('bucket.ts', () => {
let clock: sinon.SinonFakeTimers
@@ -16,7 +15,7 @@ describe('bucket.ts', () => {
clock.restore()
})
describe('createLeakyBucket function', () => {
describe('LeakyBucket function', () => {
it('will return bucket with given options', () => {
const options = {
max: 6006,
@@ -27,13 +26,10 @@ describe('bucket.ts', () => {
thing: 'else',
},
}
const bucket = createLeakyBucket(options)
const bucket = new LeakyBucket(options)
expect(bucket.max).to.equal(options.max)
expect(bucket.refillInterval).to.equal(options.refillInterval)
expect(bucket.refillAmount).to.equal(options.refillAmount)
expect(bucket.tokensState).to.equal(options.tokens)
// @ts-expect-error
expect(bucket.someThingElse).to.equal(options.someThingElse)
})
it('will return bucket with refillAmount within max', () => {
@@ -43,7 +39,7 @@ describe('bucket.ts', () => {
refillAmount: 3003,
tokens: 4004,
}
const bucket = createLeakyBucket(options)
const bucket = new LeakyBucket(options)
expect(bucket.refillAmount).to.equal(options.max)
})
@@ -54,88 +50,17 @@ describe('bucket.ts', () => {
refillAmount: 3003,
tokens: 4004,
}
const bucket = createLeakyBucket(options)
const bucket = new LeakyBucket(options)
expect(bucket.refillAmount).to.equal(options.max)
})
it('will return bucket with default property', () => {
const bucket = createLeakyBucket({
const bucket = new LeakyBucket({
max: 111,
refillInterval: 2002,
refillAmount: 3003,
tokens: 4004,
})
expect(bucket.lastRefill).to.equal(Date.now())
expect(bucket.allowAcquire).to.equal(true)
expect(bucket.waiting).to.deep.equal([])
})
it.skip('will call nextRefill with itself when called nextRefill', () => {
sinon.stub(bucketJs, 'nextRefill')
})
it.skip('will call updateTokens with itself when called tokens', () => {
sinon.stub(bucketJs, 'nextRefill')
})
it.skip('will call acquire with itself when called acquire', () => {
sinon.stub(bucketJs, 'nextRefill')
expect(bucket.queue).to.deep.equal([])
})
})
describe('updateTokens function', () => {
it('will not increase bucket token after <1 refillInterval passed', () => {
const bucket = createLeakyBucket({
max: 10,
refillInterval: 100,
refillAmount: 1,
tokens: 0,
})
expect(bucket.tokens()).to.equal(0)
updateTokens(bucket)
expect(bucket.tokens()).to.equal(0)
clock.tick(99)
expect(bucket.tokens()).to.equal(0)
})
it('will increase 5 bucket token after 5 refillInterval passed', () => {
const bucket = createLeakyBucket({
max: 10,
refillInterval: 100,
refillAmount: 1,
tokens: 1,
})
expect(bucket.tokens()).to.equal(1)
updateTokens(bucket)
expect(bucket.tokens()).to.equal(1)
clock.tick(599)
expect(bucket.tokens()).to.equal(6)
})
it('will increate lastRefill according to number of refill', () => {
const bucket = createLeakyBucket({
max: 10,
refillInterval: 100,
refillAmount: 1,
tokens: 1,
})
expect(bucket.lastRefill).to.equal(Date.now())
clock.tick(699)
updateTokens(bucket)
expect(bucket.lastRefill).to.equal(Date.now() - 99)
})
it('will return bucket token of the bucket', () => {
const bucket = createLeakyBucket({
max: 10,
refillInterval: 100,
refillAmount: 1,
tokens: 1,
})
expect(updateTokens(bucket)).to.equal(1)
clock.tick(500)
expect(updateTokens(bucket)).to.equal(6)
})
})
describe('nextRefill function', () => {})
describe('acquire function', () => {})
})