Bucking Rest (#2588)

* Pending changes exported from your codespace

* fix: more testing needed

* fix: try fix

* fix: global shared scope erro

* fix: cleanup console logs
This commit is contained in:
Skillz4Killz
2022-11-14 12:17:29 -06:00
committed by GitHub
parent a385176d8a
commit 76ea7d9a67
14 changed files with 634 additions and 182 deletions

View File

@@ -20,6 +20,7 @@
"dnt.ts",
"mod.ts",
"debug.ts",
"debug.rest.ts",
"bot.ts"
],
"editor.defaultFormatter": "denoland.vscode-deno",

138
debug.rest.ts Normal file
View File

@@ -0,0 +1,138 @@
// START FILE FOR REST PROCESS
import { config as dotenv } from "https://deno.land/x/dotenv@v3.2.0/mod.ts";
import { BASE_URL, Collection, createRestManager } from "./mod.ts";
dotenv({ export: true, path: `${Deno.cwd()}/.env` });
const col = new Collection<string, number>();
const token = Deno.env.get("GAMER_TOKEN");
if (!token) throw new Error("Token was not provided.");
const REST_AUTHORIZATION_KEY = Deno.env.get("PROXY_REST_SECRET");
const PROXY_REST_URL = Deno.env.get("PROXY_REST_URL");
const REST_PORT = Number(PROXY_REST_URL?.substring(PROXY_REST_URL.lastIndexOf(":") + 1)) ?? 8080;
// CREATES THE FUNCTIONALITY FOR MANAGING THE REST REQUESTS
const rest = createRestManager({
token,
secretKey: REST_AUTHORIZATION_KEY,
customUrl: PROXY_REST_URL,
debug(text) {
if (text.startsWith("[REST - RequestCreate]")) {
const aaa = text.split(" ");
const method = aaa[4];
const url = aaa[7];
col.set(method + url, Date.now());
// console.log("[DEBUG]", method, url);
}
if (text.startsWith("[REST - processGlobalQueue] rate limited, running setTimeout.")) {
console.log("[POSSIBLE BUCKET ISSUE]");
}
},
fetching(options) {
// console.log("[FETCHING]", options.method, options.url, Date.now() - col.get(options.method + options.url)!);
},
});
// START LISTENING TO THE URL(localhost)
const server = Deno.listen({ port: REST_PORT });
console.log(
`HTTP webserver running. Access it at: ${PROXY_REST_URL}`,
);
// Connections to the server will be yielded up as an async iterable.
for await (const conn of server) {
// In order to not be blocking, we need to handle each connection individually
// in its own async function.
handleRequest(conn);
}
async function handleRequest(conn: Deno.Conn) {
// This "upgrades" a network connection into an HTTP connection.
const httpConn = Deno.serveHttp(conn);
// Each request sent over the HTTP connection will be yielded as an async
// iterator from the HTTP connection.
for await (const requestEvent of httpConn) {
if (
!REST_AUTHORIZATION_KEY ||
REST_AUTHORIZATION_KEY !==
requestEvent.request.headers.get("AUTHORIZATION")
) {
return requestEvent.respondWith(
new Response(JSON.stringify({ error: "Invalid authorization key." }), {
status: 401,
}),
);
}
try {
const text = await requestEvent.request.text();
const json = text ? JSON.parse(text) : undefined;
if (json?.file) {
json.file = await Promise.all(json.file.map(async (f: any) => ({
name: f.name,
blob: await (await fetch(f.blob)).blob(),
})));
}
const result = await rest.runMethod(
rest,
requestEvent.request.method as RequestMethod,
`${BASE_URL}${
requestEvent.request.url.substring(
`http://localhost:${REST_PORT}`.length,
)
}`,
json,
);
if (result) {
requestEvent.respondWith(
new Response(JSON.stringify(result), {
status: 200,
}),
);
} else {
requestEvent.respondWith(
new Response(undefined, {
status: 204,
}),
);
}
} catch (error) {
console.log(
"CATCH",
requestEvent.request.url,
requestEvent.request.method,
requestEvent.request.body,
error.code,
error,
);
requestEvent.respondWith(
new Response(
JSON.stringify({
message: error.message,
}),
{
status: error.code ?? 469,
},
),
);
}
}
}
type RequestMethod = "POST" | "PUT" | "DELETE" | "PATCH";
// // @ts-ignore
// rest.convertRestError = (errorStack, data) => {
// return data;
// };
// console.log(`Giveaway Boat REST Started At: ${new Date().toUTCString()}`);

View File

@@ -18,7 +18,7 @@ export async function connect(shard: Shard): Promise<void> {
url.searchParams.set("encoding", "json");
}
const socket = new WebSocket(url);
const socket = new WebSocket(url.toString());
shard.socket = socket;

View File

@@ -1,7 +1,6 @@
import { StatusUpdate } from "../../helpers/misc/editShardStatus.ts";
import { DiscordGatewayPayload } from "../../types/discord.ts";
import { GatewayOpcodes } from "../../types/shared.ts";
import { LeakyBucket } from "../../util/bucket.ts";
import { createShard } from "./createShard.ts";
// TODO: think whether we also need an identifiedShard function

View File

@@ -1,14 +1,35 @@
import { QueueBucket } from "./createQueueBucket.ts";
import { RestManager } from "./restManager.ts";
/** Cleans up the queues by checking if there is nothing left and removing it. */
export function cleanupQueues(rest: RestManager) {
for (const [key, queue] of rest.pathQueues) {
rest.debug(`[REST - cleanupQueues] Running for of loop. ${key}`);
if (queue.requests.length) continue;
// rest.debug(`[REST - cleanupQueues] Running for of loop. ${key}`);
if (!isQueueClearable(queue)) continue;
// REMOVE IT FROM CACHE
rest.pathQueues.delete(key);
setTimeout(() => {
clearQueue(rest, key, queue);
}, 5000);
}
// NO QUEUE LEFT, DISABLE THE QUEUE
if (!rest.pathQueues.size) rest.processingQueue = false;
}
export function clearQueue(rest: RestManager, key: string, queue: QueueBucket) {
if (!isQueueClearable(queue)) return;
rest.pathQueues.delete(key);
}
export function isQueueClearable(queue: QueueBucket) {
if (queue.firstRequest) return false;
if (queue.waiting.length) return false;
if (queue.pending.length) return false;
if (!queue.interval) return false;
if (queue.processing) return false;
if (queue.processingPending) return false;
return true;
}

View File

@@ -0,0 +1,147 @@
import { delay } from "../mod.ts";
/**
* A invalid request bucket is used in a similar manner as a leaky bucket but a invalid request bucket can be refilled as needed.
* It's purpose is to make sure the bot does not hit the limit to getting a 1 hr ban.
*
* @param options The options used to configure this bucket.
* @returns RefillingBucket
*/
export function createInvalidRequestBucket(options: InvalidRequestBucketOptions): InvalidRequestBucket {
const bucket: InvalidRequestBucket = {
current: options.current ?? 0,
max: options.max ?? 10000,
interval: options.interval ?? 600000,
timeoutId: options.timeoutId ?? 0,
safety: options.safety ?? 1,
frozenAt: options.frozenAt ?? 0,
errorStatuses: options.errorStatuses ?? [401, 403, 429],
requested: options.requested ?? 0,
processing: false,
waiting: [],
requestsAllowed: function () {
return bucket.max - bucket.current - bucket.requested - bucket.safety;
},
isRequestAllowed: function () {
return bucket.requestsAllowed() > 0;
},
waitUntilRequestAvailable: async function () {
return new Promise(async (resolve) => {
// If whatever amount of requests is left is more than the safety margin, allow the request
if (bucket.isRequestAllowed()) {
bucket.requested++;
resolve();
} else {
bucket.waiting.push(resolve);
await bucket.processWaiting();
}
});
},
processWaiting: async function () {
// If already processing, that loop will handle all waiting requests.
if (bucket.processing) {
return;
}
// Mark as processing so other loops don't start
bucket.processing = true;
while (bucket.waiting.length) {
if (bucket.isRequestAllowed()) {
bucket.requested++;
// Resolve the next item in the queue
bucket.waiting.shift()?.();
} else {
await delay(1000);
}
}
// Mark as false so next pending request can be triggered by new loop.
bucket.processing = false;
},
handleCompletedRequest: function (code, sharedScope) {
// Since request is complete, we can remove one from requested.
bucket.requested--;
// Since it is as a valid request, we don't need to do anything
if (!bucket.errorStatuses.includes(code)) return;
// Shared scope is not considered invalid
if (code === 429 && sharedScope) return;
// INVALID REQUEST WAS MADE
// If it was not frozen before, mark it frozen
if (!bucket.frozenAt) bucket.frozenAt = Date.now();
// Mark a request has been invalid
bucket.current++;
// If a timeout was not started, start a timeout to reset this bucket
if (!bucket.timeoutId) {
bucket.timeoutId = setTimeout(() => {
bucket.frozenAt = 0;
bucket.current = 0;
bucket.timeoutId = 0;
}, bucket.frozenAt + bucket.interval);
}
},
};
return bucket;
}
export interface InvalidRequestBucketOptions {
/** current invalid amount */
current?: number;
/** max invalid requests allowed until ban. Defaults to 10,000 */
max?: number;
/** The time that discord allows to make the max number of invalid requests. Defaults to 10 minutes */
interval?: number;
/** timer to reset to 0 */
timeoutId?: number;
/** how safe to be from max. Defaults to 1 */
safety?: number;
/** when first request in this period was made */
frozenAt?: number;
/** The request statuses that count as an invalid request. */
errorStatuses?: number[];
/** The amount of requests that were requested from this bucket. */
requested?: number;
}
export interface InvalidRequestBucket {
/** current invalid amount */
current: number;
/** max invalid requests allowed until ban. Defaults to 10,000 */
max: number;
/** The time that discord allows to make the max number of invalid requests. Defaults to 10 minutes */
interval: number;
/** timer to reset to 0 */
timeoutId: number;
/** how safe to be from max. Defaults to 1 */
safety: number;
/** when first request in this period was made */
frozenAt: number;
/** The request statuses that count as an invalid request. */
errorStatuses: number[];
/** The amount of requests that were requested from this bucket. */
requested: number;
/** The requests that are currently pending. */
waiting: ((value: void | PromiseLike<void>) => void)[];
/** Whether or not the waiting queue is already processing. */
processing: boolean;
/** Gives the number of requests that are currently allowed. */
requestsAllowed: () => number;
/** Checks if a request is allowed at this time. */
isRequestAllowed: () => boolean;
/** Waits until a request is available */
waitUntilRequestAvailable: () => Promise<void>;
/** Begins processing the waiting queue of requests. */
processWaiting: () => Promise<void>;
/** Handler for whenever a request is validated. This should update the requested values or trigger any other necessary stuff. */
handleCompletedRequest: (code: number, sharedScope: boolean) => void;
}

199
rest/createQueueBucket.ts Normal file
View File

@@ -0,0 +1,199 @@
import { delay } from "../util/utils.ts";
import { RestPayload, RestRequest } from "./rest.ts";
import { RestManager } from "./restManager.ts";
/**
* A queue bucket is used in a similar manner as a leaky bucket.
*
* @param options The options used to configure this bucket.
* @returns RefillingBucket
*/
export function createQueueBucket(rest: RestManager, options: QueueBucketOptions): QueueBucket {
const bucket: QueueBucket = {
remaining: options.remaining ?? 1,
max: options.max ?? 1,
interval: options.interval ?? 0,
timeoutId: options.timeoutId ?? 0,
processing: false,
processingPending: false,
firstRequest: true,
waiting: [],
pending: [],
isRequestAllowed: function () {
return bucket.remaining > 0;
},
waitUntilRequestAvailable: async function () {
return new Promise(async (resolve) => {
// If whatever amount of requests is left is more than the safety margin, allow the request
if (bucket.isRequestAllowed()) {
// bucket.remaining++;
resolve();
} else {
bucket.waiting.push(resolve);
await bucket.processWaiting();
}
});
},
processWaiting: async function () {
// If already processing, that loop will handle all waiting requests.
if (bucket.processing) {
return;
}
// Mark as processing so other loops don't start
bucket.processing = true;
while (bucket.waiting.length) {
if (bucket.isRequestAllowed()) {
// Resolve the next item in the queue
bucket.waiting.shift()?.();
} else {
await delay(1000);
}
}
// Mark as false so next pending request can be triggered by new loop.
bucket.processing = false;
},
processPending: async function () {
// If already processing, that loop will handle all pending requests.
if (bucket.processingPending) {
return;
}
// Mark as processing so other loops don't start
bucket.processingPending = true;
while (bucket.pending.length) {
if (bucket.firstRequest || bucket.isRequestAllowed()) {
const [queuedRequest] = bucket.pending;
if (queuedRequest) {
const basicURL = rest.simplifyUrl(queuedRequest.request.url, queuedRequest.request.method);
// IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
const urlResetIn = rest.checkRateLimits(rest, basicURL);
if (urlResetIn) {
setTimeout(() => {
bucket.processPending();
}, urlResetIn);
break;
}
// IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
const bucketResetIn = queuedRequest.payload.bucketId
? rest.checkRateLimits(rest, queuedRequest.payload.bucketId)
: false;
if (bucketResetIn) {
setTimeout(() => {
bucket.processPending();
}, bucketResetIn);
break;
}
bucket.firstRequest = false;
bucket.remaining--;
if (!bucket.timeoutId && !bucket.remaining && bucket.interval) {
bucket.timeoutId = setTimeout(() => {
bucket.remaining = bucket.max;
bucket.timeoutId = 0;
}, bucket.interval);
}
// Remove from queue, we are executing it.
bucket.pending.shift();
rest.processGlobalQueue(rest, {
...queuedRequest,
urlToUse: queuedRequest.request.url,
basicURL,
});
}
} else {
await delay(1000);
}
}
// Mark as false so next pending request can be triggered by new loop.
bucket.processingPending = false;
rest.cleanupQueues(rest);
},
handleCompletedRequest: function (headers) {
bucket.max = headers.max;
bucket.interval = headers.interval;
bucket.remaining = headers.remaining;
if (bucket.remaining <= 1) {
bucket.timeoutId = setTimeout(() => {
bucket.remaining = bucket.max;
bucket.timeoutId = 0;
}, headers.interval);
}
},
makeRequest: async function (options: BucketRequest) {
await bucket.waitUntilRequestAvailable();
bucket.pending.push(options);
bucket.processPending();
},
};
return bucket;
}
export interface QueueBucketOptions {
/** How many requests are remaining. Defaults to 1 */
remaining?: number;
/** Max number of requests allowed in this bucket. Defaults to 1. */
max?: number;
/** The time in milliseconds that discord allows to make the max number of invalid requests. Defaults to 0 */
interval?: number;
/** timer to reset to 0 */
timeoutId?: number;
}
export interface QueueBucket {
/** Amount of requests that have are remaining. Defaults to 1. */
remaining: number;
/** Max requests for this bucket. Defaults to 1. */
max: number;
/** The time that discord allows to make the max number of requests. Defaults to 0 */
interval: number;
/** timer to reset to 0 */
timeoutId: number;
/** The requests that are currently pending. */
waiting: ((value: void | PromiseLike<void>) => void)[];
/** The requests that are currently pending. */
pending: BucketRequest[];
/** Whether or not the waiting queue is already processing. */
processing: boolean;
/** Whether or not the pending queue is already processing. */
processingPending: boolean;
/** Whether the first request is pending. */
firstRequest: boolean;
/** Checks if a request is allowed at this time. */
isRequestAllowed: () => boolean;
/** Waits until a request is available */
waitUntilRequestAvailable: () => Promise<void>;
/** Begins processing the waiting queue of requests. */
processWaiting: () => Promise<void>;
/** Begins processing the pending queue of requests. */
processPending: () => Promise<void>;
/** Handler for whenever a request is validated. This should update the requested values or trigger any other necessary stuff. */
handleCompletedRequest: (headers: { remaining: number; interval: number; max: number }) => void;
/** Adds a request to the queue. */
makeRequest: (options: BucketRequest) => Promise<void>;
}
export interface BucketRequest {
request: RestRequest;
payload: RestPayload;
}

View File

@@ -1,85 +1,34 @@
import { RestRequest,RestPayload } from "./rest.ts";
import { RestManager } from "./restManager.ts";
import { HTTPResponseCodes } from "../types/shared.ts";
export async function processGlobalQueue(rest: RestManager) {
// IF QUEUE IS EMPTY EXIT
if (!rest.globalQueue.length) return;
// IF QUEUE IS ALREADY RUNNING EXIT
if (rest.globalQueueProcessing) return;
export async function processGlobalQueue(rest: RestManager, request: {
request: RestRequest;
payload: RestPayload;
basicURL: string;
urlToUse: string;
}) {
// Check if this request is able to be made globally
await rest.invalidBucket.waitUntilRequestAvailable();
// SET AS TRUE SO OTHER QUEUES DON'T START
rest.globalQueueProcessing = true;
// Check if this request is able to be made for it's specific bucket
// await rest.buckets.get()
while (rest.globalQueue.length) {
// IF THE BOT IS GLOBALLY RATE LIMITED TRY AGAIN
if (rest.globallyRateLimited) {
setTimeout(() => {
rest.debug(`[REST - processGlobalQueue] Globally rate limited, running setTimeout.`);
rest.processGlobalQueue(rest);
}, 1000);
// BREAK WHILE LOOP
break;
}
if (rest.invalidRequests === rest.maxInvalidRequests - rest.invalidRequestsSafetyAmount) {
setTimeout(() => {
const time = rest.invalidRequestsInterval - (Date.now() - rest.invalidRequestFrozenAt);
rest.debug(
`[REST - processGlobalQueue] Freeze global queue because of invalid requests. Time Remaining: ${
time / 1000
} seconds.`,
);
rest.processGlobalQueue(rest);
}, 1000);
// BREAK WHILE LOOP
break;
}
const request = rest.globalQueue.shift();
// REMOVES ANY POTENTIAL INVALID CONFLICTS
if (!request) continue;
// CHECK RATE LIMITS FOR 429 REPEATS
// IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
const urlResetIn = rest.checkRateLimits(rest, request.basicURL);
// IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
const bucketResetIn = request.payload.bucketId ? rest.checkRateLimits(rest, request.payload.bucketId) : false;
if (urlResetIn || bucketResetIn) {
// ONLY ADD TIMEOUT IF ANOTHER QUEUE IS NOT PENDING
setTimeout(() => {
rest.debug(`[REST - processGlobalQueue] rate limited, running setTimeout.`);
// THIS REST IS RATE LIMITED, SO PUSH BACK TO START
rest.globalQueue.unshift(request);
// START QUEUE IF NOT STARTED
rest.processGlobalQueue(rest);
}, urlResetIn || (bucketResetIn as number));
continue;
}
await rest.sendRequest(rest, {
url: request.urlToUse,
await rest.sendRequest(rest, {
url: request.urlToUse,
method: request.request.method,
bucketId: request.payload.bucketId,
reject: request.request.reject,
respond: request.request.respond,
retryRequest: function () {
rest.processGlobalQueue(rest, request);
},
retryCount: request.payload.retryCount ?? 0,
payload: rest.createRequestBody(rest, {
method: request.request.method,
bucketId: request.payload.bucketId,
reject: request.request.reject,
respond: request.request.respond,
retryRequest: function () {
rest.globalQueue.unshift(request);
},
retryCount: request.payload.retryCount ?? 0,
payload: rest.createRequestBody(rest, {
method: request.request.method,
body: request.payload.body,
url: request.urlToUse,
}),
})
// Should be handled in sendRequest, this catch just prevents bots from dying
.catch(() => null);
}
// ALLOW OTHER QUEUES TO START WHEN NEW REQUEST IS MADE
rest.globalQueueProcessing = false;
body: request.payload.body,
url: request.urlToUse,
}),
})
// Should be handled in sendRequest, this catch just prevents bots from dying
.catch(() => null);
}

View File

@@ -2,56 +2,65 @@ import { RestManager } from "./restManager.ts";
/** Processes the queue by looping over each path separately until the queues are empty. */
export function processQueue(rest: RestManager, id: string) {
const queue = rest.pathQueues.get(id);
if (!queue) return;
// const queue = rest.pathQueues.get(id);
// if (!queue) return;
while (queue.requests.length) {
rest.debug(`[REST - processQueue] Running while loop.`);
// SELECT THE FIRST ITEM FROM THIS QUEUE
const queuedRequest = queue.requests[0];
// IF THIS DOESN'T HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT.
if (!queuedRequest) break;
// while (queue.requests.length) {
// rest.debug(`[REST - processQueue] Running while loop.`);
// if (rest.globallyRateLimited) {
// rest.debug(`[REST - processQueue] Globally rate limited.`);
// continue;
// }
const basicURL = rest.simplifyUrl(queuedRequest.request.url, queuedRequest.request.method);
// // SELECT THE FIRST ITEM FROM THIS QUEUE
// const queuedRequest = queue.requests[0];
// // IF THIS DOESN'T HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT.
// if (!queuedRequest) break;
// IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
const urlResetIn = rest.checkRateLimits(rest, basicURL);
if (urlResetIn) {
// ONLY ADD TIMEOUT IF ANOTHER QUEUE IS NOT PENDING
if (!queue.isWaiting) {
queue.isWaiting = true;
// const basicURL = rest.simplifyUrl(queuedRequest.request.url, queuedRequest.request.method);
setTimeout(() => {
queue.isWaiting = false;
// // IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
// const urlResetIn = rest.checkRateLimits(rest, basicURL);
// if (urlResetIn) {
// // ONLY ADD TIMEOUT IF ANOTHER QUEUE IS NOT PENDING
// if (!queue.isWaiting) {
// queue.isWaiting = true;
rest.debug(`[REST - processQueue] rate limited, running setTimeout.`);
rest.processQueue(rest, id);
}, urlResetIn);
}
// setTimeout(() => {
// queue.isWaiting = false;
// BREAK WHILE LOOP
break;
}
// rest.debug(`[REST - processQueue] rate limited, running setTimeout.`);
// rest.processQueue(rest, id);
// }, urlResetIn);
// }
// IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
const bucketResetIn = queuedRequest.payload.bucketId
? rest.checkRateLimits(rest, queuedRequest.payload.bucketId)
: false;
// THIS BUCKET IS STILL RATE LIMITED, RE-ADD TO QUEUE
if (bucketResetIn) continue;
// EXECUTE THE REQUEST
// // BREAK WHILE LOOP
// break;
// }
// CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE
rest.debug(`[REST - Add To Global Queue] ${JSON.stringify(queuedRequest.payload)}`);
rest.globalQueue.push({
...queuedRequest,
urlToUse: queuedRequest.request.url,
basicURL,
});
rest.processGlobalQueue(rest);
queue.requests.shift();
}
// // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
// const bucketResetIn = queuedRequest.payload.bucketId
// ? rest.checkRateLimits(rest, queuedRequest.payload.bucketId)
// : false;
// // THIS BUCKET IS STILL RATE LIMITED, RE-ADD TO QUEUE
// if (bucketResetIn) continue;
// // EXECUTE THE REQUEST
// ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP
rest.cleanupQueues(rest);
// // CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE
// rest.debug(`[REST - Add To Global Queue] ${JSON.stringify(queuedRequest.payload)}`);
// // rest.globalQueue.push({
// // ...queuedRequest,
// // urlToUse: queuedRequest.request.url,
// // basicURL,
// // });
// rest.processGlobalQueue(rest, {
// ...queuedRequest,
// urlToUse: queuedRequest.request.url,
// basicURL,
// });
// queue.requests.shift();
// }
// // ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP
// rest.cleanupQueues(rest);
}

View File

@@ -1,6 +1,7 @@
import { RestManager } from "./restManager.ts";
import { BASE_URL } from "../util/constants.ts";
import { createQueueBucket } from "./createQueueBucket.ts";
import { RestPayload, RestRequest } from "./rest.ts";
import { RestManager } from "./restManager.ts";
/** Processes a request and assigns it to a queue or creates a queue if none exists for it. */
export function processRequest(rest: RestManager, request: RestRequest, payload: RestPayload) {
@@ -19,18 +20,16 @@ export function processRequest(rest: RestManager, request: RestRequest, payload:
const queue = rest.pathQueues.get(url);
if (queue) {
queue.requests.push({ request, payload });
queue.makeRequest({ request, payload });
} else {
// CREATES A NEW QUEUE
rest.pathQueues.set(url, {
isWaiting: false,
requests: [
{
request,
payload,
},
],
const bucketQueue = createQueueBucket(rest, {});
// Add request to queue
bucketQueue.makeRequest({
request,
payload,
});
rest.processQueue(rest, url);
// Save queue
rest.pathQueues.set(url, bucketQueue);
}
}

View File

@@ -12,6 +12,12 @@ export function processRequestHeaders(rest: RestManager, url: string, headers: H
// undefined override null needed for typings
const bucketId = headers.get("x-ratelimit-bucket") || undefined;
rest.pathQueues.get(url)?.handleCompletedRequest({
remaining: Number(remaining),
interval: Number(retryAfter) * 1000,
max: Number(headers.get("x-ratelimit-limit")),
});
// IF THERE IS NO REMAINING RATE LIMIT, MARK IT AS RATE LIMITED
if (remaining === "0") {
rateLimited = true;
@@ -41,6 +47,10 @@ export function processRequestHeaders(rest: RestManager, url: string, headers: H
rest.globallyRateLimited = true;
rateLimited = true;
setTimeout(() => {
rest.globallyRateLimited = false;
}, globalReset);
rest.rateLimitedPaths.set("global", {
url: "global",
resetTimestamp: globalReset,

View File

@@ -3,6 +3,8 @@ import { removeTokenPrefix } from "../util/token.ts";
import { checkRateLimits } from "./checkRateLimits.ts";
import { cleanupQueues } from "./cleanupQueues.ts";
import { convertRestError } from "./convertRestError.ts";
import { createInvalidRequestBucket } from "./createInvalidRequestBucket.ts";
import { QueueBucket } from "./createQueueBucket.ts";
import { createRequestBody } from "./createRequestBody.ts";
import { processGlobalQueue } from "./processGlobalQueue.ts";
import { processQueue } from "./processQueue.ts";
@@ -21,35 +23,14 @@ export function createRestManager(options: CreateRestManagerOptions) {
baseEndpoints.BASE_URL = `${options.customUrl}/v${version}`;
}
return {
// current invalid amount
invalidRequests: 0,
// max invalid requests allowed until ban
maxInvalidRequests: 10000,
// 10 minutes
invalidRequestsInterval: 600000,
// timer to reset to 0
invalidRequestsTimeoutId: 0,
// how safe to be from max
invalidRequestsSafetyAmount: 1,
// when first request in this period was made
invalidRequestFrozenAt: 0,
invalidRequestErrorStatuses: [401, 403, 429],
const rest = {
invalidBucket: createInvalidRequestBucket({}),
version,
token: removeTokenPrefix(options.token),
maxRetryCount: options.maxRetryCount || 10,
secretKey: options.secretKey || "discordeno_best_lib_ever",
customUrl: options.customUrl || "",
pathQueues: new Map<
string,
{
isWaiting: boolean;
requests: {
request: RestRequest;
payload: RestPayload;
}[];
}
>(),
pathQueues: new Map<string, QueueBucket>(),
processingQueue: false,
processingRateLimitedPaths: false,
globallyRateLimited: false,
@@ -61,6 +42,7 @@ export function createRestManager(options: CreateRestManagerOptions) {
}[],
globalQueueProcessing: false,
rateLimitedPaths: new Map<string, RestRateLimitedPath>(),
debug: options.debug || function (_text: string) {},
checkRateLimits: options.checkRateLimits || checkRateLimits,
cleanupQueues: options.cleanupQueues || cleanupQueues,
@@ -91,6 +73,8 @@ export function createRestManager(options: CreateRestManagerOptions) {
);
},
};
return rest;
}
export interface CreateRestManagerOptions {

View File

@@ -23,6 +23,16 @@ export async function sendRequest<T>(rest: RestManager, options: RestSendRequest
// CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE
rest.fetching(options);
// @ts-ignore
if (options.url.startsWith(BASE_URL) && options.payload?.body) {
// @ts-ignore
options.payload.body = JSON.parse(options.payload.body);
// @ts-ignore
options.payload.body.content += ` ${Date.now()}`;
// @ts-ignore
options.payload.body = JSON.stringify(options.payload.body);
}
const response = await fetch(
options.url.startsWith(BASE_URL) ? options.url : `${BASE_URL}/v${rest.version}/${options.url}`,
{
@@ -70,24 +80,9 @@ export async function sendRequest<T>(rest: RestManager, options: RestSendRequest
break;
}
if (
rest.invalidRequestErrorStatuses.includes(response.status) &&
!(response.status === 429 && response.headers.get("X-RateLimit-Scope"))
) {
// INCREMENT CURRENT INVALID REQUESTS
++rest.invalidRequests;
if (!rest.invalidRequestsTimeoutId) {
rest.invalidRequestsTimeoutId = setTimeout(() => {
rest.debug(`[REST - processGlobalQueue] Resetting invalid optionss counter in setTimeout.`);
rest.invalidRequests = 0;
rest.invalidRequestsTimeoutId = 0;
}, rest.invalidRequestsInterval);
}
}
// If NOT rate limited remove from queue
if (response.status !== 429) {
rest.invalidBucket.handleCompletedRequest(response.status, false);
const body = response.type ? JSON.stringify(await response.json()) : undefined;
return options.reject?.({
ok: false,
@@ -96,6 +91,8 @@ export async function sendRequest<T>(rest: RestManager, options: RestSendRequest
body,
});
} else {
const json = await response.json();
// TOO MANY ATTEMPTS, GET RID OF REQUEST FROM QUEUE.
if (options.retryCount && options.retryCount++ >= rest.maxRetryCount) {
rest.debug(`[REST - RetriesMaxed] ${JSON.stringify(options)}`);
@@ -110,7 +107,7 @@ export async function sendRequest<T>(rest: RestManager, options: RestSendRequest
return;
} // RATE LIMITED, ADD BACK TO QUEUE
else {
const json = await response.json();
rest.invalidBucket.handleCompletedRequest(response.status, response.headers.get('X-RateLimit-Scope') === 'shared');
await delay(json.retry_after * 1000);
return options.retryRequest?.();
}

View File

@@ -4,7 +4,7 @@ import { dotenv } from "./deps.ts";
dotenv({ export: true, path: `${Deno.cwd()}/.env` });
const token = Deno.env.get("DISCORD_TOKEN");
const token = Deno.env.get("GAMER_TOKEN");
if (!token) throw new Error("Token was not provided.");
const REST_AUTHORIZATION_KEY = Deno.env.get("PROXY_REST_SECRET");
@@ -16,13 +16,12 @@ const rest = createRestManager({
token,
secretKey: REST_AUTHORIZATION_KEY,
customUrl: PROXY_REST_URL,
// debug: console.log,
});
// START LISTENING TO THE URL(localhost)
const server = Deno.listen({ port: REST_PORT });
console.log(
`HTTP webserver running. Access it at: ${PROXY_REST_URL}`,
`Rest Proxy running. Access it at: ${PROXY_REST_URL}`,
);
// Connections to the server will be yielded up as an async iterable.