refactor(rest): separate functions into diff files (#685)

* shtufffff

* fixe rate limit path infinite loop

* refactor

* ID > id

* Update process_request_headers.ts

* file names

Co-authored-by: ITOH <72305210+itohatweb@users.noreply.github.com>
Co-authored-by: ayntee <ayyantee@gmail.com>
This commit is contained in:
Skillz4Killz
2021-03-29 15:50:08 -04:00
committed by GitHub
parent 79d57f0377
commit ebb15afa0e
16 changed files with 444 additions and 708 deletions
-19
View File
@@ -1,19 +0,0 @@
export const restCache: RestCache = {
pathQueues: new Map(),
processingQueue: false,
globallyRateLimited: false,
ratelimitedPaths: new Map(),
eventHandlers: {
// BY DEFAULT WE WILL LOG ALL ERRORS TO CONSOLE. USER CAN CHOOSE TO OVERRIDE
error: function (_type, error) {
console.error(error);
},
// PLACEHOLDERS TO ALLOW USERS TO CUSTOMIZE
fetching() {},
fetched() {},
fetchSuccess() {},
fetchFailed() {},
globallyRateLimited() {},
retriesMaxed() {},
},
};
+17
View File
@@ -0,0 +1,17 @@
import { rest } from "./rest.ts";
/** Check the rate limits for a url or a bucket. */
export function checkRateLimits(url: string) {
const ratelimited = rest.ratelimitedPaths.get(url);
const global = rest.ratelimitedPaths.get("global");
const now = Date.now();
if (ratelimited && now < ratelimited.resetTimestamp) {
return ratelimited.resetTimestamp - now;
}
if (global && now < global.resetTimestamp) {
return global.resetTimestamp - now;
}
return false;
}
+13
View File
@@ -0,0 +1,13 @@
import { rest } from "./rest.ts";
/** Cleans up the queues by checking if there is nothing left and removing it. */
export function cleanupQueues() {
for (const [key, queue] of rest.pathQueues) {
if (queue.length) continue;
// REMOVE IT FROM CACHE
rest.pathQueues.delete(key);
}
// NO QUEUE LEFT, DISABLE THE QUEUE
if (!rest.pathQueues.size) rest.processingQueue = false;
}
+46
View File
@@ -0,0 +1,46 @@
/** Creates the request body and headers that are necessary to send a request. Will handle different types of methods and everything necessary for discord. */
export function createRequestBody(queuedRequest: QueuedRequest) {
const headers: { [key: string]: string } = {
Authorization: `Bot ${queuedRequest.options.token}`,
"User-Agent": USER_AGENT,
};
// GET METHODS SHOULD NOT HAVE A BODY
if (queuedRequest.request.method === "GET") {
queuedRequest.payload.body = undefined;
}
// IF A REASON IS PROVIDED ENCODE IT IN HEADERS
if (queuedRequest.payload.body?.reason) {
headers["X-Audit-Log-Reason"] = encodeURIComponent(
queuedRequest.payload.body.reason,
);
}
// IF A FILE/ATTACHMENT IS PRESENT WE NEED SPECIAL HANDLING
if (queuedRequest.payload.body?.file) {
const form = new FormData();
form.append(
"file",
queuedRequest.payload.body.file.blob,
queuedRequest.payload.body.file.name,
);
form.append(
"payload_json",
JSON.stringify({ ...queuedRequest.payload.body, file: undefined }),
);
queuedRequest.payload.body.file = form;
} else if (
queuedRequest.payload.body &&
!["GET", "DELETE"].includes(queuedRequest.request.method)
) {
headers["Content-Type"] = "application/json";
}
return {
headers,
body: queuedRequest.payload.body?.file ||
JSON.stringify(queuedRequest.payload.body),
method: queuedRequest.request.method,
};
}
-1
View File
@@ -1 +0,0 @@
export * from "https://deno.land/std@0.90.0/http/server.ts";
+34
View File
@@ -0,0 +1,34 @@
// SERVERLESS REST CLIENT THAT CAN WORK ACROSS SHARDS/WORKERS TO COMMUNICATE GLOBAL RATE LIMITS EASILY
import { rest } from "./rest.ts";
/** Handler function for every request. Converts to json, verified authorization & requirements and begins processing the request */
export async function handlePayload(
request: Request,
) {
// INSTANTLY IGNORE ANY REQUESTS THAT DON'T HAVE THE SECRET AUTHORIZATION KEY
const authorization = request.headers.get("authorization");
if (authorization !== rest.authorization) return;
// READ BUFFER AFTER AUTH CHECK
const buffer = await Deno.readAll(request.body);
try {
// CONVERT THE BODY TO JSON
const data = JSON.parse(new TextDecoder().decode(buffer));
if (
!["GET", "POST", "PUT", "PATCH", "HEAD", "DELETE"].includes(
request.method,
)
) {
return request.respond(
{
status: 400,
body: JSON.stringify({ error: "Invalid METHOD." }),
},
);
}
// PROCESS THE REQUEST
rest.processRequest(request, { body: data, retryCount: 0 });
} catch (error) {
rest.eventHandlers.error("serverRequest", error);
}
}
+11 -4
View File
@@ -1,4 +1,11 @@
export * from "./cache.ts"; export * from "./check_rate_limits.ts";
export * from "./queue.ts"; export * from "./cleanup_queues.ts";
export * from "./request.ts"; export * from "./create_request_body.ts";
export * from "./server.ts"; export * from "./handle_payload.ts";
export * from "./process_queue.ts";
export * from "./process_rate_limited_paths.ts";
export * from "./process_request.ts";
export * from "./process_request_headers.ts";
export * from "./request_manager.ts";
export * from "./rest.ts";
export * from "./run_method.ts";
+16 -45
View File
@@ -1,15 +1,13 @@
import { delay } from "../util/utils.ts"; import { rest } from "./rest.ts";
import { restCache } from "./cache.ts";
import { createRequestBody, processRequestHeaders } from "./request.ts";
/** Processes the queue by looping over each path separately until the queues are empty. */ /** Processes the queue by looping over each path separately until the queues are empty. */
export async function processQueue(id: string) { export async function processQueue(id: string) {
const queue = restCache.pathQueues.get(id); const queue = rest.pathQueues.get(id);
if (!queue) return; if (!queue) return;
while (queue.length) { while (queue.length) {
// IF THE BOT IS GLOBALLY RATELIMITED TRY AGAIN // IF THE BOT IS GLOBALLY RATELIMITED TRY AGAIN
if (restCache.globallyRateLimited) { if (rest.globallyRateLimited) {
setTimeout(() => processQueue(id), 1000); setTimeout(() => processQueue(id), 1000);
break; break;
@@ -20,7 +18,7 @@ export async function processQueue(id: string) {
if (!queuedRequest) return; if (!queuedRequest) return;
// IF THIS URL IS STILL RATE LIMITED, TRY AGAIN // IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
const urlResetIn = checkRateLimits(queuedRequest.request.url); const urlResetIn = rest.checkRateLimits(queuedRequest.request.url);
if (urlResetIn) { if (urlResetIn) {
// PAUSE FOR THIS SPECIFC REQUEST // PAUSE FOR THIS SPECIFC REQUEST
await delay(urlResetIn); await delay(urlResetIn);
@@ -28,7 +26,7 @@ export async function processQueue(id: string) {
} // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS } // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
const bucketResetIn = queuedRequest.payload.bucketId const bucketResetIn = queuedRequest.payload.bucketId
? checkRateLimits(queuedRequest.payload.bucketId) ? rest.checkRateLimits(queuedRequest.payload.bucketId)
: false; : false;
// THIS BUCKET IS STILL RATELIMITED, RE-ADD TO QUEUE // THIS BUCKET IS STILL RATELIMITED, RE-ADD TO QUEUE
if (bucketResetIn) continue; if (bucketResetIn) continue;
@@ -48,21 +46,22 @@ export async function processQueue(id: string) {
: queuedRequest.request.url; : queuedRequest.request.url;
// CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE // CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE
restCache.eventHandlers.fetching(queuedRequest.payload); rest.eventHandlers.fetching(queuedRequest.payload);
try { try {
const response = await fetch( const response = await fetch(
urlToUse, urlToUse,
createRequestBody(queuedRequest), rest.createRequestBody(queuedRequest),
); );
restCache.eventHandlers.fetched(queuedRequest.payload);
const bucketIdFromHeaders = processRequestHeaders( rest.eventHandlers.fetched(queuedRequest.payload);
const bucketIdFromHeaders = rest.processRequestHeaders(
queuedRequest.request.url, queuedRequest.request.url,
response.headers, response.headers,
); );
if (response.status < 200 || response.status >= 400) { if (response.status < 200 || response.status >= 400) {
restCache.eventHandlers.error( rest.eventHandlers.error(
"httpError", "httpError",
queuedRequest.payload, queuedRequest.payload,
response, response,
@@ -91,7 +90,7 @@ export async function processQueue(id: string) {
// SOMETIMES DISCORD RETURNS AN EMPTY 204 RESPONSE THAT CAN'T BE MADE TO JSON // SOMETIMES DISCORD RETURNS AN EMPTY 204 RESPONSE THAT CAN'T BE MADE TO JSON
if (response.status === 204) { if (response.status === 204) {
restCache.eventHandlers.fetchSuccess(queuedRequest.payload); rest.eventHandlers.fetchSuccess(queuedRequest.payload);
// REMOVE FROM QUEUE // REMOVE FROM QUEUE
queue.shift(); queue.shift();
queuedRequest.request.respond({ status: 204 }); queuedRequest.request.respond({ status: 204 });
@@ -108,7 +107,7 @@ export async function processQueue(id: string) {
queuedRequest.payload.retryCount >= queuedRequest.payload.retryCount >=
queuedRequest.options.maxRetryCount queuedRequest.options.maxRetryCount
) { ) {
restCache.eventHandlers.retriesMaxed(queuedRequest.payload); rest.eventHandlers.retriesMaxed(queuedRequest.payload);
queuedRequest.request.respond( queuedRequest.request.respond(
{ {
status: 200, status: 200,
@@ -133,7 +132,7 @@ export async function processQueue(id: string) {
continue; continue;
} }
restCache.eventHandlers.fetchSuccess(queuedRequest.payload); rest.eventHandlers.fetchSuccess(queuedRequest.payload);
// REMOVE FROM QUEUE // REMOVE FROM QUEUE
queue.shift(); queue.shift();
queuedRequest.request.respond( queuedRequest.request.respond(
@@ -142,7 +141,7 @@ export async function processQueue(id: string) {
} }
} catch (error) { } catch (error) {
// SOMETHING WENT WRONG, LOG AND RESPOND WITH ERROR // SOMETHING WENT WRONG, LOG AND RESPOND WITH ERROR
restCache.eventHandlers.fetchFailed(queuedRequest.payload, error); rest.eventHandlers.fetchFailed(queuedRequest.payload, error);
queuedRequest.request.respond( queuedRequest.request.respond(
{ status: 404, body: JSON.stringify({ error }) }, { status: 404, body: JSON.stringify({ error }) },
); );
@@ -152,33 +151,5 @@ export async function processQueue(id: string) {
} }
// ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP // ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP
cleanupQueues(); rest.cleanupQueues();
}
/** Cleans up the queues by checking if there is nothing left and removing it. */
export function cleanupQueues() {
for (const [key, queue] of restCache.pathQueues) {
if (queue.length) continue;
// REMOVE IT FROM CACHE
restCache.pathQueues.delete(key);
}
// NO QUEUE LEFT, DISABLE THE QUEUE
if (!restCache.pathQueues.size) restCache.processingQueue = false;
}
/** Check the rate limits for a url or a bucket. */
export function checkRateLimits(url: string) {
const ratelimited = restCache.ratelimitedPaths.get(url);
const global = restCache.ratelimitedPaths.get("global");
const now = Date.now();
if (ratelimited && now < ratelimited.resetTimestamp) {
return ratelimited.resetTimestamp - now;
}
if (global && now < global.resetTimestamp) {
return global.resetTimestamp - now;
}
return false;
} }
+25
View File
@@ -0,0 +1,25 @@
import { rest } from "./rest.ts";
/** This will create a infinite loop running in 1 seconds using tail recursion to keep rate limits clean. When a rate limit resets, this will remove it so the queue can proceed. */
export function processRateLimitedPaths() {
const now = Date.now();
rest.ratelimitedPaths.forEach((value, key) => {
// IF THE TIME HAS NOT REACHED CANCEL
if (value.resetTimestamp > now) return;
// RATE LIMIT IS OVER, DELETE THE RATE LIMITER
rest.ratelimitedPaths.delete(key);
// IF IT WAS GLOBAL ALSO MARK THE GLOBAL VALUE AS FALSE
if (key === "global") rest.globallyRateLimited = false;
});
// ALL PATHS ARE CLEARED CAN CANCEL OUT!
if (!rest.ratelimitedPaths.size) {
rest.processingRateLimitedPaths = false;
return;
} else {
rest.processingRateLimitedPaths = true;
// RECHECK IN 1 SECOND
setTimeout(() => processRateLimitedPaths(), 1000);
}
}
+33
View File
@@ -0,0 +1,33 @@
/** Processes a request and assigns it to a queue or creates a queue if none exists for it. */
export function processRequest(
request: ServerRequest,
payload: RunMethodOptions,
options: RestServerOptions,
) {
const route = request.url.substring(request.url.indexOf("api/"));
const parts = route.split("/");
// REMOVE THE API
parts.shift();
// REMOVES THE VERSION NUMBER
if (parts[0]?.startsWith("v")) parts.shift();
// SET THE NEW REQUEST URL
request.url = `${BASE_URL}/v${options.apiVersion || 8}/${parts.join("/")}`;
// REMOVE THE MAJOR PARAM
parts.shift();
const [id] = parts;
const queue = restCache.pathQueues.get(id);
// IF THE QUEUE EXISTS JUST ADD THIS TO THE QUEUE
if (queue) {
queue.push({ request, payload, options });
} else {
// CREATES A NEW QUEUE
restCache.pathQueues.set(id, [{
request,
payload,
options,
}]);
processQueue(id);
}
}
+61
View File
@@ -0,0 +1,61 @@
import { rest } from "./rest.ts";
/** Processes the rate limit headers and determines if it needs to be ratelimited and returns the bucket id if available */
export function processRequestHeaders(url: string, headers: Headers) {
let ratelimited = false;
// GET ALL NECESSARY HEADERS
const remaining = headers.get("x-ratelimit-remaining");
const resetTimestamp = headers.get("x-ratelimit-reset");
const retryAfter = headers.get("retry-after");
const global = headers.get("x-ratelimit-global");
const bucketId = headers.get("x-ratelimit-bucket");
// IF THERE IS NO REMAINING RATE LIMIT, MARK IT AS RATE LIMITED
if (remaining && remaining === "0") {
ratelimited = true;
// SAVE THE URL AS LIMITED, IMPORTANT FOR NEW REQUESTS BY USER WITHOUT BUCKET
rest.ratelimitedPaths.set(url, {
url,
resetTimestamp: Number(resetTimestamp) * 1000,
bucketId,
});
// SAVE THE BUCKET AS LIMITED SINCE DIFFERENT URLS MAY SHARE A BUCKET
if (bucketId) {
rest.ratelimitedPaths.set(bucketId, {
url,
resetTimestamp: Number(resetTimestamp) * 1000,
bucketId,
});
}
}
// IF THERE IS NO REMAINING GLOBAL LIMIT, MARK IT RATE LIMITED GLOBALLY
if (global) {
const reset = Date.now() + (Number(retryAfter) * 1000);
rest.eventHandlers.globallyRateLimited(url, reset);
rest.globallyRateLimited = true;
ratelimited = true;
rest.ratelimitedPaths.set("global", {
url: "global",
resetTimestamp: reset,
bucketId,
});
if (bucketId) {
rest.ratelimitedPaths.set(bucketId, {
url: "global",
resetTimestamp: reset,
bucketId,
});
}
}
if (ratelimited && !rest.processingRateLimitedPaths) {
rest.processRateLimitedPaths();
}
return ratelimited ? bucketId : undefined;
}
-162
View File
@@ -1,162 +0,0 @@
import { BASE_URL, USER_AGENT } from "../util/constants.ts";
import { restCache } from "./cache.ts";
import { ServerRequest } from "./deps.ts";
import { processQueue } from "./queue.ts";
/** Processes a request and assigns it to a queue or creates a queue if none exists for it. */
export function processRequest(
request: ServerRequest,
payload: RunMethodOptions,
options: RestServerOptions,
) {
const route = request.url.substring(request.url.indexOf("api/"));
const parts = route.split("/");
// REMOVE THE API
parts.shift();
// REMOVES THE VERSION NUMBER
if (parts[0]?.startsWith("v")) parts.shift();
// SET THE NEW REQUEST URL
request.url = `${BASE_URL}/v${options.apiVersion || 8}/${parts.join("/")}`;
// REMOVE THE MAJOR PARAM
parts.shift();
const [id] = parts;
const queue = restCache.pathQueues.get(id);
// IF THE QUEUE EXISTS JUST ADD THIS TO THE QUEUE
if (queue) {
queue.push({ request, payload, options });
} else {
// CREATES A NEW QUEUE
restCache.pathQueues.set(id, [{
request,
payload,
options,
}]);
processQueue(id);
}
}
/** Creates the request body and headers that are necessary to send a request. Will handle different types of methods and everything necessary for discord. */
export function createRequestBody(queuedRequest: QueuedRequest) {
const headers: { [key: string]: string } = {
Authorization: `Bot ${queuedRequest.options.token}`,
"User-Agent": USER_AGENT,
};
// GET METHODS SHOULD NOT HAVE A BODY
if (queuedRequest.request.method === "GET") {
queuedRequest.payload.body = undefined;
}
// IF A REASON IS PROVIDED ENCODE IT IN HEADERS
if (queuedRequest.payload.body?.reason) {
headers["X-Audit-Log-Reason"] = encodeURIComponent(
queuedRequest.payload.body.reason,
);
}
// IF A FILE/ATTACHMENT IS PRESENT WE NEED SPECIAL HANDLING
if (queuedRequest.payload.body?.file) {
const form = new FormData();
form.append(
"file",
queuedRequest.payload.body.file.blob,
queuedRequest.payload.body.file.name,
);
form.append(
"payload_json",
JSON.stringify({ ...queuedRequest.payload.body, file: undefined }),
);
queuedRequest.payload.body.file = form;
} else if (
queuedRequest.payload.body &&
!["GET", "DELETE"].includes(queuedRequest.request.method)
) {
headers["Content-Type"] = "application/json";
}
return {
headers,
body: queuedRequest.payload.body?.file ||
JSON.stringify(queuedRequest.payload.body),
method: queuedRequest.request.method,
};
}
/** Processes the rate limit headers and determines if it needs to be ratelimited and returns the bucket id if available */
export function processRequestHeaders(url: string, headers: Headers) {
let ratelimited = false;
// GET ALL NECESSARY HEADERS
const remaining = headers.get("x-ratelimit-remaining");
const resetTimestamp = headers.get("x-ratelimit-reset");
const retryAfter = headers.get("retry-after");
const global = headers.get("x-ratelimit-global");
const bucketId = headers.get("x-ratelimit-bucket");
// IF THERE IS NO REMAINING RATE LIMIT, MARK IT AS RATE LIMITED
if (remaining && remaining === "0") {
ratelimited = true;
// SAVE THE URL AS LIMITED, IMPORTANT FOR NEW REQUESTS BY USER WITHOUT BUCKET
restCache.ratelimitedPaths.set(url, {
url,
resetTimestamp: Number(resetTimestamp) * 1000,
bucketId,
});
// SAVE THE BUCKET AS LIMITED SINCE DIFFERENT URLS MAY SHARE A BUCKET
if (bucketId) {
restCache.ratelimitedPaths.set(bucketId, {
url,
resetTimestamp: Number(resetTimestamp) * 1000,
bucketId,
});
}
}
// IF THERE IS NO REMAINING GLOBAL LIMIT, MARK IT RATE LIMITED GLOBALLY
if (global) {
const reset = Date.now() + (Number(retryAfter) * 1000);
restCache.eventHandlers.globallyRateLimited(url, reset);
restCache.globallyRateLimited = true;
ratelimited = true;
restCache.ratelimitedPaths.set("global", {
url: "global",
resetTimestamp: reset,
bucketId,
});
if (bucketId) {
restCache.ratelimitedPaths.set(bucketId, {
url: "global",
resetTimestamp: reset,
bucketId,
});
}
}
return ratelimited ? bucketId : undefined;
}
/** This wll create a infinite loop running in 1 seconds using tail recursion to keep rate limits clean. When a rate limit resets, this will remove it so the queue can proceed. */
function processRateLimitedPaths() {
const now = Date.now();
restCache.ratelimitedPaths.forEach((value, key) => {
// IF THE TIME HAS NOT REACHED CANCEL
if (value.resetTimestamp > now) return;
// RATE LIMIT IS OVER, DELETE THE RATE LIMITER
restCache.ratelimitedPaths.delete(key);
// IF IT WAS GLOBAL ALSO MARK THE GLOBAL VALUE AS FALSE
if (key === "global") restCache.globallyRateLimited = false;
});
// RECHECK IN 1 SECOND
setTimeout(() => processRateLimitedPaths(), 1000);
}
/** Starts the loop */
processRateLimitedPaths();
+6 -423
View File
@@ -1,436 +1,19 @@
import { authorization, eventHandlers, restAuthorization } from "../bot.ts"; import { rest } from "./rest.ts";
import {
API_VERSION,
BASE_URL,
baseEndpoints,
IMAGE_BASE_URL,
USER_AGENT,
} from "../util/constants.ts";
import { delay } from "../util/utils.ts";
const pathQueues: { [key: string]: QueuedRequest[] } = {};
const ratelimitedPaths = new Map<string, RateLimitedPath>();
let globallyRateLimited = false;
let queueInProcess = false;
async function processRateLimitedPaths() {
const now = Date.now();
ratelimitedPaths.forEach((value, key) => {
if (value.resetTimestamp > now) return;
ratelimitedPaths.delete(key);
if (key === "global") globallyRateLimited = false;
});
await delay(1000);
await processRateLimitedPaths();
}
function addToQueue(request: QueuedRequest) {
const route = request.url.substring(baseEndpoints.BASE_URL.length + 1);
const parts = route.split("/");
// Remove the major param
parts.shift();
const [id] = parts;
if (pathQueues[id]) {
pathQueues[id].push(request);
} else {
pathQueues[id] = [request];
}
}
function cleanupQueues() {
Object.entries(pathQueues).forEach(([key, value]) => {
if (!value.length) {
// Remove it entirely
delete pathQueues[key];
}
});
}
async function processQueue() {
while (queueInProcess) {
if (
(Object.keys(pathQueues).length) && !globallyRateLimited
) {
await Promise.allSettled(
Object.values(pathQueues).map(async (pathQueue) => {
const request = pathQueue.shift();
if (!request) return;
const rateLimitedURLResetIn = await checkRatelimits(request.url);
if (request.bucketId) {
const rateLimitResetIn = await checkRatelimits(request.bucketId);
if (rateLimitResetIn) {
// This request is still rate limited readd to queue
addToQueue(request);
} else if (rateLimitedURLResetIn) {
// This URL is rate limited readd to queue
addToQueue(request);
} else {
// This request is not rate limited so it should be run
const result = await request.callback();
if (result && result.rateLimited) {
addToQueue(
{ ...request, bucketId: result.bucketId || request.bucketId },
);
}
}
} else {
if (rateLimitedURLResetIn) {
// This URL is rate limited readd to queue
addToQueue(request);
} else {
// This request has no bucket id so it should be processed
const result = await request.callback();
if (request && result && result.rateLimited) {
addToQueue(
{ ...request, bucketId: result.bucketId || request.bucketId },
);
}
}
}
}),
);
}
if (Object.keys(pathQueues).length) {
cleanupQueues();
} else queueInProcess = false;
}
}
processRateLimitedPaths();
export const RequestManager = { export const RequestManager = {
get: (url: string, body?: unknown) => { get: (url: string, body?: unknown) => {
return runMethod("get", url, body); return rest.runMethod("get", url, body);
}, },
post: (url: string, body?: unknown) => { post: (url: string, body?: unknown) => {
return runMethod("post", url, body); return rest.runMethod("post", url, body);
}, },
delete: (url: string, body?: unknown) => { delete: (url: string, body?: unknown) => {
return runMethod("delete", url, body); return rest.runMethod("delete", url, body);
}, },
patch: (url: string, body?: unknown) => { patch: (url: string, body?: unknown) => {
return runMethod("patch", url, body); return rest.runMethod("patch", url, body);
}, },
put: (url: string, body?: unknown) => { put: (url: string, body?: unknown) => {
return runMethod("put", url, body); return rest.runMethod("put", url, body);
}, },
}; };
// deno-lint-ignore no-explicit-any
function createRequestBody(body: any, method: RequestMethods) {
const headers: { [key: string]: string } = {
Authorization: authorization,
"User-Agent": USER_AGENT,
};
if (method === "get") body = undefined;
if (body?.reason) {
headers["X-Audit-Log-Reason"] = encodeURIComponent(body.reason);
}
if (body?.file) {
if (!Array.isArray(body.file)) body.file = [body.file];
const form = new FormData();
body.file.map((file: FileContent, index: number) =>
// The key of the form data item must be unique; otherwise, Discordeno only considers the first item in the form data with the same names
form.append(`file${index + 1}`, file.blob, file.name)
);
form.append("payload_json", JSON.stringify({ ...body, file: undefined }));
body.file = form;
} else if (
body && !["get", "delete"].includes(method)
) {
headers["Content-Type"] = "application/json";
}
return {
headers,
body: body?.file || JSON.stringify(body),
method: method.toUpperCase(),
};
}
function checkRatelimits(url: string) {
const ratelimited = ratelimitedPaths.get(url);
const global = ratelimitedPaths.get("global");
const now = Date.now();
if (ratelimited && now < ratelimited.resetTimestamp) {
return ratelimited.resetTimestamp - now;
}
if (global && now < global.resetTimestamp) {
return global.resetTimestamp - now;
}
return false;
}
function runMethod(
method: RequestMethods,
url: string,
body?: unknown,
retryCount = 0,
bucketId?: string | null,
) {
eventHandlers.debug?.(
{
type: "requestCreate",
data: { method, url, body, retryCount, bucketId },
},
);
const errorStack = new Error("Location:");
Error.captureStackTrace(errorStack);
// For proxies we don't need to do any of the legwork so we just forward the request
if (
!url.startsWith(`${BASE_URL}/v${API_VERSION}`) &&
!url.startsWith(IMAGE_BASE_URL)
) {
return fetch(url, {
body: JSON.stringify(body || {}),
headers: {
authorization: restAuthorization,
},
method: method.toUpperCase(),
})
.then((res) => {
if (res.status === 204) return undefined;
return res.json();
})
.catch((error) => {
console.error(error);
throw errorStack;
});
}
// No proxy so we need to handle all rate limiting and such
// deno-lint-ignore no-async-promise-executor
return new Promise(async (resolve, reject) => {
const callback = async () => {
try {
const rateLimitResetIn = await checkRatelimits(url);
if (rateLimitResetIn) {
return { rateLimited: rateLimitResetIn, beforeFetch: true, bucketId };
}
const query = method === "get" && body
? // deno-lint-ignore no-explicit-any
Object.entries(body as any).map(([key, value]) =>
// deno-lint-ignore no-explicit-any
`${encodeURIComponent(key)}=${encodeURIComponent(value as any)}`
)
.join("&")
: "";
const urlToUse = method === "get" && query ? `${url}?${query}` : url;
eventHandlers.debug?.(
{
type: "requestFetch",
data: { method, url, body, retryCount, bucketId },
},
);
const response = await fetch(urlToUse, createRequestBody(body, method));
eventHandlers.debug?.(
{
type: "requestFetched",
data: { method, url, body, retryCount, bucketId, response },
},
);
const bucketIdFromHeaders = processHeaders(url, response.headers);
await handleStatusCode(response, errorStack);
// Sometimes Discord returns an empty 204 response that can't be made to JSON.
if (response.status === 204) return resolve(undefined);
const json = await response.json();
if (
json.retry_after ||
json.message === "You are being rate limited."
) {
if (retryCount > 10) {
eventHandlers.debug?.(
{
type: "error",
data: { method, url, body, retryCount, bucketId, errorStack },
},
);
throw new Error(Errors.RATE_LIMIT_RETRY_MAXED);
}
return {
rateLimited: json.retry_after,
beforeFetch: false,
bucketId: bucketIdFromHeaders,
};
}
eventHandlers.debug?.(
{
type: "requestSuccess",
data: { method, url, body, retryCount, bucketId },
},
);
return resolve(json);
} catch (error) {
eventHandlers.debug?.(
{
type: "error",
data: { method, url, body, retryCount, bucketId, errorStack },
},
);
return reject(error);
}
};
addToQueue({
callback,
bucketId,
url,
});
if (!queueInProcess) {
queueInProcess = true;
await processQueue();
}
});
}
async function logErrors(response: Response, errorStack?: unknown) {
try {
const error = await response.json();
console.error(Deno.inspect(error, { depth: 10 }));
eventHandlers.debug?.({ type: "error", data: { errorStack, error } });
} catch {
eventHandlers.debug?.(
{
type: "error",
data: { errorStack },
},
);
console.error(response);
}
}
async function handleStatusCode(response: Response, errorStack?: unknown) {
const status = response.status;
if (
(status >= 200 && status < 400) ||
status === HttpResponseCode.TooManyRequests
) {
return true;
}
await logErrors(response, errorStack);
switch (status) {
case HttpResponseCode.BadRequest:
console.error(
"The request was improperly formatted, or the server couldn't understand it.",
);
throw errorStack;
case HttpResponseCode.Unauthorized:
console.error("The Authorization header was missing or invalid.");
throw errorStack;
case HttpResponseCode.Forbidden:
console.error(
"The Authorization token you passed did not have permission to the resource.",
);
throw errorStack;
case HttpResponseCode.NotFound:
console.error("The resource at the location specified doesn't exist.");
throw errorStack;
case HttpResponseCode.MethodNotAllowed:
console.error(
"The HTTP method used is not valid for the location specified.",
);
throw errorStack;
case HttpResponseCode.GatewayUnavailable:
console.error(
"There was not a gateway available to process your request. Wait a bit and retry.",
);
throw errorStack;
// left are all unknown
default:
console.error(Errors.REQUEST_UNKNOWN_ERROR);
throw errorStack;
}
}
function processHeaders(url: string, headers: Headers) {
let ratelimited = false;
// Get all useful headers
const remaining = headers.get("x-ratelimit-remaining");
const resetTimestamp = headers.get("x-ratelimit-reset");
const retryAfter = headers.get("retry-after");
const global = headers.get("x-ratelimit-global");
const bucketId = headers.get("x-ratelimit-bucket");
// If there is no remaining rate limit for this endpoint, we save it in cache
if (remaining && remaining === "0") {
ratelimited = true;
ratelimitedPaths.set(url, {
url,
resetTimestamp: Number(resetTimestamp) * 1000,
bucketId,
});
if (bucketId) {
ratelimitedPaths.set(bucketId, {
url,
resetTimestamp: Number(resetTimestamp) * 1000,
bucketId,
});
}
}
// If there is no remaining global limit, we save it in cache
if (global) {
const reset = Date.now() + (Number(retryAfter) * 1000);
eventHandlers.debug?.(
{ type: "globallyRateLimited", data: { url, reset } },
);
globallyRateLimited = true;
ratelimited = true;
ratelimitedPaths.set("global", {
url: "global",
resetTimestamp: reset,
bucketId,
});
if (bucketId) {
ratelimitedPaths.set(bucketId, {
url: "global",
resetTimestamp: reset,
bucketId,
});
}
}
if (ratelimited) {
eventHandlers.rateLimit?.({
remaining,
bucketId,
global,
resetTimestamp,
retryAfter,
url,
});
}
return ratelimited ? bucketId : undefined;
}
+47
View File
@@ -0,0 +1,47 @@
import { checkRateLimits } from "./check_rate_limits.ts";
import { cleanupQueues } from "./cleanup_queues.ts";
import { createRequestBody } from "./create_request_body.ts";
import { handlePayload } from "./handle_payload.ts";
import { processQueue } from "./process_queue.ts";
import { processRateLimitedPaths } from "./process_rate_limited_paths.ts";
import { processRequest } from "./process_request.ts";
import { processRequestHeaders } from "./process_request_headers.ts";
import { RequestManager } from "./request_manager.ts";
import { runMethod } from "./run_method.ts";
export const rest: RestCache = {
/** The secret authorization key to confirm that this was a request made by you and not a DDOS attack. */
authorization: "discordeno_best_lib_ever",
pathQueues: new Map(),
processingQueue: false,
processingRateLimitedPaths: false,
globallyRateLimited: false,
ratelimitedPaths: new Map(),
eventHandlers: {
// BY DEFAULT WE WILL LOG ALL ERRORS TO CONSOLE. USER CAN CHOOSE TO OVERRIDE
error: function (_type, error) {
console.error(error);
},
// PLACEHOLDERS TO ALLOW USERS TO CUSTOMIZE
fetching() {},
fetched() {},
fetchSuccess() {},
fetchFailed() {},
globallyRateLimited() {},
retriesMaxed() {},
},
// TODO: add propeer docs dcomments
manager: RequestManager,
/** Handler function for every request. Converts to json, verified authorization & requirements and begins processing the request */
handlePayload,
checkRateLimits,
cleanupQueues,
processQueue,
processRateLimitedPaths,
processRequestHeaders,
processRequest,
createRequestBody,
runMethod,
};
+135
View File
@@ -0,0 +1,135 @@
import { rest } from "./rest.ts";
export function runMethod(
method: RequestMethods,
url: string,
body?: unknown,
retryCount = 0,
bucketId?: string | null,
) {
rest.eventHandlers.debug?.(
{
type: "requestCreate",
data: { method, url, body, retryCount, bucketId },
},
);
const errorStack = new Error("Location:");
Error.captureStackTrace(errorStack);
// For proxies we don't need to do any of the legwork so we just forward the request
if (
!url.startsWith(`${BASE_URL}/v${API_VERSION}`) &&
!url.startsWith(IMAGE_BASE_URL)
) {
return fetch(url, {
body: JSON.stringify(body || {}),
headers: {
authorization: rest.authorization,
},
method: method.toUpperCase(),
})
.then((res) => {
if (res.status === 204) return undefined;
return res.json();
})
.catch((error) => {
console.error(error);
throw errorStack;
});
}
// No proxy so we need to handle all rate limiting and such
// deno-lint-ignore no-async-promise-executor
return new Promise(async (resolve, reject) => {
const callback = async () => {
try {
const rateLimitResetIn = await rest.checkRatelimits(url);
if (rateLimitResetIn) {
return { rateLimited: rateLimitResetIn, beforeFetch: true, bucketId };
}
const query = method === "get" && body
? // deno-lint-ignore no-explicit-any
Object.entries(body as any).map(([key, value]) =>
// deno-lint-ignore no-explicit-any
`${encodeURIComponent(key)}=${encodeURIComponent(value as any)}`
)
.join("&")
: "";
const urlToUse = method === "get" && query ? `${url}?${query}` : url;
rest.eventHandlers.debug?.(
{
type: "requestFetch",
data: { method, url, body, retryCount, bucketId },
},
);
const response = await fetch(
urlToUse,
rest.createRequestBody(body, method),
);
rest.eventHandlers.debug?.(
{
type: "requestFetched",
data: { method, url, body, retryCount, bucketId, response },
},
);
const bucketIdFromHeaders = rest.processHeaders(url, response.headers);
await rest.handleStatusCode(response, errorStack);
// Sometimes Discord returns an empty 204 response that can't be made to JSON.
if (response.status === 204) return resolve(undefined);
const json = await response.json();
if (
json.retry_after ||
json.message === "You are being rate limited."
) {
if (retryCount > 10) {
rest.eventHandlers.debug?.(
{
type: "error",
data: { method, url, body, retryCount, bucketId, errorStack },
},
);
throw new Error(Errors.RATE_LIMIT_RETRY_MAXED);
}
return {
rateLimited: json.retry_after,
beforeFetch: false,
bucketId: bucketIdFromHeaders,
};
}
rest.eventHandlers.debug?.(
{
type: "requestSuccess",
data: { method, url, body, retryCount, bucketId },
},
);
return resolve(json);
} catch (error) {
rest.eventHandlers.debug?.(
{
type: "error",
data: { method, url, body, retryCount, bucketId, errorStack },
},
);
return reject(error);
}
};
rest.addToQueue({
callback,
bucketId,
url,
});
if (!rest.queueInProcess) {
rest.queueInProcess = true;
await rest.processQueue();
}
});
}
-54
View File
@@ -1,54 +0,0 @@
// SERVERLESS REST CLIENT THAT CAN WORK ACROSS SHARDS/WORKERS TO COMMUNICATE GLOBAL RATE LIMITS EASILY
import { restCache } from "./cache.ts";
import { serve, ServerRequest, serveTLS } from "./deps.ts";
import { processRequest } from "./request.ts";
/** Begins an http server that will handle incoming requests. */
export async function startRESTServer(options: RestServerOptions) {
const server = options.keys
? serveTLS({
port: options.port,
certFile: options.keys.cert,
keyFile: options.keys.key,
})
: serve({ port: options.port });
for await (const request of server) {
handlePayload(request, options).catch((error) => {
restCache.eventHandlers.error("processRequest", error);
});
}
}
/** Handler function for every request. Converts to json, verified authorization & requirements and begins processing the request */
async function handlePayload(
request: ServerRequest,
options: RestServerOptions,
) {
// INSTANTLY IGNORE ANY REQUESTS THAT DON'T HAVE THE SECRET AUTHORIZATION KEY
const authorization = request.headers.get("authorization");
if (authorization !== options.authorization) return;
// READ BUFFER AFTER AUTH CHECK
const buffer = await Deno.readAll(request.body);
try {
// CONVERT THE BODY TO JSON
const data = JSON.parse(new TextDecoder().decode(buffer));
if (
!["GET", "POST", "PUT", "PATCH", "HEAD", "DELETE"].includes(
request.method,
)
) {
return request.respond(
{
status: 400,
body: JSON.stringify({ error: "Invalid METHOD." }),
},
);
}
// PROCESS THE REQUEST
processRequest(request, { body: data, retryCount: 0 }, options);
} catch (error) {
restCache.eventHandlers.error("serverRequest", error);
}
}