From 76ea7d9a67a1a9e84d67c6f3008acdf8d2d0d494 Mon Sep 17 00:00:00 2001 From: Skillz4Killz <23035000+Skillz4Killz@users.noreply.github.com> Date: Mon, 14 Nov 2022 12:17:29 -0600 Subject: [PATCH] Bucking Rest (#2588) * Pending changes exported from your codespace * fix: more testing needed * fix: try fix * fix: global shared scope erro * fix: cleanup console logs --- .vscode/settings.json | 1 + debug.rest.ts | 138 ++++++++++++++++++++ gateway/shard/connect.ts | 2 +- gateway/shard/types.ts | 1 - rest/cleanupQueues.ts | 27 +++- rest/createInvalidRequestBucket.ts | 147 +++++++++++++++++++++ rest/createQueueBucket.ts | 199 +++++++++++++++++++++++++++++ rest/processGlobalQueue.ts | 107 ++++------------ rest/processQueue.ts | 95 +++++++------- rest/processRequest.ts | 21 ++- rest/processRequestHeaders.ts | 10 ++ rest/restManager.ts | 32 ++--- rest/sendRequest.ts | 31 ++--- tests/rest.ts | 5 +- 14 files changed, 634 insertions(+), 182 deletions(-) create mode 100644 debug.rest.ts create mode 100644 rest/createInvalidRequestBucket.ts create mode 100644 rest/createQueueBucket.ts diff --git a/.vscode/settings.json b/.vscode/settings.json index 65b53778a..724449b8a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -20,6 +20,7 @@ "dnt.ts", "mod.ts", "debug.ts", + "debug.rest.ts", "bot.ts" ], "editor.defaultFormatter": "denoland.vscode-deno", diff --git a/debug.rest.ts b/debug.rest.ts new file mode 100644 index 000000000..1ea7a5937 --- /dev/null +++ b/debug.rest.ts @@ -0,0 +1,138 @@ +// START FILE FOR REST PROCESS +import { config as dotenv } from "https://deno.land/x/dotenv@v3.2.0/mod.ts"; + +import { BASE_URL, Collection, createRestManager } from "./mod.ts"; + +dotenv({ export: true, path: `${Deno.cwd()}/.env` }); + +const col = new Collection(); + +const token = Deno.env.get("GAMER_TOKEN"); +if (!token) throw new Error("Token was not provided."); + +const REST_AUTHORIZATION_KEY = Deno.env.get("PROXY_REST_SECRET"); +const PROXY_REST_URL = Deno.env.get("PROXY_REST_URL"); +const REST_PORT = Number(PROXY_REST_URL?.substring(PROXY_REST_URL.lastIndexOf(":") + 1)) ?? 8080; + +// CREATES THE FUNCTIONALITY FOR MANAGING THE REST REQUESTS +const rest = createRestManager({ + token, + secretKey: REST_AUTHORIZATION_KEY, + customUrl: PROXY_REST_URL, + debug(text) { + if (text.startsWith("[REST - RequestCreate]")) { + const aaa = text.split(" "); + const method = aaa[4]; + const url = aaa[7]; + + col.set(method + url, Date.now()); + + // console.log("[DEBUG]", method, url); + } + + if (text.startsWith("[REST - processGlobalQueue] rate limited, running setTimeout.")) { + console.log("[POSSIBLE BUCKET ISSUE]"); + } + }, + fetching(options) { + // console.log("[FETCHING]", options.method, options.url, Date.now() - col.get(options.method + options.url)!); + }, +}); + +// START LISTENING TO THE URL(localhost) +const server = Deno.listen({ port: REST_PORT }); +console.log( + `HTTP webserver running. Access it at: ${PROXY_REST_URL}`, +); + +// Connections to the server will be yielded up as an async iterable. +for await (const conn of server) { + // In order to not be blocking, we need to handle each connection individually + // in its own async function. + handleRequest(conn); +} + +async function handleRequest(conn: Deno.Conn) { + // This "upgrades" a network connection into an HTTP connection. + const httpConn = Deno.serveHttp(conn); + // Each request sent over the HTTP connection will be yielded as an async + // iterator from the HTTP connection. + for await (const requestEvent of httpConn) { + if ( + !REST_AUTHORIZATION_KEY || + REST_AUTHORIZATION_KEY !== + requestEvent.request.headers.get("AUTHORIZATION") + ) { + return requestEvent.respondWith( + new Response(JSON.stringify({ error: "Invalid authorization key." }), { + status: 401, + }), + ); + } + + try { + const text = await requestEvent.request.text(); + const json = text ? JSON.parse(text) : undefined; + + if (json?.file) { + json.file = await Promise.all(json.file.map(async (f: any) => ({ + name: f.name, + blob: await (await fetch(f.blob)).blob(), + }))); + } + + const result = await rest.runMethod( + rest, + requestEvent.request.method as RequestMethod, + `${BASE_URL}${ + requestEvent.request.url.substring( + `http://localhost:${REST_PORT}`.length, + ) + }`, + json, + ); + + if (result) { + requestEvent.respondWith( + new Response(JSON.stringify(result), { + status: 200, + }), + ); + } else { + requestEvent.respondWith( + new Response(undefined, { + status: 204, + }), + ); + } + } catch (error) { + console.log( + "CATCH", + requestEvent.request.url, + requestEvent.request.method, + requestEvent.request.body, + error.code, + error, + ); + requestEvent.respondWith( + new Response( + JSON.stringify({ + message: error.message, + }), + { + status: error.code ?? 469, + }, + ), + ); + } + } +} + +type RequestMethod = "POST" | "PUT" | "DELETE" | "PATCH"; + +// // @ts-ignore +// rest.convertRestError = (errorStack, data) => { +// return data; +// }; + +// console.log(`Giveaway Boat REST Started At: ${new Date().toUTCString()}`); diff --git a/gateway/shard/connect.ts b/gateway/shard/connect.ts index a5bdf81a6..a9333abd5 100644 --- a/gateway/shard/connect.ts +++ b/gateway/shard/connect.ts @@ -18,7 +18,7 @@ export async function connect(shard: Shard): Promise { url.searchParams.set("encoding", "json"); } - const socket = new WebSocket(url); + const socket = new WebSocket(url.toString()); shard.socket = socket; diff --git a/gateway/shard/types.ts b/gateway/shard/types.ts index a9e326ded..4b9b1f97b 100644 --- a/gateway/shard/types.ts +++ b/gateway/shard/types.ts @@ -1,7 +1,6 @@ import { StatusUpdate } from "../../helpers/misc/editShardStatus.ts"; import { DiscordGatewayPayload } from "../../types/discord.ts"; import { GatewayOpcodes } from "../../types/shared.ts"; -import { LeakyBucket } from "../../util/bucket.ts"; import { createShard } from "./createShard.ts"; // TODO: think whether we also need an identifiedShard function diff --git a/rest/cleanupQueues.ts b/rest/cleanupQueues.ts index e4bee919c..709511ab3 100644 --- a/rest/cleanupQueues.ts +++ b/rest/cleanupQueues.ts @@ -1,14 +1,35 @@ +import { QueueBucket } from "./createQueueBucket.ts"; import { RestManager } from "./restManager.ts"; /** Cleans up the queues by checking if there is nothing left and removing it. */ export function cleanupQueues(rest: RestManager) { for (const [key, queue] of rest.pathQueues) { - rest.debug(`[REST - cleanupQueues] Running for of loop. ${key}`); - if (queue.requests.length) continue; + // rest.debug(`[REST - cleanupQueues] Running for of loop. ${key}`); + if (!isQueueClearable(queue)) continue; + // REMOVE IT FROM CACHE - rest.pathQueues.delete(key); + setTimeout(() => { + clearQueue(rest, key, queue); + }, 5000); } // NO QUEUE LEFT, DISABLE THE QUEUE if (!rest.pathQueues.size) rest.processingQueue = false; } + +export function clearQueue(rest: RestManager, key: string, queue: QueueBucket) { + if (!isQueueClearable(queue)) return; + + rest.pathQueues.delete(key); +} + +export function isQueueClearable(queue: QueueBucket) { + if (queue.firstRequest) return false; + if (queue.waiting.length) return false; + if (queue.pending.length) return false; + if (!queue.interval) return false; + if (queue.processing) return false; + if (queue.processingPending) return false; + + return true; +} diff --git a/rest/createInvalidRequestBucket.ts b/rest/createInvalidRequestBucket.ts new file mode 100644 index 000000000..340d6d8aa --- /dev/null +++ b/rest/createInvalidRequestBucket.ts @@ -0,0 +1,147 @@ +import { delay } from "../mod.ts"; + +/** + * A invalid request bucket is used in a similar manner as a leaky bucket but a invalid request bucket can be refilled as needed. + * It's purpose is to make sure the bot does not hit the limit to getting a 1 hr ban. + * + * @param options The options used to configure this bucket. + * @returns RefillingBucket + */ +export function createInvalidRequestBucket(options: InvalidRequestBucketOptions): InvalidRequestBucket { + const bucket: InvalidRequestBucket = { + current: options.current ?? 0, + max: options.max ?? 10000, + interval: options.interval ?? 600000, + timeoutId: options.timeoutId ?? 0, + safety: options.safety ?? 1, + frozenAt: options.frozenAt ?? 0, + errorStatuses: options.errorStatuses ?? [401, 403, 429], + requested: options.requested ?? 0, + processing: false, + + waiting: [], + + requestsAllowed: function () { + return bucket.max - bucket.current - bucket.requested - bucket.safety; + }, + + isRequestAllowed: function () { + return bucket.requestsAllowed() > 0; + }, + + waitUntilRequestAvailable: async function () { + return new Promise(async (resolve) => { + // If whatever amount of requests is left is more than the safety margin, allow the request + if (bucket.isRequestAllowed()) { + bucket.requested++; + resolve(); + } else { + bucket.waiting.push(resolve); + await bucket.processWaiting(); + } + }); + }, + + processWaiting: async function () { + // If already processing, that loop will handle all waiting requests. + if (bucket.processing) { + return; + } + + // Mark as processing so other loops don't start + bucket.processing = true; + + while (bucket.waiting.length) { + if (bucket.isRequestAllowed()) { + bucket.requested++; + // Resolve the next item in the queue + bucket.waiting.shift()?.(); + } else { + await delay(1000); + } + } + + // Mark as false so next pending request can be triggered by new loop. + bucket.processing = false; + }, + + handleCompletedRequest: function (code, sharedScope) { + // Since request is complete, we can remove one from requested. + bucket.requested--; + // Since it is as a valid request, we don't need to do anything + if (!bucket.errorStatuses.includes(code)) return; + // Shared scope is not considered invalid + if (code === 429 && sharedScope) return; + + // INVALID REQUEST WAS MADE + + // If it was not frozen before, mark it frozen + if (!bucket.frozenAt) bucket.frozenAt = Date.now(); + // Mark a request has been invalid + bucket.current++; + // If a timeout was not started, start a timeout to reset this bucket + if (!bucket.timeoutId) { + bucket.timeoutId = setTimeout(() => { + bucket.frozenAt = 0; + bucket.current = 0; + bucket.timeoutId = 0; + }, bucket.frozenAt + bucket.interval); + } + }, + }; + + return bucket; +} + +export interface InvalidRequestBucketOptions { + /** current invalid amount */ + current?: number; + /** max invalid requests allowed until ban. Defaults to 10,000 */ + max?: number; + /** The time that discord allows to make the max number of invalid requests. Defaults to 10 minutes */ + interval?: number; + /** timer to reset to 0 */ + timeoutId?: number; + /** how safe to be from max. Defaults to 1 */ + safety?: number; + /** when first request in this period was made */ + frozenAt?: number; + /** The request statuses that count as an invalid request. */ + errorStatuses?: number[]; + /** The amount of requests that were requested from this bucket. */ + requested?: number; +} + +export interface InvalidRequestBucket { + /** current invalid amount */ + current: number; + /** max invalid requests allowed until ban. Defaults to 10,000 */ + max: number; + /** The time that discord allows to make the max number of invalid requests. Defaults to 10 minutes */ + interval: number; + /** timer to reset to 0 */ + timeoutId: number; + /** how safe to be from max. Defaults to 1 */ + safety: number; + /** when first request in this period was made */ + frozenAt: number; + /** The request statuses that count as an invalid request. */ + errorStatuses: number[]; + /** The amount of requests that were requested from this bucket. */ + requested: number; + /** The requests that are currently pending. */ + waiting: ((value: void | PromiseLike) => void)[]; + /** Whether or not the waiting queue is already processing. */ + processing: boolean; + + /** Gives the number of requests that are currently allowed. */ + requestsAllowed: () => number; + /** Checks if a request is allowed at this time. */ + isRequestAllowed: () => boolean; + /** Waits until a request is available */ + waitUntilRequestAvailable: () => Promise; + /** Begins processing the waiting queue of requests. */ + processWaiting: () => Promise; + /** Handler for whenever a request is validated. This should update the requested values or trigger any other necessary stuff. */ + handleCompletedRequest: (code: number, sharedScope: boolean) => void; +} diff --git a/rest/createQueueBucket.ts b/rest/createQueueBucket.ts new file mode 100644 index 000000000..7db5c0e15 --- /dev/null +++ b/rest/createQueueBucket.ts @@ -0,0 +1,199 @@ +import { delay } from "../util/utils.ts"; +import { RestPayload, RestRequest } from "./rest.ts"; +import { RestManager } from "./restManager.ts"; + +/** + * A queue bucket is used in a similar manner as a leaky bucket. + * + * @param options The options used to configure this bucket. + * @returns RefillingBucket + */ +export function createQueueBucket(rest: RestManager, options: QueueBucketOptions): QueueBucket { + const bucket: QueueBucket = { + remaining: options.remaining ?? 1, + max: options.max ?? 1, + interval: options.interval ?? 0, + timeoutId: options.timeoutId ?? 0, + processing: false, + processingPending: false, + firstRequest: true, + + waiting: [], + pending: [], + + isRequestAllowed: function () { + return bucket.remaining > 0; + }, + + waitUntilRequestAvailable: async function () { + return new Promise(async (resolve) => { + // If whatever amount of requests is left is more than the safety margin, allow the request + if (bucket.isRequestAllowed()) { + // bucket.remaining++; + resolve(); + } else { + bucket.waiting.push(resolve); + await bucket.processWaiting(); + } + }); + }, + + processWaiting: async function () { + // If already processing, that loop will handle all waiting requests. + if (bucket.processing) { + return; + } + + // Mark as processing so other loops don't start + bucket.processing = true; + + while (bucket.waiting.length) { + if (bucket.isRequestAllowed()) { + // Resolve the next item in the queue + bucket.waiting.shift()?.(); + } else { + await delay(1000); + } + } + + // Mark as false so next pending request can be triggered by new loop. + bucket.processing = false; + }, + + processPending: async function () { + // If already processing, that loop will handle all pending requests. + if (bucket.processingPending) { + return; + } + + // Mark as processing so other loops don't start + bucket.processingPending = true; + + while (bucket.pending.length) { + + if (bucket.firstRequest || bucket.isRequestAllowed()) { + + const [queuedRequest] = bucket.pending; + if (queuedRequest) { + const basicURL = rest.simplifyUrl(queuedRequest.request.url, queuedRequest.request.method); + + // IF THIS URL IS STILL RATE LIMITED, TRY AGAIN + const urlResetIn = rest.checkRateLimits(rest, basicURL); + if (urlResetIn) { + setTimeout(() => { + bucket.processPending(); + }, urlResetIn); + break; + } + + // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS + const bucketResetIn = queuedRequest.payload.bucketId + ? rest.checkRateLimits(rest, queuedRequest.payload.bucketId) + : false; + if (bucketResetIn) { + setTimeout(() => { + bucket.processPending(); + }, bucketResetIn); + break; + } + + bucket.firstRequest = false; + bucket.remaining--; + + if (!bucket.timeoutId && !bucket.remaining && bucket.interval) { + bucket.timeoutId = setTimeout(() => { + bucket.remaining = bucket.max; + bucket.timeoutId = 0; + }, bucket.interval); + } + + // Remove from queue, we are executing it. + bucket.pending.shift(); + rest.processGlobalQueue(rest, { + ...queuedRequest, + urlToUse: queuedRequest.request.url, + basicURL, + }); + } + } else { + await delay(1000); + } + } + + // Mark as false so next pending request can be triggered by new loop. + bucket.processingPending = false; + rest.cleanupQueues(rest); + }, + + handleCompletedRequest: function (headers) { + bucket.max = headers.max; + bucket.interval = headers.interval; + bucket.remaining = headers.remaining; + + if (bucket.remaining <= 1) { + bucket.timeoutId = setTimeout(() => { + bucket.remaining = bucket.max; + bucket.timeoutId = 0; + }, headers.interval); + } + }, + + makeRequest: async function (options: BucketRequest) { + await bucket.waitUntilRequestAvailable(); + bucket.pending.push(options); + bucket.processPending(); + }, + }; + + return bucket; +} + +export interface QueueBucketOptions { + /** How many requests are remaining. Defaults to 1 */ + remaining?: number; + /** Max number of requests allowed in this bucket. Defaults to 1. */ + max?: number; + /** The time in milliseconds that discord allows to make the max number of invalid requests. Defaults to 0 */ + interval?: number; + /** timer to reset to 0 */ + timeoutId?: number; +} + +export interface QueueBucket { + /** Amount of requests that have are remaining. Defaults to 1. */ + remaining: number; + /** Max requests for this bucket. Defaults to 1. */ + max: number; + /** The time that discord allows to make the max number of requests. Defaults to 0 */ + interval: number; + /** timer to reset to 0 */ + timeoutId: number; + /** The requests that are currently pending. */ + waiting: ((value: void | PromiseLike) => void)[]; + /** The requests that are currently pending. */ + pending: BucketRequest[]; + /** Whether or not the waiting queue is already processing. */ + processing: boolean; + /** Whether or not the pending queue is already processing. */ + processingPending: boolean; + /** Whether the first request is pending. */ + firstRequest: boolean; + + /** Checks if a request is allowed at this time. */ + isRequestAllowed: () => boolean; + /** Waits until a request is available */ + waitUntilRequestAvailable: () => Promise; + /** Begins processing the waiting queue of requests. */ + processWaiting: () => Promise; + /** Begins processing the pending queue of requests. */ + processPending: () => Promise; + /** Handler for whenever a request is validated. This should update the requested values or trigger any other necessary stuff. */ + handleCompletedRequest: (headers: { remaining: number; interval: number; max: number }) => void; + /** Adds a request to the queue. */ + makeRequest: (options: BucketRequest) => Promise; +} + +export interface BucketRequest { + request: RestRequest; + payload: RestPayload; +} diff --git a/rest/processGlobalQueue.ts b/rest/processGlobalQueue.ts index 2d89f9ec6..c31af0c5b 100644 --- a/rest/processGlobalQueue.ts +++ b/rest/processGlobalQueue.ts @@ -1,85 +1,34 @@ +import { RestRequest,RestPayload } from "./rest.ts"; import { RestManager } from "./restManager.ts"; -import { HTTPResponseCodes } from "../types/shared.ts"; -export async function processGlobalQueue(rest: RestManager) { - // IF QUEUE IS EMPTY EXIT - if (!rest.globalQueue.length) return; - // IF QUEUE IS ALREADY RUNNING EXIT - if (rest.globalQueueProcessing) return; +export async function processGlobalQueue(rest: RestManager, request: { + request: RestRequest; + payload: RestPayload; + basicURL: string; + urlToUse: string; +}) { + // Check if this request is able to be made globally + await rest.invalidBucket.waitUntilRequestAvailable(); - // SET AS TRUE SO OTHER QUEUES DON'T START - rest.globalQueueProcessing = true; + // Check if this request is able to be made for it's specific bucket + // await rest.buckets.get() - while (rest.globalQueue.length) { - // IF THE BOT IS GLOBALLY RATE LIMITED TRY AGAIN - if (rest.globallyRateLimited) { - setTimeout(() => { - rest.debug(`[REST - processGlobalQueue] Globally rate limited, running setTimeout.`); - rest.processGlobalQueue(rest); - }, 1000); - - // BREAK WHILE LOOP - break; - } - - if (rest.invalidRequests === rest.maxInvalidRequests - rest.invalidRequestsSafetyAmount) { - setTimeout(() => { - const time = rest.invalidRequestsInterval - (Date.now() - rest.invalidRequestFrozenAt); - rest.debug( - `[REST - processGlobalQueue] Freeze global queue because of invalid requests. Time Remaining: ${ - time / 1000 - } seconds.`, - ); - rest.processGlobalQueue(rest); - }, 1000); - - // BREAK WHILE LOOP - break; - } - - const request = rest.globalQueue.shift(); - // REMOVES ANY POTENTIAL INVALID CONFLICTS - if (!request) continue; - - // CHECK RATE LIMITS FOR 429 REPEATS - // IF THIS URL IS STILL RATE LIMITED, TRY AGAIN - const urlResetIn = rest.checkRateLimits(rest, request.basicURL); - // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS - const bucketResetIn = request.payload.bucketId ? rest.checkRateLimits(rest, request.payload.bucketId) : false; - - if (urlResetIn || bucketResetIn) { - // ONLY ADD TIMEOUT IF ANOTHER QUEUE IS NOT PENDING - setTimeout(() => { - rest.debug(`[REST - processGlobalQueue] rate limited, running setTimeout.`); - // THIS REST IS RATE LIMITED, SO PUSH BACK TO START - rest.globalQueue.unshift(request); - // START QUEUE IF NOT STARTED - rest.processGlobalQueue(rest); - }, urlResetIn || (bucketResetIn as number)); - - continue; - } - - await rest.sendRequest(rest, { - url: request.urlToUse, + await rest.sendRequest(rest, { + url: request.urlToUse, + method: request.request.method, + bucketId: request.payload.bucketId, + reject: request.request.reject, + respond: request.request.respond, + retryRequest: function () { + rest.processGlobalQueue(rest, request); + }, + retryCount: request.payload.retryCount ?? 0, + payload: rest.createRequestBody(rest, { method: request.request.method, - bucketId: request.payload.bucketId, - reject: request.request.reject, - respond: request.request.respond, - retryRequest: function () { - rest.globalQueue.unshift(request); - }, - retryCount: request.payload.retryCount ?? 0, - payload: rest.createRequestBody(rest, { - method: request.request.method, - body: request.payload.body, - url: request.urlToUse, - }), - }) - // Should be handled in sendRequest, this catch just prevents bots from dying - .catch(() => null); - } - - // ALLOW OTHER QUEUES TO START WHEN NEW REQUEST IS MADE - rest.globalQueueProcessing = false; + body: request.payload.body, + url: request.urlToUse, + }), + }) + // Should be handled in sendRequest, this catch just prevents bots from dying + .catch(() => null); } diff --git a/rest/processQueue.ts b/rest/processQueue.ts index c58d36151..944f3dc5e 100644 --- a/rest/processQueue.ts +++ b/rest/processQueue.ts @@ -2,56 +2,65 @@ import { RestManager } from "./restManager.ts"; /** Processes the queue by looping over each path separately until the queues are empty. */ export function processQueue(rest: RestManager, id: string) { - const queue = rest.pathQueues.get(id); - if (!queue) return; + // const queue = rest.pathQueues.get(id); + // if (!queue) return; - while (queue.requests.length) { - rest.debug(`[REST - processQueue] Running while loop.`); - // SELECT THE FIRST ITEM FROM THIS QUEUE - const queuedRequest = queue.requests[0]; - // IF THIS DOESN'T HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT. - if (!queuedRequest) break; + // while (queue.requests.length) { + // rest.debug(`[REST - processQueue] Running while loop.`); + // if (rest.globallyRateLimited) { + // rest.debug(`[REST - processQueue] Globally rate limited.`); + // continue; + // } - const basicURL = rest.simplifyUrl(queuedRequest.request.url, queuedRequest.request.method); + // // SELECT THE FIRST ITEM FROM THIS QUEUE + // const queuedRequest = queue.requests[0]; + // // IF THIS DOESN'T HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT. + // if (!queuedRequest) break; - // IF THIS URL IS STILL RATE LIMITED, TRY AGAIN - const urlResetIn = rest.checkRateLimits(rest, basicURL); - if (urlResetIn) { - // ONLY ADD TIMEOUT IF ANOTHER QUEUE IS NOT PENDING - if (!queue.isWaiting) { - queue.isWaiting = true; + // const basicURL = rest.simplifyUrl(queuedRequest.request.url, queuedRequest.request.method); - setTimeout(() => { - queue.isWaiting = false; + // // IF THIS URL IS STILL RATE LIMITED, TRY AGAIN + // const urlResetIn = rest.checkRateLimits(rest, basicURL); + // if (urlResetIn) { + // // ONLY ADD TIMEOUT IF ANOTHER QUEUE IS NOT PENDING + // if (!queue.isWaiting) { + // queue.isWaiting = true; - rest.debug(`[REST - processQueue] rate limited, running setTimeout.`); - rest.processQueue(rest, id); - }, urlResetIn); - } + // setTimeout(() => { + // queue.isWaiting = false; - // BREAK WHILE LOOP - break; - } + // rest.debug(`[REST - processQueue] rate limited, running setTimeout.`); + // rest.processQueue(rest, id); + // }, urlResetIn); + // } - // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS - const bucketResetIn = queuedRequest.payload.bucketId - ? rest.checkRateLimits(rest, queuedRequest.payload.bucketId) - : false; - // THIS BUCKET IS STILL RATE LIMITED, RE-ADD TO QUEUE - if (bucketResetIn) continue; - // EXECUTE THE REQUEST + // // BREAK WHILE LOOP + // break; + // } - // CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE - rest.debug(`[REST - Add To Global Queue] ${JSON.stringify(queuedRequest.payload)}`); - rest.globalQueue.push({ - ...queuedRequest, - urlToUse: queuedRequest.request.url, - basicURL, - }); - rest.processGlobalQueue(rest); - queue.requests.shift(); - } + // // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS + // const bucketResetIn = queuedRequest.payload.bucketId + // ? rest.checkRateLimits(rest, queuedRequest.payload.bucketId) + // : false; + // // THIS BUCKET IS STILL RATE LIMITED, RE-ADD TO QUEUE + // if (bucketResetIn) continue; + // // EXECUTE THE REQUEST - // ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP - rest.cleanupQueues(rest); + // // CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE + // rest.debug(`[REST - Add To Global Queue] ${JSON.stringify(queuedRequest.payload)}`); + // // rest.globalQueue.push({ + // // ...queuedRequest, + // // urlToUse: queuedRequest.request.url, + // // basicURL, + // // }); + // rest.processGlobalQueue(rest, { + // ...queuedRequest, + // urlToUse: queuedRequest.request.url, + // basicURL, + // }); + // queue.requests.shift(); + // } + + // // ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP + // rest.cleanupQueues(rest); } diff --git a/rest/processRequest.ts b/rest/processRequest.ts index 16350743c..2dbc457fc 100644 --- a/rest/processRequest.ts +++ b/rest/processRequest.ts @@ -1,6 +1,7 @@ -import { RestManager } from "./restManager.ts"; import { BASE_URL } from "../util/constants.ts"; +import { createQueueBucket } from "./createQueueBucket.ts"; import { RestPayload, RestRequest } from "./rest.ts"; +import { RestManager } from "./restManager.ts"; /** Processes a request and assigns it to a queue or creates a queue if none exists for it. */ export function processRequest(rest: RestManager, request: RestRequest, payload: RestPayload) { @@ -19,18 +20,16 @@ export function processRequest(rest: RestManager, request: RestRequest, payload: const queue = rest.pathQueues.get(url); if (queue) { - queue.requests.push({ request, payload }); + queue.makeRequest({ request, payload }); } else { // CREATES A NEW QUEUE - rest.pathQueues.set(url, { - isWaiting: false, - requests: [ - { - request, - payload, - }, - ], + const bucketQueue = createQueueBucket(rest, {}); + // Add request to queue + bucketQueue.makeRequest({ + request, + payload, }); - rest.processQueue(rest, url); + // Save queue + rest.pathQueues.set(url, bucketQueue); } } diff --git a/rest/processRequestHeaders.ts b/rest/processRequestHeaders.ts index 3d943e27d..e35fc0e29 100644 --- a/rest/processRequestHeaders.ts +++ b/rest/processRequestHeaders.ts @@ -12,6 +12,12 @@ export function processRequestHeaders(rest: RestManager, url: string, headers: H // undefined override null needed for typings const bucketId = headers.get("x-ratelimit-bucket") || undefined; + rest.pathQueues.get(url)?.handleCompletedRequest({ + remaining: Number(remaining), + interval: Number(retryAfter) * 1000, + max: Number(headers.get("x-ratelimit-limit")), + }); + // IF THERE IS NO REMAINING RATE LIMIT, MARK IT AS RATE LIMITED if (remaining === "0") { rateLimited = true; @@ -41,6 +47,10 @@ export function processRequestHeaders(rest: RestManager, url: string, headers: H rest.globallyRateLimited = true; rateLimited = true; + setTimeout(() => { + rest.globallyRateLimited = false; + }, globalReset); + rest.rateLimitedPaths.set("global", { url: "global", resetTimestamp: globalReset, diff --git a/rest/restManager.ts b/rest/restManager.ts index bc001db04..536ec34e0 100644 --- a/rest/restManager.ts +++ b/rest/restManager.ts @@ -3,6 +3,8 @@ import { removeTokenPrefix } from "../util/token.ts"; import { checkRateLimits } from "./checkRateLimits.ts"; import { cleanupQueues } from "./cleanupQueues.ts"; import { convertRestError } from "./convertRestError.ts"; +import { createInvalidRequestBucket } from "./createInvalidRequestBucket.ts"; +import { QueueBucket } from "./createQueueBucket.ts"; import { createRequestBody } from "./createRequestBody.ts"; import { processGlobalQueue } from "./processGlobalQueue.ts"; import { processQueue } from "./processQueue.ts"; @@ -21,35 +23,14 @@ export function createRestManager(options: CreateRestManagerOptions) { baseEndpoints.BASE_URL = `${options.customUrl}/v${version}`; } - return { - // current invalid amount - invalidRequests: 0, - // max invalid requests allowed until ban - maxInvalidRequests: 10000, - // 10 minutes - invalidRequestsInterval: 600000, - // timer to reset to 0 - invalidRequestsTimeoutId: 0, - // how safe to be from max - invalidRequestsSafetyAmount: 1, - // when first request in this period was made - invalidRequestFrozenAt: 0, - invalidRequestErrorStatuses: [401, 403, 429], + const rest = { + invalidBucket: createInvalidRequestBucket({}), version, token: removeTokenPrefix(options.token), maxRetryCount: options.maxRetryCount || 10, secretKey: options.secretKey || "discordeno_best_lib_ever", customUrl: options.customUrl || "", - pathQueues: new Map< - string, - { - isWaiting: boolean; - requests: { - request: RestRequest; - payload: RestPayload; - }[]; - } - >(), + pathQueues: new Map(), processingQueue: false, processingRateLimitedPaths: false, globallyRateLimited: false, @@ -61,6 +42,7 @@ export function createRestManager(options: CreateRestManagerOptions) { }[], globalQueueProcessing: false, rateLimitedPaths: new Map(), + debug: options.debug || function (_text: string) {}, checkRateLimits: options.checkRateLimits || checkRateLimits, cleanupQueues: options.cleanupQueues || cleanupQueues, @@ -91,6 +73,8 @@ export function createRestManager(options: CreateRestManagerOptions) { ); }, }; + + return rest; } export interface CreateRestManagerOptions { diff --git a/rest/sendRequest.ts b/rest/sendRequest.ts index 7c0f45346..0fb1416c1 100644 --- a/rest/sendRequest.ts +++ b/rest/sendRequest.ts @@ -23,6 +23,16 @@ export async function sendRequest(rest: RestManager, options: RestSendRequest // CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE rest.fetching(options); + // @ts-ignore + if (options.url.startsWith(BASE_URL) && options.payload?.body) { + // @ts-ignore + options.payload.body = JSON.parse(options.payload.body); + // @ts-ignore + options.payload.body.content += ` ${Date.now()}`; + // @ts-ignore + options.payload.body = JSON.stringify(options.payload.body); + } + const response = await fetch( options.url.startsWith(BASE_URL) ? options.url : `${BASE_URL}/v${rest.version}/${options.url}`, { @@ -70,24 +80,9 @@ export async function sendRequest(rest: RestManager, options: RestSendRequest break; } - if ( - rest.invalidRequestErrorStatuses.includes(response.status) && - !(response.status === 429 && response.headers.get("X-RateLimit-Scope")) - ) { - // INCREMENT CURRENT INVALID REQUESTS - ++rest.invalidRequests; - - if (!rest.invalidRequestsTimeoutId) { - rest.invalidRequestsTimeoutId = setTimeout(() => { - rest.debug(`[REST - processGlobalQueue] Resetting invalid optionss counter in setTimeout.`); - rest.invalidRequests = 0; - rest.invalidRequestsTimeoutId = 0; - }, rest.invalidRequestsInterval); - } - } - // If NOT rate limited remove from queue if (response.status !== 429) { + rest.invalidBucket.handleCompletedRequest(response.status, false); const body = response.type ? JSON.stringify(await response.json()) : undefined; return options.reject?.({ ok: false, @@ -96,6 +91,8 @@ export async function sendRequest(rest: RestManager, options: RestSendRequest body, }); } else { + const json = await response.json(); + // TOO MANY ATTEMPTS, GET RID OF REQUEST FROM QUEUE. if (options.retryCount && options.retryCount++ >= rest.maxRetryCount) { rest.debug(`[REST - RetriesMaxed] ${JSON.stringify(options)}`); @@ -110,7 +107,7 @@ export async function sendRequest(rest: RestManager, options: RestSendRequest return; } // RATE LIMITED, ADD BACK TO QUEUE else { - const json = await response.json(); + rest.invalidBucket.handleCompletedRequest(response.status, response.headers.get('X-RateLimit-Scope') === 'shared'); await delay(json.retry_after * 1000); return options.retryRequest?.(); } diff --git a/tests/rest.ts b/tests/rest.ts index a80047bc7..07418fdd6 100644 --- a/tests/rest.ts +++ b/tests/rest.ts @@ -4,7 +4,7 @@ import { dotenv } from "./deps.ts"; dotenv({ export: true, path: `${Deno.cwd()}/.env` }); -const token = Deno.env.get("DISCORD_TOKEN"); +const token = Deno.env.get("GAMER_TOKEN"); if (!token) throw new Error("Token was not provided."); const REST_AUTHORIZATION_KEY = Deno.env.get("PROXY_REST_SECRET"); @@ -16,13 +16,12 @@ const rest = createRestManager({ token, secretKey: REST_AUTHORIZATION_KEY, customUrl: PROXY_REST_URL, - // debug: console.log, }); // START LISTENING TO THE URL(localhost) const server = Deno.listen({ port: REST_PORT }); console.log( - `HTTP webserver running. Access it at: ${PROXY_REST_URL}`, + `Rest Proxy running. Access it at: ${PROXY_REST_URL}`, ); // Connections to the server will be yielded up as an async iterable.