diff --git a/src/rest/cache.ts b/src/rest/cache.ts deleted file mode 100644 index 43f53e6eb..000000000 --- a/src/rest/cache.ts +++ /dev/null @@ -1,19 +0,0 @@ -export const restCache: RestCache = { - pathQueues: new Map(), - processingQueue: false, - globallyRateLimited: false, - ratelimitedPaths: new Map(), - eventHandlers: { - // BY DEFAULT WE WILL LOG ALL ERRORS TO CONSOLE. USER CAN CHOOSE TO OVERRIDE - error: function (_type, error) { - console.error(error); - }, - // PLACEHOLDERS TO ALLOW USERS TO CUSTOMIZE - fetching() {}, - fetched() {}, - fetchSuccess() {}, - fetchFailed() {}, - globallyRateLimited() {}, - retriesMaxed() {}, - }, -}; diff --git a/src/rest/check_rate_limits.ts b/src/rest/check_rate_limits.ts new file mode 100644 index 000000000..303d04b01 --- /dev/null +++ b/src/rest/check_rate_limits.ts @@ -0,0 +1,17 @@ +import { rest } from "./rest.ts"; + +/** Check the rate limits for a url or a bucket. */ +export function checkRateLimits(url: string) { + const ratelimited = rest.ratelimitedPaths.get(url); + const global = rest.ratelimitedPaths.get("global"); + const now = Date.now(); + + if (ratelimited && now < ratelimited.resetTimestamp) { + return ratelimited.resetTimestamp - now; + } + if (global && now < global.resetTimestamp) { + return global.resetTimestamp - now; + } + + return false; +} diff --git a/src/rest/cleanup_queues.ts b/src/rest/cleanup_queues.ts new file mode 100644 index 000000000..4171fc189 --- /dev/null +++ b/src/rest/cleanup_queues.ts @@ -0,0 +1,13 @@ +import { rest } from "./rest.ts"; + +/** Cleans up the queues by checking if there is nothing left and removing it. */ +export function cleanupQueues() { + for (const [key, queue] of rest.pathQueues) { + if (queue.length) continue; + // REMOVE IT FROM CACHE + rest.pathQueues.delete(key); + } + + // NO QUEUE LEFT, DISABLE THE QUEUE + if (!rest.pathQueues.size) rest.processingQueue = false; +} diff --git a/src/rest/create_request_body.ts b/src/rest/create_request_body.ts new file mode 100644 index 000000000..07c1effa7 --- /dev/null +++ b/src/rest/create_request_body.ts @@ -0,0 +1,46 @@ +/** Creates the request body and headers that are necessary to send a request. Will handle different types of methods and everything necessary for discord. */ +export function createRequestBody(queuedRequest: QueuedRequest) { + const headers: { [key: string]: string } = { + Authorization: `Bot ${queuedRequest.options.token}`, + "User-Agent": USER_AGENT, + }; + + // GET METHODS SHOULD NOT HAVE A BODY + if (queuedRequest.request.method === "GET") { + queuedRequest.payload.body = undefined; + } + + // IF A REASON IS PROVIDED ENCODE IT IN HEADERS + if (queuedRequest.payload.body?.reason) { + headers["X-Audit-Log-Reason"] = encodeURIComponent( + queuedRequest.payload.body.reason, + ); + } + + // IF A FILE/ATTACHMENT IS PRESENT WE NEED SPECIAL HANDLING + if (queuedRequest.payload.body?.file) { + const form = new FormData(); + form.append( + "file", + queuedRequest.payload.body.file.blob, + queuedRequest.payload.body.file.name, + ); + form.append( + "payload_json", + JSON.stringify({ ...queuedRequest.payload.body, file: undefined }), + ); + queuedRequest.payload.body.file = form; + } else if ( + queuedRequest.payload.body && + !["GET", "DELETE"].includes(queuedRequest.request.method) + ) { + headers["Content-Type"] = "application/json"; + } + + return { + headers, + body: queuedRequest.payload.body?.file || + JSON.stringify(queuedRequest.payload.body), + method: queuedRequest.request.method, + }; +} diff --git a/src/rest/deps.ts b/src/rest/deps.ts deleted file mode 100644 index b02feea5b..000000000 --- a/src/rest/deps.ts +++ /dev/null @@ -1 +0,0 @@ -export * from "https://deno.land/std@0.90.0/http/server.ts"; diff --git a/src/rest/handle_payload.ts b/src/rest/handle_payload.ts new file mode 100644 index 000000000..89a723b07 --- /dev/null +++ b/src/rest/handle_payload.ts @@ -0,0 +1,34 @@ +// SERVERLESS REST CLIENT THAT CAN WORK ACROSS SHARDS/WORKERS TO COMMUNICATE GLOBAL RATE LIMITS EASILY +import { rest } from "./rest.ts"; + +/** Handler function for every request. Converts to json, verified authorization & requirements and begins processing the request */ +export async function handlePayload( + request: Request, +) { + // INSTANTLY IGNORE ANY REQUESTS THAT DON'T HAVE THE SECRET AUTHORIZATION KEY + const authorization = request.headers.get("authorization"); + if (authorization !== rest.authorization) return; + // READ BUFFER AFTER AUTH CHECK + const buffer = await Deno.readAll(request.body); + try { + // CONVERT THE BODY TO JSON + const data = JSON.parse(new TextDecoder().decode(buffer)); + if ( + !["GET", "POST", "PUT", "PATCH", "HEAD", "DELETE"].includes( + request.method, + ) + ) { + return request.respond( + { + status: 400, + body: JSON.stringify({ error: "Invalid METHOD." }), + }, + ); + } + + // PROCESS THE REQUEST + rest.processRequest(request, { body: data, retryCount: 0 }); + } catch (error) { + rest.eventHandlers.error("serverRequest", error); + } +} diff --git a/src/rest/mod.ts b/src/rest/mod.ts index d29ef8aac..e8eeb3c8c 100644 --- a/src/rest/mod.ts +++ b/src/rest/mod.ts @@ -1,4 +1,11 @@ -export * from "./cache.ts"; -export * from "./queue.ts"; -export * from "./request.ts"; -export * from "./server.ts"; +export * from "./check_rate_limits.ts"; +export * from "./cleanup_queues.ts"; +export * from "./create_request_body.ts"; +export * from "./handle_payload.ts"; +export * from "./process_queue.ts"; +export * from "./process_rate_limited_paths.ts"; +export * from "./process_request.ts"; +export * from "./process_request_headers.ts"; +export * from "./request_manager.ts"; +export * from "./rest.ts"; +export * from "./run_method.ts"; diff --git a/src/rest/queue.ts b/src/rest/process_queue.ts similarity index 72% rename from src/rest/queue.ts rename to src/rest/process_queue.ts index 90ee1fcc8..3deb3407d 100644 --- a/src/rest/queue.ts +++ b/src/rest/process_queue.ts @@ -1,15 +1,13 @@ -import { delay } from "../util/utils.ts"; -import { restCache } from "./cache.ts"; -import { createRequestBody, processRequestHeaders } from "./request.ts"; +import { rest } from "./rest.ts"; /** Processes the queue by looping over each path separately until the queues are empty. */ export async function processQueue(id: string) { - const queue = restCache.pathQueues.get(id); + const queue = rest.pathQueues.get(id); if (!queue) return; while (queue.length) { // IF THE BOT IS GLOBALLY RATELIMITED TRY AGAIN - if (restCache.globallyRateLimited) { + if (rest.globallyRateLimited) { setTimeout(() => processQueue(id), 1000); break; @@ -20,7 +18,7 @@ export async function processQueue(id: string) { if (!queuedRequest) return; // IF THIS URL IS STILL RATE LIMITED, TRY AGAIN - const urlResetIn = checkRateLimits(queuedRequest.request.url); + const urlResetIn = rest.checkRateLimits(queuedRequest.request.url); if (urlResetIn) { // PAUSE FOR THIS SPECIFC REQUEST await delay(urlResetIn); @@ -28,7 +26,7 @@ export async function processQueue(id: string) { } // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS const bucketResetIn = queuedRequest.payload.bucketId - ? checkRateLimits(queuedRequest.payload.bucketId) + ? rest.checkRateLimits(queuedRequest.payload.bucketId) : false; // THIS BUCKET IS STILL RATELIMITED, RE-ADD TO QUEUE if (bucketResetIn) continue; @@ -48,21 +46,22 @@ export async function processQueue(id: string) { : queuedRequest.request.url; // CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE - restCache.eventHandlers.fetching(queuedRequest.payload); + rest.eventHandlers.fetching(queuedRequest.payload); try { const response = await fetch( urlToUse, - createRequestBody(queuedRequest), + rest.createRequestBody(queuedRequest), ); - restCache.eventHandlers.fetched(queuedRequest.payload); - const bucketIdFromHeaders = processRequestHeaders( + + rest.eventHandlers.fetched(queuedRequest.payload); + const bucketIdFromHeaders = rest.processRequestHeaders( queuedRequest.request.url, response.headers, ); if (response.status < 200 || response.status >= 400) { - restCache.eventHandlers.error( + rest.eventHandlers.error( "httpError", queuedRequest.payload, response, @@ -91,7 +90,7 @@ export async function processQueue(id: string) { // SOMETIMES DISCORD RETURNS AN EMPTY 204 RESPONSE THAT CAN'T BE MADE TO JSON if (response.status === 204) { - restCache.eventHandlers.fetchSuccess(queuedRequest.payload); + rest.eventHandlers.fetchSuccess(queuedRequest.payload); // REMOVE FROM QUEUE queue.shift(); queuedRequest.request.respond({ status: 204 }); @@ -108,7 +107,7 @@ export async function processQueue(id: string) { queuedRequest.payload.retryCount >= queuedRequest.options.maxRetryCount ) { - restCache.eventHandlers.retriesMaxed(queuedRequest.payload); + rest.eventHandlers.retriesMaxed(queuedRequest.payload); queuedRequest.request.respond( { status: 200, @@ -133,7 +132,7 @@ export async function processQueue(id: string) { continue; } - restCache.eventHandlers.fetchSuccess(queuedRequest.payload); + rest.eventHandlers.fetchSuccess(queuedRequest.payload); // REMOVE FROM QUEUE queue.shift(); queuedRequest.request.respond( @@ -142,7 +141,7 @@ export async function processQueue(id: string) { } } catch (error) { // SOMETHING WENT WRONG, LOG AND RESPOND WITH ERROR - restCache.eventHandlers.fetchFailed(queuedRequest.payload, error); + rest.eventHandlers.fetchFailed(queuedRequest.payload, error); queuedRequest.request.respond( { status: 404, body: JSON.stringify({ error }) }, ); @@ -152,33 +151,5 @@ export async function processQueue(id: string) { } // ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP - cleanupQueues(); -} - -/** Cleans up the queues by checking if there is nothing left and removing it. */ -export function cleanupQueues() { - for (const [key, queue] of restCache.pathQueues) { - if (queue.length) continue; - // REMOVE IT FROM CACHE - restCache.pathQueues.delete(key); - } - - // NO QUEUE LEFT, DISABLE THE QUEUE - if (!restCache.pathQueues.size) restCache.processingQueue = false; -} - -/** Check the rate limits for a url or a bucket. */ -export function checkRateLimits(url: string) { - const ratelimited = restCache.ratelimitedPaths.get(url); - const global = restCache.ratelimitedPaths.get("global"); - const now = Date.now(); - - if (ratelimited && now < ratelimited.resetTimestamp) { - return ratelimited.resetTimestamp - now; - } - if (global && now < global.resetTimestamp) { - return global.resetTimestamp - now; - } - - return false; + rest.cleanupQueues(); } diff --git a/src/rest/process_rate_limited_paths.ts b/src/rest/process_rate_limited_paths.ts new file mode 100644 index 000000000..c61061a8a --- /dev/null +++ b/src/rest/process_rate_limited_paths.ts @@ -0,0 +1,25 @@ +import { rest } from "./rest.ts"; + +/** This will create a infinite loop running in 1 seconds using tail recursion to keep rate limits clean. When a rate limit resets, this will remove it so the queue can proceed. */ +export function processRateLimitedPaths() { + const now = Date.now(); + + rest.ratelimitedPaths.forEach((value, key) => { + // IF THE TIME HAS NOT REACHED CANCEL + if (value.resetTimestamp > now) return; + // RATE LIMIT IS OVER, DELETE THE RATE LIMITER + rest.ratelimitedPaths.delete(key); + // IF IT WAS GLOBAL ALSO MARK THE GLOBAL VALUE AS FALSE + if (key === "global") rest.globallyRateLimited = false; + }); + + // ALL PATHS ARE CLEARED CAN CANCEL OUT! + if (!rest.ratelimitedPaths.size) { + rest.processingRateLimitedPaths = false; + return; + } else { + rest.processingRateLimitedPaths = true; + // RECHECK IN 1 SECOND + setTimeout(() => processRateLimitedPaths(), 1000); + } +} diff --git a/src/rest/process_request.ts b/src/rest/process_request.ts new file mode 100644 index 000000000..0e8350c34 --- /dev/null +++ b/src/rest/process_request.ts @@ -0,0 +1,33 @@ +/** Processes a request and assigns it to a queue or creates a queue if none exists for it. */ +export function processRequest( + request: ServerRequest, + payload: RunMethodOptions, + options: RestServerOptions, +) { + const route = request.url.substring(request.url.indexOf("api/")); + const parts = route.split("/"); + // REMOVE THE API + parts.shift(); + // REMOVES THE VERSION NUMBER + if (parts[0]?.startsWith("v")) parts.shift(); + // SET THE NEW REQUEST URL + request.url = `${BASE_URL}/v${options.apiVersion || 8}/${parts.join("/")}`; + // REMOVE THE MAJOR PARAM + parts.shift(); + + const [id] = parts; + + const queue = restCache.pathQueues.get(id); + // IF THE QUEUE EXISTS JUST ADD THIS TO THE QUEUE + if (queue) { + queue.push({ request, payload, options }); + } else { + // CREATES A NEW QUEUE + restCache.pathQueues.set(id, [{ + request, + payload, + options, + }]); + processQueue(id); + } +} diff --git a/src/rest/process_request_headers.ts b/src/rest/process_request_headers.ts new file mode 100644 index 000000000..6b07f381e --- /dev/null +++ b/src/rest/process_request_headers.ts @@ -0,0 +1,61 @@ +import { rest } from "./rest.ts"; + +/** Processes the rate limit headers and determines if it needs to be ratelimited and returns the bucket id if available */ +export function processRequestHeaders(url: string, headers: Headers) { + let ratelimited = false; + + // GET ALL NECESSARY HEADERS + const remaining = headers.get("x-ratelimit-remaining"); + const resetTimestamp = headers.get("x-ratelimit-reset"); + const retryAfter = headers.get("retry-after"); + const global = headers.get("x-ratelimit-global"); + const bucketId = headers.get("x-ratelimit-bucket"); + + // IF THERE IS NO REMAINING RATE LIMIT, MARK IT AS RATE LIMITED + if (remaining && remaining === "0") { + ratelimited = true; + + // SAVE THE URL AS LIMITED, IMPORTANT FOR NEW REQUESTS BY USER WITHOUT BUCKET + rest.ratelimitedPaths.set(url, { + url, + resetTimestamp: Number(resetTimestamp) * 1000, + bucketId, + }); + + // SAVE THE BUCKET AS LIMITED SINCE DIFFERENT URLS MAY SHARE A BUCKET + if (bucketId) { + rest.ratelimitedPaths.set(bucketId, { + url, + resetTimestamp: Number(resetTimestamp) * 1000, + bucketId, + }); + } + } + + // IF THERE IS NO REMAINING GLOBAL LIMIT, MARK IT RATE LIMITED GLOBALLY + if (global) { + const reset = Date.now() + (Number(retryAfter) * 1000); + rest.eventHandlers.globallyRateLimited(url, reset); + rest.globallyRateLimited = true; + ratelimited = true; + + rest.ratelimitedPaths.set("global", { + url: "global", + resetTimestamp: reset, + bucketId, + }); + + if (bucketId) { + rest.ratelimitedPaths.set(bucketId, { + url: "global", + resetTimestamp: reset, + bucketId, + }); + } + } + + if (ratelimited && !rest.processingRateLimitedPaths) { + rest.processRateLimitedPaths(); + } + return ratelimited ? bucketId : undefined; +} diff --git a/src/rest/request.ts b/src/rest/request.ts deleted file mode 100644 index 75851df04..000000000 --- a/src/rest/request.ts +++ /dev/null @@ -1,162 +0,0 @@ -import { BASE_URL, USER_AGENT } from "../util/constants.ts"; -import { restCache } from "./cache.ts"; -import { ServerRequest } from "./deps.ts"; -import { processQueue } from "./queue.ts"; - -/** Processes a request and assigns it to a queue or creates a queue if none exists for it. */ -export function processRequest( - request: ServerRequest, - payload: RunMethodOptions, - options: RestServerOptions, -) { - const route = request.url.substring(request.url.indexOf("api/")); - const parts = route.split("/"); - // REMOVE THE API - parts.shift(); - // REMOVES THE VERSION NUMBER - if (parts[0]?.startsWith("v")) parts.shift(); - // SET THE NEW REQUEST URL - request.url = `${BASE_URL}/v${options.apiVersion || 8}/${parts.join("/")}`; - // REMOVE THE MAJOR PARAM - parts.shift(); - - const [id] = parts; - - const queue = restCache.pathQueues.get(id); - // IF THE QUEUE EXISTS JUST ADD THIS TO THE QUEUE - if (queue) { - queue.push({ request, payload, options }); - } else { - // CREATES A NEW QUEUE - restCache.pathQueues.set(id, [{ - request, - payload, - options, - }]); - processQueue(id); - } -} - -/** Creates the request body and headers that are necessary to send a request. Will handle different types of methods and everything necessary for discord. */ -export function createRequestBody(queuedRequest: QueuedRequest) { - const headers: { [key: string]: string } = { - Authorization: `Bot ${queuedRequest.options.token}`, - "User-Agent": USER_AGENT, - }; - - // GET METHODS SHOULD NOT HAVE A BODY - if (queuedRequest.request.method === "GET") { - queuedRequest.payload.body = undefined; - } - - // IF A REASON IS PROVIDED ENCODE IT IN HEADERS - if (queuedRequest.payload.body?.reason) { - headers["X-Audit-Log-Reason"] = encodeURIComponent( - queuedRequest.payload.body.reason, - ); - } - - // IF A FILE/ATTACHMENT IS PRESENT WE NEED SPECIAL HANDLING - if (queuedRequest.payload.body?.file) { - const form = new FormData(); - form.append( - "file", - queuedRequest.payload.body.file.blob, - queuedRequest.payload.body.file.name, - ); - form.append( - "payload_json", - JSON.stringify({ ...queuedRequest.payload.body, file: undefined }), - ); - queuedRequest.payload.body.file = form; - } else if ( - queuedRequest.payload.body && - !["GET", "DELETE"].includes(queuedRequest.request.method) - ) { - headers["Content-Type"] = "application/json"; - } - - return { - headers, - body: queuedRequest.payload.body?.file || - JSON.stringify(queuedRequest.payload.body), - method: queuedRequest.request.method, - }; -} - -/** Processes the rate limit headers and determines if it needs to be ratelimited and returns the bucket id if available */ -export function processRequestHeaders(url: string, headers: Headers) { - let ratelimited = false; - - // GET ALL NECESSARY HEADERS - const remaining = headers.get("x-ratelimit-remaining"); - const resetTimestamp = headers.get("x-ratelimit-reset"); - const retryAfter = headers.get("retry-after"); - const global = headers.get("x-ratelimit-global"); - const bucketId = headers.get("x-ratelimit-bucket"); - - // IF THERE IS NO REMAINING RATE LIMIT, MARK IT AS RATE LIMITED - if (remaining && remaining === "0") { - ratelimited = true; - - // SAVE THE URL AS LIMITED, IMPORTANT FOR NEW REQUESTS BY USER WITHOUT BUCKET - restCache.ratelimitedPaths.set(url, { - url, - resetTimestamp: Number(resetTimestamp) * 1000, - bucketId, - }); - - // SAVE THE BUCKET AS LIMITED SINCE DIFFERENT URLS MAY SHARE A BUCKET - if (bucketId) { - restCache.ratelimitedPaths.set(bucketId, { - url, - resetTimestamp: Number(resetTimestamp) * 1000, - bucketId, - }); - } - } - - // IF THERE IS NO REMAINING GLOBAL LIMIT, MARK IT RATE LIMITED GLOBALLY - if (global) { - const reset = Date.now() + (Number(retryAfter) * 1000); - restCache.eventHandlers.globallyRateLimited(url, reset); - restCache.globallyRateLimited = true; - ratelimited = true; - - restCache.ratelimitedPaths.set("global", { - url: "global", - resetTimestamp: reset, - bucketId, - }); - - if (bucketId) { - restCache.ratelimitedPaths.set(bucketId, { - url: "global", - resetTimestamp: reset, - bucketId, - }); - } - } - - return ratelimited ? bucketId : undefined; -} - -/** This wll create a infinite loop running in 1 seconds using tail recursion to keep rate limits clean. When a rate limit resets, this will remove it so the queue can proceed. */ -function processRateLimitedPaths() { - const now = Date.now(); - - restCache.ratelimitedPaths.forEach((value, key) => { - // IF THE TIME HAS NOT REACHED CANCEL - if (value.resetTimestamp > now) return; - // RATE LIMIT IS OVER, DELETE THE RATE LIMITER - restCache.ratelimitedPaths.delete(key); - // IF IT WAS GLOBAL ALSO MARK THE GLOBAL VALUE AS FALSE - if (key === "global") restCache.globallyRateLimited = false; - }); - - // RECHECK IN 1 SECOND - setTimeout(() => processRateLimitedPaths(), 1000); -} - -/** Starts the loop */ -processRateLimitedPaths(); diff --git a/src/rest/request_manager.ts b/src/rest/request_manager.ts index 7a13f77d0..23319ab06 100644 --- a/src/rest/request_manager.ts +++ b/src/rest/request_manager.ts @@ -1,436 +1,19 @@ -import { authorization, eventHandlers, restAuthorization } from "../bot.ts"; -import { - API_VERSION, - BASE_URL, - baseEndpoints, - IMAGE_BASE_URL, - USER_AGENT, -} from "../util/constants.ts"; -import { delay } from "../util/utils.ts"; - -const pathQueues: { [key: string]: QueuedRequest[] } = {}; -const ratelimitedPaths = new Map(); -let globallyRateLimited = false; -let queueInProcess = false; - -async function processRateLimitedPaths() { - const now = Date.now(); - ratelimitedPaths.forEach((value, key) => { - if (value.resetTimestamp > now) return; - ratelimitedPaths.delete(key); - if (key === "global") globallyRateLimited = false; - }); - - await delay(1000); - await processRateLimitedPaths(); -} - -function addToQueue(request: QueuedRequest) { - const route = request.url.substring(baseEndpoints.BASE_URL.length + 1); - const parts = route.split("/"); - // Remove the major param - parts.shift(); - const [id] = parts; - - if (pathQueues[id]) { - pathQueues[id].push(request); - } else { - pathQueues[id] = [request]; - } -} - -function cleanupQueues() { - Object.entries(pathQueues).forEach(([key, value]) => { - if (!value.length) { - // Remove it entirely - delete pathQueues[key]; - } - }); -} - -async function processQueue() { - while (queueInProcess) { - if ( - (Object.keys(pathQueues).length) && !globallyRateLimited - ) { - await Promise.allSettled( - Object.values(pathQueues).map(async (pathQueue) => { - const request = pathQueue.shift(); - if (!request) return; - - const rateLimitedURLResetIn = await checkRatelimits(request.url); - - if (request.bucketId) { - const rateLimitResetIn = await checkRatelimits(request.bucketId); - if (rateLimitResetIn) { - // This request is still rate limited readd to queue - addToQueue(request); - } else if (rateLimitedURLResetIn) { - // This URL is rate limited readd to queue - addToQueue(request); - } else { - // This request is not rate limited so it should be run - const result = await request.callback(); - if (result && result.rateLimited) { - addToQueue( - { ...request, bucketId: result.bucketId || request.bucketId }, - ); - } - } - } else { - if (rateLimitedURLResetIn) { - // This URL is rate limited readd to queue - addToQueue(request); - } else { - // This request has no bucket id so it should be processed - const result = await request.callback(); - if (request && result && result.rateLimited) { - addToQueue( - { ...request, bucketId: result.bucketId || request.bucketId }, - ); - } - } - } - }), - ); - } - - if (Object.keys(pathQueues).length) { - cleanupQueues(); - } else queueInProcess = false; - } -} - -processRateLimitedPaths(); +import { rest } from "./rest.ts"; export const RequestManager = { get: (url: string, body?: unknown) => { - return runMethod("get", url, body); + return rest.runMethod("get", url, body); }, post: (url: string, body?: unknown) => { - return runMethod("post", url, body); + return rest.runMethod("post", url, body); }, delete: (url: string, body?: unknown) => { - return runMethod("delete", url, body); + return rest.runMethod("delete", url, body); }, patch: (url: string, body?: unknown) => { - return runMethod("patch", url, body); + return rest.runMethod("patch", url, body); }, put: (url: string, body?: unknown) => { - return runMethod("put", url, body); + return rest.runMethod("put", url, body); }, }; - -// deno-lint-ignore no-explicit-any -function createRequestBody(body: any, method: RequestMethods) { - const headers: { [key: string]: string } = { - Authorization: authorization, - "User-Agent": USER_AGENT, - }; - - if (method === "get") body = undefined; - - if (body?.reason) { - headers["X-Audit-Log-Reason"] = encodeURIComponent(body.reason); - } - - if (body?.file) { - if (!Array.isArray(body.file)) body.file = [body.file]; - - const form = new FormData(); - - body.file.map((file: FileContent, index: number) => - // The key of the form data item must be unique; otherwise, Discordeno only considers the first item in the form data with the same names - form.append(`file${index + 1}`, file.blob, file.name) - ); - - form.append("payload_json", JSON.stringify({ ...body, file: undefined })); - body.file = form; - } else if ( - body && !["get", "delete"].includes(method) - ) { - headers["Content-Type"] = "application/json"; - } - - return { - headers, - body: body?.file || JSON.stringify(body), - method: method.toUpperCase(), - }; -} - -function checkRatelimits(url: string) { - const ratelimited = ratelimitedPaths.get(url); - const global = ratelimitedPaths.get("global"); - const now = Date.now(); - - if (ratelimited && now < ratelimited.resetTimestamp) { - return ratelimited.resetTimestamp - now; - } - if (global && now < global.resetTimestamp) { - return global.resetTimestamp - now; - } - - return false; -} - -function runMethod( - method: RequestMethods, - url: string, - body?: unknown, - retryCount = 0, - bucketId?: string | null, -) { - eventHandlers.debug?.( - { - type: "requestCreate", - data: { method, url, body, retryCount, bucketId }, - }, - ); - - const errorStack = new Error("Location:"); - Error.captureStackTrace(errorStack); - - // For proxies we don't need to do any of the legwork so we just forward the request - if ( - !url.startsWith(`${BASE_URL}/v${API_VERSION}`) && - !url.startsWith(IMAGE_BASE_URL) - ) { - return fetch(url, { - body: JSON.stringify(body || {}), - headers: { - authorization: restAuthorization, - }, - method: method.toUpperCase(), - }) - .then((res) => { - if (res.status === 204) return undefined; - - return res.json(); - }) - .catch((error) => { - console.error(error); - throw errorStack; - }); - } - - // No proxy so we need to handle all rate limiting and such - // deno-lint-ignore no-async-promise-executor - return new Promise(async (resolve, reject) => { - const callback = async () => { - try { - const rateLimitResetIn = await checkRatelimits(url); - if (rateLimitResetIn) { - return { rateLimited: rateLimitResetIn, beforeFetch: true, bucketId }; - } - - const query = method === "get" && body - ? // deno-lint-ignore no-explicit-any - Object.entries(body as any).map(([key, value]) => - // deno-lint-ignore no-explicit-any - `${encodeURIComponent(key)}=${encodeURIComponent(value as any)}` - ) - .join("&") - : ""; - const urlToUse = method === "get" && query ? `${url}?${query}` : url; - - eventHandlers.debug?.( - { - type: "requestFetch", - data: { method, url, body, retryCount, bucketId }, - }, - ); - const response = await fetch(urlToUse, createRequestBody(body, method)); - eventHandlers.debug?.( - { - type: "requestFetched", - data: { method, url, body, retryCount, bucketId, response }, - }, - ); - const bucketIdFromHeaders = processHeaders(url, response.headers); - await handleStatusCode(response, errorStack); - - // Sometimes Discord returns an empty 204 response that can't be made to JSON. - if (response.status === 204) return resolve(undefined); - - const json = await response.json(); - if ( - json.retry_after || - json.message === "You are being rate limited." - ) { - if (retryCount > 10) { - eventHandlers.debug?.( - { - type: "error", - data: { method, url, body, retryCount, bucketId, errorStack }, - }, - ); - throw new Error(Errors.RATE_LIMIT_RETRY_MAXED); - } - - return { - rateLimited: json.retry_after, - beforeFetch: false, - bucketId: bucketIdFromHeaders, - }; - } - - eventHandlers.debug?.( - { - type: "requestSuccess", - data: { method, url, body, retryCount, bucketId }, - }, - ); - return resolve(json); - } catch (error) { - eventHandlers.debug?.( - { - type: "error", - data: { method, url, body, retryCount, bucketId, errorStack }, - }, - ); - return reject(error); - } - }; - - addToQueue({ - callback, - bucketId, - url, - }); - if (!queueInProcess) { - queueInProcess = true; - await processQueue(); - } - }); -} - -async function logErrors(response: Response, errorStack?: unknown) { - try { - const error = await response.json(); - console.error(Deno.inspect(error, { depth: 10 })); - - eventHandlers.debug?.({ type: "error", data: { errorStack, error } }); - } catch { - eventHandlers.debug?.( - { - type: "error", - data: { errorStack }, - }, - ); - console.error(response); - } -} - -async function handleStatusCode(response: Response, errorStack?: unknown) { - const status = response.status; - - if ( - (status >= 200 && status < 400) || - status === HttpResponseCode.TooManyRequests - ) { - return true; - } - - await logErrors(response, errorStack); - - switch (status) { - case HttpResponseCode.BadRequest: - console.error( - "The request was improperly formatted, or the server couldn't understand it.", - ); - throw errorStack; - case HttpResponseCode.Unauthorized: - console.error("The Authorization header was missing or invalid."); - throw errorStack; - case HttpResponseCode.Forbidden: - console.error( - "The Authorization token you passed did not have permission to the resource.", - ); - throw errorStack; - case HttpResponseCode.NotFound: - console.error("The resource at the location specified doesn't exist."); - throw errorStack; - case HttpResponseCode.MethodNotAllowed: - console.error( - "The HTTP method used is not valid for the location specified.", - ); - throw errorStack; - case HttpResponseCode.GatewayUnavailable: - console.error( - "There was not a gateway available to process your request. Wait a bit and retry.", - ); - throw errorStack; - // left are all unknown - default: - console.error(Errors.REQUEST_UNKNOWN_ERROR); - throw errorStack; - } -} - -function processHeaders(url: string, headers: Headers) { - let ratelimited = false; - - // Get all useful headers - const remaining = headers.get("x-ratelimit-remaining"); - const resetTimestamp = headers.get("x-ratelimit-reset"); - const retryAfter = headers.get("retry-after"); - const global = headers.get("x-ratelimit-global"); - const bucketId = headers.get("x-ratelimit-bucket"); - - // If there is no remaining rate limit for this endpoint, we save it in cache - if (remaining && remaining === "0") { - ratelimited = true; - - ratelimitedPaths.set(url, { - url, - resetTimestamp: Number(resetTimestamp) * 1000, - bucketId, - }); - - if (bucketId) { - ratelimitedPaths.set(bucketId, { - url, - resetTimestamp: Number(resetTimestamp) * 1000, - bucketId, - }); - } - } - - // If there is no remaining global limit, we save it in cache - if (global) { - const reset = Date.now() + (Number(retryAfter) * 1000); - eventHandlers.debug?.( - { type: "globallyRateLimited", data: { url, reset } }, - ); - globallyRateLimited = true; - ratelimited = true; - - ratelimitedPaths.set("global", { - url: "global", - resetTimestamp: reset, - bucketId, - }); - - if (bucketId) { - ratelimitedPaths.set(bucketId, { - url: "global", - resetTimestamp: reset, - bucketId, - }); - } - } - - if (ratelimited) { - eventHandlers.rateLimit?.({ - remaining, - bucketId, - global, - resetTimestamp, - retryAfter, - url, - }); - } - - return ratelimited ? bucketId : undefined; -} diff --git a/src/rest/rest.ts b/src/rest/rest.ts new file mode 100644 index 000000000..3c058f57c --- /dev/null +++ b/src/rest/rest.ts @@ -0,0 +1,47 @@ +import { checkRateLimits } from "./check_rate_limits.ts"; +import { cleanupQueues } from "./cleanup_queues.ts"; +import { createRequestBody } from "./create_request_body.ts"; +import { handlePayload } from "./handle_payload.ts"; +import { processQueue } from "./process_queue.ts"; +import { processRateLimitedPaths } from "./process_rate_limited_paths.ts"; +import { processRequest } from "./process_request.ts"; +import { processRequestHeaders } from "./process_request_headers.ts"; +import { RequestManager } from "./request_manager.ts"; +import { runMethod } from "./run_method.ts"; + +export const rest: RestCache = { + /** The secret authorization key to confirm that this was a request made by you and not a DDOS attack. */ + authorization: "discordeno_best_lib_ever", + pathQueues: new Map(), + processingQueue: false, + processingRateLimitedPaths: false, + globallyRateLimited: false, + ratelimitedPaths: new Map(), + eventHandlers: { + // BY DEFAULT WE WILL LOG ALL ERRORS TO CONSOLE. USER CAN CHOOSE TO OVERRIDE + error: function (_type, error) { + console.error(error); + }, + // PLACEHOLDERS TO ALLOW USERS TO CUSTOMIZE + fetching() {}, + fetched() {}, + fetchSuccess() {}, + fetchFailed() {}, + globallyRateLimited() {}, + retriesMaxed() {}, + }, + + // TODO: add propeer docs dcomments + manager: RequestManager, + + /** Handler function for every request. Converts to json, verified authorization & requirements and begins processing the request */ + handlePayload, + checkRateLimits, + cleanupQueues, + processQueue, + processRateLimitedPaths, + processRequestHeaders, + processRequest, + createRequestBody, + runMethod, +}; diff --git a/src/rest/run_method.ts b/src/rest/run_method.ts new file mode 100644 index 000000000..b8f69712c --- /dev/null +++ b/src/rest/run_method.ts @@ -0,0 +1,135 @@ +import { rest } from "./rest.ts"; + +export function runMethod( + method: RequestMethods, + url: string, + body?: unknown, + retryCount = 0, + bucketId?: string | null, +) { + rest.eventHandlers.debug?.( + { + type: "requestCreate", + data: { method, url, body, retryCount, bucketId }, + }, + ); + + const errorStack = new Error("Location:"); + Error.captureStackTrace(errorStack); + + // For proxies we don't need to do any of the legwork so we just forward the request + if ( + !url.startsWith(`${BASE_URL}/v${API_VERSION}`) && + !url.startsWith(IMAGE_BASE_URL) + ) { + return fetch(url, { + body: JSON.stringify(body || {}), + headers: { + authorization: rest.authorization, + }, + method: method.toUpperCase(), + }) + .then((res) => { + if (res.status === 204) return undefined; + + return res.json(); + }) + .catch((error) => { + console.error(error); + throw errorStack; + }); + } + + // No proxy so we need to handle all rate limiting and such + // deno-lint-ignore no-async-promise-executor + return new Promise(async (resolve, reject) => { + const callback = async () => { + try { + const rateLimitResetIn = await rest.checkRatelimits(url); + if (rateLimitResetIn) { + return { rateLimited: rateLimitResetIn, beforeFetch: true, bucketId }; + } + + const query = method === "get" && body + ? // deno-lint-ignore no-explicit-any + Object.entries(body as any).map(([key, value]) => + // deno-lint-ignore no-explicit-any + `${encodeURIComponent(key)}=${encodeURIComponent(value as any)}` + ) + .join("&") + : ""; + const urlToUse = method === "get" && query ? `${url}?${query}` : url; + + rest.eventHandlers.debug?.( + { + type: "requestFetch", + data: { method, url, body, retryCount, bucketId }, + }, + ); + const response = await fetch( + urlToUse, + rest.createRequestBody(body, method), + ); + rest.eventHandlers.debug?.( + { + type: "requestFetched", + data: { method, url, body, retryCount, bucketId, response }, + }, + ); + const bucketIdFromHeaders = rest.processHeaders(url, response.headers); + await rest.handleStatusCode(response, errorStack); + + // Sometimes Discord returns an empty 204 response that can't be made to JSON. + if (response.status === 204) return resolve(undefined); + + const json = await response.json(); + if ( + json.retry_after || + json.message === "You are being rate limited." + ) { + if (retryCount > 10) { + rest.eventHandlers.debug?.( + { + type: "error", + data: { method, url, body, retryCount, bucketId, errorStack }, + }, + ); + throw new Error(Errors.RATE_LIMIT_RETRY_MAXED); + } + + return { + rateLimited: json.retry_after, + beforeFetch: false, + bucketId: bucketIdFromHeaders, + }; + } + + rest.eventHandlers.debug?.( + { + type: "requestSuccess", + data: { method, url, body, retryCount, bucketId }, + }, + ); + return resolve(json); + } catch (error) { + rest.eventHandlers.debug?.( + { + type: "error", + data: { method, url, body, retryCount, bucketId, errorStack }, + }, + ); + return reject(error); + } + }; + + rest.addToQueue({ + callback, + bucketId, + url, + }); + if (!rest.queueInProcess) { + rest.queueInProcess = true; + await rest.processQueue(); + } + }); +} diff --git a/src/rest/server.ts b/src/rest/server.ts deleted file mode 100644 index 220b7a598..000000000 --- a/src/rest/server.ts +++ /dev/null @@ -1,54 +0,0 @@ -// SERVERLESS REST CLIENT THAT CAN WORK ACROSS SHARDS/WORKERS TO COMMUNICATE GLOBAL RATE LIMITS EASILY -import { restCache } from "./cache.ts"; -import { serve, ServerRequest, serveTLS } from "./deps.ts"; -import { processRequest } from "./request.ts"; - -/** Begins an http server that will handle incoming requests. */ -export async function startRESTServer(options: RestServerOptions) { - const server = options.keys - ? serveTLS({ - port: options.port, - certFile: options.keys.cert, - keyFile: options.keys.key, - }) - : serve({ port: options.port }); - - for await (const request of server) { - handlePayload(request, options).catch((error) => { - restCache.eventHandlers.error("processRequest", error); - }); - } -} - -/** Handler function for every request. Converts to json, verified authorization & requirements and begins processing the request */ -async function handlePayload( - request: ServerRequest, - options: RestServerOptions, -) { - // INSTANTLY IGNORE ANY REQUESTS THAT DON'T HAVE THE SECRET AUTHORIZATION KEY - const authorization = request.headers.get("authorization"); - if (authorization !== options.authorization) return; - // READ BUFFER AFTER AUTH CHECK - const buffer = await Deno.readAll(request.body); - try { - // CONVERT THE BODY TO JSON - const data = JSON.parse(new TextDecoder().decode(buffer)); - if ( - !["GET", "POST", "PUT", "PATCH", "HEAD", "DELETE"].includes( - request.method, - ) - ) { - return request.respond( - { - status: 400, - body: JSON.stringify({ error: "Invalid METHOD." }), - }, - ); - } - - // PROCESS THE REQUEST - processRequest(request, { body: data, retryCount: 0 }, options); - } catch (error) { - restCache.eventHandlers.error("serverRequest", error); - } -}