itoh is love itoh is bae this is his stuff

This commit is contained in:
Skillz
2021-02-22 12:01:32 -05:00
parent 15dc027b92
commit 6eca60777c
8 changed files with 61 additions and 27 deletions
+10 -6
View File
@@ -127,9 +127,9 @@ export async function getPins(channelID: string) {
return Promise.all(result.map((res) => structures.createMessage(res)));
}
/**
* Trigger a typing indicator for the specified channel. Generally bots should **NOT** implement this route.
* However, if a bot is responding to a command and expects the computation to take a few seconds,
/**
* Trigger a typing indicator for the specified channel. Generally bots should **NOT** implement this route.
* However, if a bot is responding to a command and expects the computation to take a few seconds,
* this endpoint may be called to let the user know that the bot is processing their message.
*/
export async function startTyping(channelID: string) {
@@ -240,9 +240,13 @@ export async function sendMessage(
replied_user: content.mentions.repliedUser,
}
: undefined,
message_reference: {
message_id: content.replyMessageID,
},
...(content.replyMessageID
? {
message_reference: {
message_id: content.replyMessageID,
},
}
: {}),
},
) as MessageCreateOptions;
+1 -1
View File
@@ -1 +1 @@
export * from "https://deno.land/std@0.87.0/http/server.ts";
export * from "https://deno.land/std@0.88.0/http/server.ts";
+24 -12
View File
@@ -1,6 +1,7 @@
import { restCache } from "./cache.ts";
import { createRequestBody, processRequestHeaders } from "./request.ts";
import { HttpResponseCode } from "./types/mod.ts";
import { delay } from "../util/utils.ts";
/** If the queue is not already processing, this will start processing the queue. */
export function startQueue() {
@@ -12,19 +13,24 @@ export function startQueue() {
}
/** Processes the queue by looping over each path separately until the queues are empty. */
export function processQueue() {
export async function processQueue() {
while (restCache.processingQueue) {
// FOR EVERY PATH WE WILL START ITS OWN LOOP.
restCache.pathQueues.forEach(async (queue) => {
// MAKE SURE THIS QUEUE HAS NOT ALREADY STARTED
if (queue.processing) return;
// EACH PATH IS UNIQUE LIMITER
while (queue.length) {
while (queue.requests.length) {
// IF THE BOT IS GLOBALLY RATELIMITED TRY AGAIN
if (!restCache.globallyRateLimited) continue;
if (restCache.globallyRateLimited) continue;
// SELECT THE FIRST ITEM FROM THIS QUEUE
const [queuedRequest] = queue;
const [queuedRequest] = queue.requests;
// IF THIS DOESNT HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT.
if (!queuedRequest) return;
// MARK THIS QUEUE AS NOW BEING PROCESSED
queue.processing = true;
// IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
const urlResetIn = checkRateLimits(queuedRequest.payload.url);
if (urlResetIn) continue;
@@ -90,7 +96,7 @@ export function processQueue() {
queuedRequest.request.respond(
{ status: response.status, body: JSON.stringify({ error }) },
);
queue.shift();
queue.requests.shift();
continue;
}
@@ -102,7 +108,6 @@ export function processQueue() {
// CONVERT THE RESPONSE TO JSON
const json = await response.json();
// IF THE RESPONSE WAS RATE LIMITED, HANDLE ACCORDINGLY
if (
json.retry_after ||
@@ -126,7 +131,7 @@ export function processQueue() {
},
);
// REMOVE ITEM FROM QUEUE TO PREVENT RETRY
queue.shift();
queue.requests.shift();
continue;
}
@@ -140,7 +145,7 @@ export function processQueue() {
restCache.eventHandlers.fetchSuccess(queuedRequest.payload);
// REMOVE FROM QUEUE
queue.shift();
queue.requests.shift();
queuedRequest.request.respond(
{ status: 200, body: JSON.stringify(json) },
);
@@ -151,23 +156,30 @@ export function processQueue() {
{ status: 404, body: JSON.stringify({ error }) },
);
// REMOVE FROM QUEUE
queue.shift();
queue.requests.shift();
}
}
// MARK THE QUEUE AS NO LONGER PROCESSING
queue.processing = false;
// ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP
cleanupQueues();
});
await delay(1000);
}
}
/** Cleans up the queues by checking if there is nothing left and removing it. */
export function cleanupQueues() {
restCache.pathQueues.forEach((queue, key) => {
if (queue.length) return;
for (const [key, queue] of restCache.pathQueues) {
if (queue.requests.length) continue;
// REMOVE IT FROM CACHE
restCache.pathQueues.delete(key);
});
}
// NO QUEUE LEFT, DISABLE THE QUEUE
if (!restCache.pathQueues.size) restCache.processingQueue = false;
}
/** Check the rate limits for a url or a bucket. */
+14 -4
View File
@@ -1,4 +1,4 @@
import { USER_AGENT } from "../util/constants.ts";
import { BASE_URL, USER_AGENT } from "../util/constants.ts";
import { restCache } from "./cache.ts";
import { ServerRequest } from "./deps.ts";
import { startQueue } from "./queue.ts";
@@ -20,6 +20,8 @@ export function processRequest(
parts.shift();
// REMOVES THE VERSION NUMBER
if (parts[0]?.startsWith("v")) parts.shift();
// SET THE NEW REQUEST URL
payload.url = `${BASE_URL}/v${options.apiVersion || 8}/${parts.join("/")}`;
// REMOVE THE MAJOR PARAM
parts.shift();
@@ -28,19 +30,27 @@ export function processRequest(
const queue = restCache.pathQueues.get(id);
// IF THE QUEUE EXISTS JUST ADD THIS TO THE QUEUE
if (queue) {
queue.push({ request, payload, options });
queue.requests.push({ request, payload, options });
} else {
// CREATES A NEW QUEUE
restCache.pathQueues.set(id, [{ request, payload, options }]);
restCache.pathQueues.set(id, {
processing: false,
requests: [{
request,
payload,
options,
}],
});
}
console.log("starting queue");
startQueue();
}
/** Creates the request body and headers that are necessary to send a request. Will handle different types of methods and everything necessary for discord. */
export function createRequestBody(queuedRequest: QueuedRequest) {
const headers: { [key: string]: string } = {
Authorization: queuedRequest.options.token,
Authorization: `Bot ${queuedRequest.options.token}`,
"User-Agent": USER_AGENT,
};
+1 -1
View File
@@ -225,7 +225,7 @@ function runMethod(
body: JSON.stringify({
url,
method,
...(body as Record<string, unknown> || {}),
body: body || {},
}),
headers: {
authorization: restAuthorization,
+1 -2
View File
@@ -23,7 +23,6 @@ async function handlePayload(
// INSTANTLY IGNORE ANY REQUESTS THAT DON'T HAVE THE SECRET AUTHORIZATION KEY
const authorization = request.headers.get("authorization");
if (authorization !== options.authorization) return;
// READ BUFFER AFTER AUTH CHECK
const buffer = await Deno.readAll(request.body);
@@ -48,7 +47,7 @@ async function handlePayload(
}
// PROCESS THE REQUEST
await processRequest(
processRequest(
request,
{ method: data.method, url: data.url, body: data.body, retryCount: 0 },
options,
+8 -1
View File
@@ -3,7 +3,7 @@ import { RestEventHandlers } from "./server.ts";
export interface RestCache {
/** The queues that are currently needing to be executed. Key is the url path and the value is all the requests in this same path. Paths are mapped by MAJOR params. */
pathQueues: Map<string, QueuedRequest[]>;
pathQueues: Map<string, Queue>;
/** Whether or not the queues are currently processing. */
processingQueue: boolean;
/** Whether or not this token has been globally rate limited. */
@@ -13,3 +13,10 @@ export interface RestCache {
/** The event handlers are functions that run when something is happening internally. Users can customize this for analytics, debugging, logging or anything their heart desires. */
eventHandlers: RestEventHandlers;
}
export interface Queue {
/** Whether or not this queue is being processed */
processing: boolean;
/** All the requests in this queue. */
requests: QueuedRequest[];
}
+2
View File
@@ -9,6 +9,8 @@ export interface RestServerOptions {
token: string;
/** When a request is rate limited, how many times should it keep retrying the request. Recommended: 10 */
maxRetryCount: number;
/** The api version you would like to use */
apiVersion?: number;
}
export interface RestEventHandlers {