fix global queue issue

This commit is contained in:
Skillz4Killz
2021-10-12 18:59:00 +00:00
committed by GitHub
parent 7474d7886f
commit f3b818491a
5 changed files with 185 additions and 132 deletions
+15 -2
View File
@@ -66,6 +66,7 @@ import { GatewayPayload } from "./types/gateway/gateway_payload.ts";
import { delay, validateSlashOptionChoices, validateSlashOptions } from "./util/utils.ts"; import { delay, validateSlashOptionChoices, validateSlashOptions } from "./util/utils.ts";
import { iconBigintToHash, iconHashToBigInt } from "./util/hash.ts"; import { iconBigintToHash, iconHashToBigInt } from "./util/hash.ts";
import { validateLength } from "./util/validate_length.ts"; import { validateLength } from "./util/validate_length.ts";
import { processGlobalQueue } from "./rest/process_global_queue.ts";
export async function createBot(options: CreateBotOptions) { export async function createBot(options: CreateBotOptions) {
return { return {
@@ -181,6 +182,7 @@ export interface CreateRestManagerOptions {
createRequestBody?: typeof createRequestBody; createRequestBody?: typeof createRequestBody;
runMethod?: typeof runMethod; runMethod?: typeof runMethod;
simplifyUrl?: typeof simplifyUrl; simplifyUrl?: typeof simplifyUrl;
processGlobalQueue?: typeof processGlobalQueue;
} }
export function createRestManager(options: CreateRestManagerOptions) { export function createRestManager(options: CreateRestManagerOptions) {
@@ -192,13 +194,23 @@ export function createRestManager(options: CreateRestManagerOptions) {
pathQueues: new Map< pathQueues: new Map<
string, string,
{ {
isWaiting: boolean;
requests: {
request: RestRequest; request: RestRequest;
payload: RestPayload; payload: RestPayload;
}[] }[];
}
>(), >(),
processingQueue: false, processingQueue: false,
processingRateLimitedPaths: false, processingRateLimitedPaths: false,
globallyRateLimited: false, globallyRateLimited: false,
globalQueue: [] as {
request: RestRequest;
payload: RestPayload;
basicURL: string;
urlToUse: string;
}[],
globalQueueProcessing: false,
ratelimitedPaths: new Map<string, RestRateLimitedPath>(), ratelimitedPaths: new Map<string, RestRateLimitedPath>(),
debug: options.debug || function (_text: string) {}, debug: options.debug || function (_text: string) {},
checkRateLimits: options.checkRateLimits || checkRateLimits, checkRateLimits: options.checkRateLimits || checkRateLimits,
@@ -210,6 +222,7 @@ export function createRestManager(options: CreateRestManagerOptions) {
createRequestBody: options.createRequestBody || createRequestBody, createRequestBody: options.createRequestBody || createRequestBody,
runMethod: options.runMethod || runMethod, runMethod: options.runMethod || runMethod,
simplifyUrl: options.simplifyUrl || simplifyUrl, simplifyUrl: options.simplifyUrl || simplifyUrl,
processGlobalQueue: options.processGlobalQueue || processGlobalQueue,
}; };
} }
@@ -252,7 +265,7 @@ export function createUtils(options: Partial<HelperUtils>) {
higherRolePosition, higherRolePosition,
validateLength, validateLength,
validateSlashOptions, validateSlashOptions,
validateSlashOptionChoices validateSlashOptionChoices,
}; };
} }
+134
View File
@@ -0,0 +1,134 @@
import { RestManager } from "../bot.ts";
import { DiscordHTTPResponseCodes } from "../types/codes/http_response_codes.ts";
export async function processGlobalQueue(rest: RestManager) {
// IF QUEUE IS EMPTY EXIT
if (!rest.globalQueue.length) return;
// IF QUEUE IS ALREADY RUNNING EXIT
if (rest.globalQueueProcessing) return;
// SET AS TRUE SO OTHER QUEUES DON'T START
rest.globalQueueProcessing = true;
while (rest.globalQueue.length) {
// IF THE BOT IS GLOBALLY RATELIMITED TRY AGAIN
if (rest.globallyRateLimited) {
setTimeout(() => {
rest.debug(`[REST - processGlobalQueue] Globally rate limited, running setTimeout.`);
rest.processGlobalQueue(rest);
}, 1000);
// BREAK WHILE LOOP
break;
}
const request = rest.globalQueue[0];
try {
// CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE
rest.debug(`[REST - fetching] ${JSON.stringify(request.payload)}`);
const response = await fetch(request.urlToUse, rest.createRequestBody(rest, request));
rest.debug(`[REST - fetched] ${JSON.stringify(request.payload)}`);
const bucketIdFromHeaders = rest.processRequestHeaders(rest, request.basicURL, response.headers);
// SET THE BUCKET Id IF IT WAS PRESENT
if (bucketIdFromHeaders) {
request.payload.bucketId = bucketIdFromHeaders;
}
if (response.status < 200 || response.status >= 400) {
rest.debug(
`[REST - httpError] Payload: ${JSON.stringify(request.payload)} | Response: ${JSON.stringify(response)}`
);
let error = "REQUEST_UNKNOWN_ERROR";
switch (response.status) {
case DiscordHTTPResponseCodes.BadRequest:
error = "The request was improperly formatted, or the server couldn't understand it.";
break;
case DiscordHTTPResponseCodes.Unauthorized:
error = "The Authorization header was missing or invalid.";
break;
case DiscordHTTPResponseCodes.Forbidden:
error = "The Authorization token you passed did not have permission to the resource.";
break;
case DiscordHTTPResponseCodes.NotFound:
error = "The resource at the location specified doesn't exist.";
break;
case DiscordHTTPResponseCodes.MethodNotAllowed:
error = "The HTTP method used is not valid for the location specified.";
break;
case DiscordHTTPResponseCodes.GatewayUnavailable:
error = "There was not a gateway available to process your request. Wait a bit and retry.";
break;
}
// If NOT rate limited remove from queue
if (response.status !== 429) {
request.request.reject(new Error(`[${response.status}] ${error}`));
rest.globalQueue.shift();
} else {
if (request.payload.retryCount++ >= rest.maxRetryCount) {
rest.debug(`[REST - RetriesMaxed] ${JSON.stringify(request.payload)}`);
request.request.reject(
new Error(`[${response.status}] The request was rate limited and it maxed out the retries limit.`)
);
// REMOVE ITEM FROM QUEUE TO PREVENT RETRY
rest.globalQueue.shift();
continue;
}
}
continue;
}
// SOMETIMES DISCORD RETURNS AN EMPTY 204 RESPONSE THAT CAN'T BE MADE TO JSON
if (response.status === 204) {
rest.debug(`[REST - FetchSuccess] ${JSON.stringify(request.payload)}`);
// REMOVE FROM QUEUE
rest.globalQueue.shift();
request.request.respond({ status: 204 });
} else {
// CONVERT THE RESPONSE TO JSON
const json = await response.json();
// IF THE RESPONSE WAS RATE LIMITED, HANDLE ACCORDINGLY
// if (json.retry_after || json.message === "You are being rate limited.") {
// // IF IT HAS MAXED RETRIES SOMETHING SERIOUSLY WRONG. CANCEL OUT.
// if (request.payload.retryCount >= rest.maxRetryCount) {
// rest.eventHandlers.retriesMaxed(request.payload);
// request.request.respond({
// status: 200,
// body: JSON.stringify({
// error: "The request was rate limited and it maxed out the retries limit.",
// }),
// });
// // REMOVE ITEM FROM QUEUE TO PREVENT RETRY
// rest.globalQueue.shift();
// continue;
// }
// // SINCE IT WAS RATELIMITE, RETRY AGAIN
// continue;
// }
rest.debug(`[REST - fetchSuccess] ${JSON.stringify(request.payload)}`);
// REMOVE FROM QUEUE
rest.globalQueue.shift();
request.request.respond({
status: 200,
body: JSON.stringify(json),
});
}
} catch (error) {
// SOMETHING WENT WRONG, LOG AND RESPOND WITH ERROR
rest.debug(`[REST - fetchFailed] Payload: ${JSON.stringify(request.payload)} | Error: ${error}`);
request.request.reject(error);
// REMOVE FROM QUEUE
rest.globalQueue.shift();
}
}
// ALLOW OTHER QUEUES TO START WHEN NEW REQUEST IS MADE
rest.globalQueueProcessing = true;
}
+20 -118
View File
@@ -1,25 +1,14 @@
import { RestManager } from "../bot.ts"; import { RestManager } from "../bot.ts";
import { DiscordHTTPResponseCodes } from "../types/codes/http_response_codes.ts";
import { delay } from "../util/utils.ts";
/** Processes the queue by looping over each path separately until the queues are empty. */ /** Processes the queue by looping over each path separately until the queues are empty. */
export async function processQueue(rest: RestManager, id: string) { export function processQueue(rest: RestManager, id: string) {
const queue = rest.pathQueues.get(id); const queue = rest.pathQueues.get(id);
if (!queue) return; if (!queue) return;
while (queue.length) { while (queue.requests.length) {
rest.debug(`[REST - processQueue] Running while loop.`); rest.debug(`[REST - processQueue] Running while loop.`);
// IF THE BOT IS GLOBALLY RATELIMITED TRY AGAIN
if (rest.globallyRateLimited) {
setTimeout(async () => {
rest.debug(`[REST - processQueue] Running setTimeout.`);
await processQueue(rest, id);
}, 1000);
break;
}
// SELECT THE FIRST ITEM FROM THIS QUEUE // SELECT THE FIRST ITEM FROM THIS QUEUE
const [queuedRequest] = queue; const queuedRequest = queue.requests[0];
// IF THIS DOESNT HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT. // IF THIS DOESNT HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT.
if (!queuedRequest) return; if (!queuedRequest) return;
@@ -28,9 +17,18 @@ export async function processQueue(rest: RestManager, id: string) {
// IF THIS URL IS STILL RATE LIMITED, TRY AGAIN // IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
const urlResetIn = rest.checkRateLimits(rest, basicURL); const urlResetIn = rest.checkRateLimits(rest, basicURL);
if (urlResetIn) { if (urlResetIn) {
// PAUSE FOR THIS SPECIFC REQUEST // ONLY ADD TIMEOUT IF ANOTHER QUEUE IS NOT PENDING
await delay(urlResetIn); if (!queue.isWaiting) {
continue; queue.isWaiting = true;
setTimeout(() => {
rest.debug(`[REST - processQueue] rate limited, running setTimeout.`);
rest.processQueue(rest, id);
}, 1000);
}
// BREAK WHILE LOOP
break;
} }
// IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
@@ -60,109 +58,13 @@ export async function processQueue(rest: RestManager, id: string) {
: queuedRequest.request.url; : queuedRequest.request.url;
// CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE // CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE
rest.debug(`[REST - fetching] ${JSON.stringify(queuedRequest.payload)}`); rest.debug(`[REST - Add To Global Queue] ${JSON.stringify(queuedRequest.payload)}`);
rest.globalQueue.push({
try { ...queuedRequest,
const response = await fetch(urlToUse, rest.createRequestBody(rest, queuedRequest)); basicURL,
rest.debug(`[REST - fetched] ${JSON.stringify(queuedRequest.payload)}`); urlToUse,
const bucketIdFromHeaders = rest.processRequestHeaders(rest, basicURL, response.headers);
// SET THE BUCKET Id IF IT WAS PRESENT
if (bucketIdFromHeaders) {
queuedRequest.payload.bucketId = bucketIdFromHeaders;
}
if (response.status < 200 || response.status >= 400) {
rest.debug(
`[REST - httpError] Payload: ${JSON.stringify(queuedRequest.payload)} | Response: ${JSON.stringify(response)}`
);
let error = "REQUEST_UNKNOWN_ERROR";
switch (response.status) {
case DiscordHTTPResponseCodes.BadRequest:
error = "The request was improperly formatted, or the server couldn't understand it.";
break;
case DiscordHTTPResponseCodes.Unauthorized:
error = "The Authorization header was missing or invalid.";
break;
case DiscordHTTPResponseCodes.Forbidden:
error = "The Authorization token you passed did not have permission to the resource.";
break;
case DiscordHTTPResponseCodes.NotFound:
error = "The resource at the location specified doesn't exist.";
break;
case DiscordHTTPResponseCodes.MethodNotAllowed:
error = "The HTTP method used is not valid for the location specified.";
break;
case DiscordHTTPResponseCodes.GatewayUnavailable:
error = "There was not a gateway available to process your request. Wait a bit and retry.";
break;
}
// If Rate limited should not remove from queue
if (response.status !== 429) {
queuedRequest.request.reject(new Error(`[${response.status}] ${error}`));
queue.shift();
} else {
if (queuedRequest.payload.retryCount++ >= rest.maxRetryCount) {
rest.debug(`[REST - RetriesMaxed] ${JSON.stringify(queuedRequest.payload)}`);
queuedRequest.request.reject(
new Error(`[${response.status}] The request was rate limited and it maxed out the retries limit.`)
);
// REMOVE ITEM FROM QUEUE TO PREVENT RETRY
queue.shift();
continue;
}
}
continue;
}
// SOMETIMES DISCORD RETURNS AN EMPTY 204 RESPONSE THAT CAN'T BE MADE TO JSON
if (response.status === 204) {
rest.debug(`[REST - FetchSuccess] ${JSON.stringify(queuedRequest.payload)}`);
// REMOVE FROM QUEUE
queue.shift();
queuedRequest.request.respond({ status: 204 });
} else {
// CONVERT THE RESPONSE TO JSON
const json = await response.json();
// IF THE RESPONSE WAS RATE LIMITED, HANDLE ACCORDINGLY
// if (json.retry_after || json.message === "You are being rate limited.") {
// // IF IT HAS MAXED RETRIES SOMETHING SERIOUSLY WRONG. CANCEL OUT.
// if (queuedRequest.payload.retryCount >= rest.maxRetryCount) {
// rest.eventHandlers.retriesMaxed(queuedRequest.payload);
// queuedRequest.request.respond({
// status: 200,
// body: JSON.stringify({
// error: "The request was rate limited and it maxed out the retries limit.",
// }),
// });
// // REMOVE ITEM FROM QUEUE TO PREVENT RETRY
// queue.shift();
// continue;
// }
// // SINCE IT WAS RATELIMITE, RETRY AGAIN
// continue;
// }
rest.debug(`[REST - fetchSuccess] ${JSON.stringify(queuedRequest.payload)}`);
// REMOVE FROM QUEUE
queue.shift();
queuedRequest.request.respond({
status: 200,
body: JSON.stringify(json),
}); });
} }
} catch (error) {
// SOMETHING WENT WRONG, LOG AND RESPOND WITH ERROR
rest.debug(`[REST - fetchFailed] Payload: ${JSON.stringify(queuedRequest.payload)} | Error: ${error}`);
queuedRequest.request.reject(error);
// REMOVE FROM QUEUE
queue.shift();
}
}
// ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP // ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP
rest.cleanupQueues(rest); rest.cleanupQueues(rest);
+8 -5
View File
@@ -3,7 +3,7 @@ import { BASE_URL } from "../util/constants.ts";
import { RestPayload, RestRequest } from "./rest.ts"; import { RestPayload, RestRequest } from "./rest.ts";
/** Processes a request and assigns it to a queue or creates a queue if none exists for it. */ /** Processes a request and assigns it to a queue or creates a queue if none exists for it. */
export async function processRequest(rest: RestManager, request: RestRequest, payload: RestPayload) { export function processRequest(rest: RestManager, request: RestRequest, payload: RestPayload) {
const route = request.url.substring(request.url.indexOf("api/")); const route = request.url.substring(request.url.indexOf("api/"));
const parts = route.split("/"); const parts = route.split("/");
// REMOVE THE API // REMOVE THE API
@@ -20,15 +20,18 @@ export async function processRequest(rest: RestManager, request: RestRequest, pa
const queue = rest.pathQueues.get(url); const queue = rest.pathQueues.get(url);
// IF THE QUEUE EXISTS JUST ADD THIS TO THE QUEUE // IF THE QUEUE EXISTS JUST ADD THIS TO THE QUEUE
if (queue) { if (queue) {
queue.push({ request, payload }); queue.requests.push({ request, payload });
} else { } else {
// CREATES A NEW QUEUE // CREATES A NEW QUEUE
rest.pathQueues.set(url, [ rest.pathQueues.set(url, {
isWaiting: false,
requests: [
{ {
request, request,
payload, payload,
}, },
]); ],
await rest.processQueue(url); });
rest.processQueue(rest, url);
} }
} }
+1
View File
@@ -39,6 +39,7 @@ export async function runMethod<T = any>(
// No proxy so we need to handle all rate limiting and such // No proxy so we need to handle all rate limiting and such
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
rest.processRequest( rest.processRequest(
rest,
{ {
url, url,
method, method,