diff --git a/src/bot.ts b/src/bot.ts index 37d9cd229..897e14cc0 100644 --- a/src/bot.ts +++ b/src/bot.ts @@ -66,6 +66,7 @@ import { GatewayPayload } from "./types/gateway/gateway_payload.ts"; import { delay, validateSlashOptionChoices, validateSlashOptions } from "./util/utils.ts"; import { iconBigintToHash, iconHashToBigInt } from "./util/hash.ts"; import { validateLength } from "./util/validate_length.ts"; +import { processGlobalQueue } from "./rest/process_global_queue.ts"; export async function createBot(options: CreateBotOptions) { return { @@ -181,6 +182,7 @@ export interface CreateRestManagerOptions { createRequestBody?: typeof createRequestBody; runMethod?: typeof runMethod; simplifyUrl?: typeof simplifyUrl; + processGlobalQueue?: typeof processGlobalQueue; } export function createRestManager(options: CreateRestManagerOptions) { @@ -192,13 +194,23 @@ export function createRestManager(options: CreateRestManagerOptions) { pathQueues: new Map< string, { - request: RestRequest; - payload: RestPayload; - }[] + isWaiting: boolean; + requests: { + request: RestRequest; + payload: RestPayload; + }[]; + } >(), processingQueue: false, processingRateLimitedPaths: false, globallyRateLimited: false, + globalQueue: [] as { + request: RestRequest; + payload: RestPayload; + basicURL: string; + urlToUse: string; + }[], + globalQueueProcessing: false, ratelimitedPaths: new Map(), debug: options.debug || function (_text: string) {}, checkRateLimits: options.checkRateLimits || checkRateLimits, @@ -210,6 +222,7 @@ export function createRestManager(options: CreateRestManagerOptions) { createRequestBody: options.createRequestBody || createRequestBody, runMethod: options.runMethod || runMethod, simplifyUrl: options.simplifyUrl || simplifyUrl, + processGlobalQueue: options.processGlobalQueue || processGlobalQueue, }; } @@ -252,7 +265,7 @@ export function createUtils(options: Partial) { higherRolePosition, validateLength, validateSlashOptions, - validateSlashOptionChoices + validateSlashOptionChoices, }; } diff --git a/src/rest/process_global_queue.ts b/src/rest/process_global_queue.ts new file mode 100644 index 000000000..1a33f26ed --- /dev/null +++ b/src/rest/process_global_queue.ts @@ -0,0 +1,134 @@ +import { RestManager } from "../bot.ts"; +import { DiscordHTTPResponseCodes } from "../types/codes/http_response_codes.ts"; + +export async function processGlobalQueue(rest: RestManager) { + // IF QUEUE IS EMPTY EXIT + if (!rest.globalQueue.length) return; + // IF QUEUE IS ALREADY RUNNING EXIT + if (rest.globalQueueProcessing) return; + + // SET AS TRUE SO OTHER QUEUES DON'T START + rest.globalQueueProcessing = true; + + while (rest.globalQueue.length) { + // IF THE BOT IS GLOBALLY RATELIMITED TRY AGAIN + if (rest.globallyRateLimited) { + setTimeout(() => { + rest.debug(`[REST - processGlobalQueue] Globally rate limited, running setTimeout.`); + rest.processGlobalQueue(rest); + }, 1000); + + // BREAK WHILE LOOP + break; + } + + const request = rest.globalQueue[0]; + + try { + // CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE + rest.debug(`[REST - fetching] ${JSON.stringify(request.payload)}`); + + const response = await fetch(request.urlToUse, rest.createRequestBody(rest, request)); + rest.debug(`[REST - fetched] ${JSON.stringify(request.payload)}`); + + const bucketIdFromHeaders = rest.processRequestHeaders(rest, request.basicURL, response.headers); + // SET THE BUCKET Id IF IT WAS PRESENT + if (bucketIdFromHeaders) { + request.payload.bucketId = bucketIdFromHeaders; + } + + if (response.status < 200 || response.status >= 400) { + rest.debug( + `[REST - httpError] Payload: ${JSON.stringify(request.payload)} | Response: ${JSON.stringify(response)}` + ); + + let error = "REQUEST_UNKNOWN_ERROR"; + switch (response.status) { + case DiscordHTTPResponseCodes.BadRequest: + error = "The request was improperly formatted, or the server couldn't understand it."; + break; + case DiscordHTTPResponseCodes.Unauthorized: + error = "The Authorization header was missing or invalid."; + break; + case DiscordHTTPResponseCodes.Forbidden: + error = "The Authorization token you passed did not have permission to the resource."; + break; + case DiscordHTTPResponseCodes.NotFound: + error = "The resource at the location specified doesn't exist."; + break; + case DiscordHTTPResponseCodes.MethodNotAllowed: + error = "The HTTP method used is not valid for the location specified."; + break; + case DiscordHTTPResponseCodes.GatewayUnavailable: + error = "There was not a gateway available to process your request. Wait a bit and retry."; + break; + } + + // If NOT rate limited remove from queue + if (response.status !== 429) { + request.request.reject(new Error(`[${response.status}] ${error}`)); + rest.globalQueue.shift(); + } else { + if (request.payload.retryCount++ >= rest.maxRetryCount) { + rest.debug(`[REST - RetriesMaxed] ${JSON.stringify(request.payload)}`); + request.request.reject( + new Error(`[${response.status}] The request was rate limited and it maxed out the retries limit.`) + ); + // REMOVE ITEM FROM QUEUE TO PREVENT RETRY + rest.globalQueue.shift(); + continue; + } + } + + continue; + } + + // SOMETIMES DISCORD RETURNS AN EMPTY 204 RESPONSE THAT CAN'T BE MADE TO JSON + if (response.status === 204) { + rest.debug(`[REST - FetchSuccess] ${JSON.stringify(request.payload)}`); + // REMOVE FROM QUEUE + rest.globalQueue.shift(); + request.request.respond({ status: 204 }); + } else { + // CONVERT THE RESPONSE TO JSON + const json = await response.json(); + // IF THE RESPONSE WAS RATE LIMITED, HANDLE ACCORDINGLY + // if (json.retry_after || json.message === "You are being rate limited.") { + // // IF IT HAS MAXED RETRIES SOMETHING SERIOUSLY WRONG. CANCEL OUT. + // if (request.payload.retryCount >= rest.maxRetryCount) { + // rest.eventHandlers.retriesMaxed(request.payload); + // request.request.respond({ + // status: 200, + // body: JSON.stringify({ + // error: "The request was rate limited and it maxed out the retries limit.", + // }), + // }); + // // REMOVE ITEM FROM QUEUE TO PREVENT RETRY + // rest.globalQueue.shift(); + // continue; + // } + + // // SINCE IT WAS RATELIMITE, RETRY AGAIN + // continue; + // } + + rest.debug(`[REST - fetchSuccess] ${JSON.stringify(request.payload)}`); + // REMOVE FROM QUEUE + rest.globalQueue.shift(); + request.request.respond({ + status: 200, + body: JSON.stringify(json), + }); + } + } catch (error) { + // SOMETHING WENT WRONG, LOG AND RESPOND WITH ERROR + rest.debug(`[REST - fetchFailed] Payload: ${JSON.stringify(request.payload)} | Error: ${error}`); + request.request.reject(error); + // REMOVE FROM QUEUE + rest.globalQueue.shift(); + } + } + + // ALLOW OTHER QUEUES TO START WHEN NEW REQUEST IS MADE + rest.globalQueueProcessing = true; +} diff --git a/src/rest/process_queue.ts b/src/rest/process_queue.ts index d0e153540..bad357984 100644 --- a/src/rest/process_queue.ts +++ b/src/rest/process_queue.ts @@ -1,25 +1,14 @@ import { RestManager } from "../bot.ts"; -import { DiscordHTTPResponseCodes } from "../types/codes/http_response_codes.ts"; -import { delay } from "../util/utils.ts"; /** Processes the queue by looping over each path separately until the queues are empty. */ -export async function processQueue(rest: RestManager, id: string) { +export function processQueue(rest: RestManager, id: string) { const queue = rest.pathQueues.get(id); if (!queue) return; - while (queue.length) { + while (queue.requests.length) { rest.debug(`[REST - processQueue] Running while loop.`); - // IF THE BOT IS GLOBALLY RATELIMITED TRY AGAIN - if (rest.globallyRateLimited) { - setTimeout(async () => { - rest.debug(`[REST - processQueue] Running setTimeout.`); - await processQueue(rest, id); - }, 1000); - - break; - } // SELECT THE FIRST ITEM FROM THIS QUEUE - const [queuedRequest] = queue; + const queuedRequest = queue.requests[0]; // IF THIS DOESNT HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT. if (!queuedRequest) return; @@ -28,9 +17,18 @@ export async function processQueue(rest: RestManager, id: string) { // IF THIS URL IS STILL RATE LIMITED, TRY AGAIN const urlResetIn = rest.checkRateLimits(rest, basicURL); if (urlResetIn) { - // PAUSE FOR THIS SPECIFC REQUEST - await delay(urlResetIn); - continue; + // ONLY ADD TIMEOUT IF ANOTHER QUEUE IS NOT PENDING + if (!queue.isWaiting) { + queue.isWaiting = true; + + setTimeout(() => { + rest.debug(`[REST - processQueue] rate limited, running setTimeout.`); + rest.processQueue(rest, id); + }, 1000); + } + + // BREAK WHILE LOOP + break; } // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS @@ -60,108 +58,12 @@ export async function processQueue(rest: RestManager, id: string) { : queuedRequest.request.url; // CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE - rest.debug(`[REST - fetching] ${JSON.stringify(queuedRequest.payload)}`); - - try { - const response = await fetch(urlToUse, rest.createRequestBody(rest, queuedRequest)); - rest.debug(`[REST - fetched] ${JSON.stringify(queuedRequest.payload)}`); - - const bucketIdFromHeaders = rest.processRequestHeaders(rest, basicURL, response.headers); - // SET THE BUCKET Id IF IT WAS PRESENT - if (bucketIdFromHeaders) { - queuedRequest.payload.bucketId = bucketIdFromHeaders; - } - - if (response.status < 200 || response.status >= 400) { - rest.debug( - `[REST - httpError] Payload: ${JSON.stringify(queuedRequest.payload)} | Response: ${JSON.stringify(response)}` - ); - - let error = "REQUEST_UNKNOWN_ERROR"; - switch (response.status) { - case DiscordHTTPResponseCodes.BadRequest: - error = "The request was improperly formatted, or the server couldn't understand it."; - break; - case DiscordHTTPResponseCodes.Unauthorized: - error = "The Authorization header was missing or invalid."; - break; - case DiscordHTTPResponseCodes.Forbidden: - error = "The Authorization token you passed did not have permission to the resource."; - break; - case DiscordHTTPResponseCodes.NotFound: - error = "The resource at the location specified doesn't exist."; - break; - case DiscordHTTPResponseCodes.MethodNotAllowed: - error = "The HTTP method used is not valid for the location specified."; - break; - case DiscordHTTPResponseCodes.GatewayUnavailable: - error = "There was not a gateway available to process your request. Wait a bit and retry."; - break; - } - - // If Rate limited should not remove from queue - if (response.status !== 429) { - queuedRequest.request.reject(new Error(`[${response.status}] ${error}`)); - queue.shift(); - } else { - if (queuedRequest.payload.retryCount++ >= rest.maxRetryCount) { - rest.debug(`[REST - RetriesMaxed] ${JSON.stringify(queuedRequest.payload)}`); - queuedRequest.request.reject( - new Error(`[${response.status}] The request was rate limited and it maxed out the retries limit.`) - ); - // REMOVE ITEM FROM QUEUE TO PREVENT RETRY - queue.shift(); - continue; - } - } - - continue; - } - - // SOMETIMES DISCORD RETURNS AN EMPTY 204 RESPONSE THAT CAN'T BE MADE TO JSON - if (response.status === 204) { - rest.debug(`[REST - FetchSuccess] ${JSON.stringify(queuedRequest.payload)}`); - // REMOVE FROM QUEUE - queue.shift(); - queuedRequest.request.respond({ status: 204 }); - } else { - // CONVERT THE RESPONSE TO JSON - const json = await response.json(); - // IF THE RESPONSE WAS RATE LIMITED, HANDLE ACCORDINGLY - // if (json.retry_after || json.message === "You are being rate limited.") { - // // IF IT HAS MAXED RETRIES SOMETHING SERIOUSLY WRONG. CANCEL OUT. - // if (queuedRequest.payload.retryCount >= rest.maxRetryCount) { - // rest.eventHandlers.retriesMaxed(queuedRequest.payload); - // queuedRequest.request.respond({ - // status: 200, - // body: JSON.stringify({ - // error: "The request was rate limited and it maxed out the retries limit.", - // }), - // }); - // // REMOVE ITEM FROM QUEUE TO PREVENT RETRY - // queue.shift(); - // continue; - // } - - // // SINCE IT WAS RATELIMITE, RETRY AGAIN - // continue; - // } - - rest.debug(`[REST - fetchSuccess] ${JSON.stringify(queuedRequest.payload)}`); - // REMOVE FROM QUEUE - queue.shift(); - queuedRequest.request.respond({ - status: 200, - body: JSON.stringify(json), - }); - } - } catch (error) { - // SOMETHING WENT WRONG, LOG AND RESPOND WITH ERROR - rest.debug(`[REST - fetchFailed] Payload: ${JSON.stringify(queuedRequest.payload)} | Error: ${error}`); - queuedRequest.request.reject(error); - // REMOVE FROM QUEUE - queue.shift(); - } + rest.debug(`[REST - Add To Global Queue] ${JSON.stringify(queuedRequest.payload)}`); + rest.globalQueue.push({ + ...queuedRequest, + basicURL, + urlToUse, + }); } // ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP diff --git a/src/rest/process_request.ts b/src/rest/process_request.ts index d202e34e0..183916efc 100644 --- a/src/rest/process_request.ts +++ b/src/rest/process_request.ts @@ -3,7 +3,7 @@ import { BASE_URL } from "../util/constants.ts"; import { RestPayload, RestRequest } from "./rest.ts"; /** Processes a request and assigns it to a queue or creates a queue if none exists for it. */ -export async function processRequest(rest: RestManager, request: RestRequest, payload: RestPayload) { +export function processRequest(rest: RestManager, request: RestRequest, payload: RestPayload) { const route = request.url.substring(request.url.indexOf("api/")); const parts = route.split("/"); // REMOVE THE API @@ -20,15 +20,18 @@ export async function processRequest(rest: RestManager, request: RestRequest, pa const queue = rest.pathQueues.get(url); // IF THE QUEUE EXISTS JUST ADD THIS TO THE QUEUE if (queue) { - queue.push({ request, payload }); + queue.requests.push({ request, payload }); } else { // CREATES A NEW QUEUE - rest.pathQueues.set(url, [ - { - request, - payload, - }, - ]); - await rest.processQueue(url); + rest.pathQueues.set(url, { + isWaiting: false, + requests: [ + { + request, + payload, + }, + ], + }); + rest.processQueue(rest, url); } } diff --git a/src/rest/run_method.ts b/src/rest/run_method.ts index 0d796b2c0..77b25d84b 100644 --- a/src/rest/run_method.ts +++ b/src/rest/run_method.ts @@ -39,6 +39,7 @@ export async function runMethod( // No proxy so we need to handle all rate limiting and such return new Promise((resolve, reject) => { rest.processRequest( + rest, { url, method,