Merge branch 'master' of https://github.com/discordeno/discordeno into proxy-ws

This commit is contained in:
Skillz
2021-02-25 12:53:25 -05:00
10 changed files with 180 additions and 184 deletions

View File

@@ -127,9 +127,9 @@ export async function getPins(channelID: string) {
return Promise.all(result.map((res) => structures.createMessage(res)));
}
/**
* Trigger a typing indicator for the specified channel. Generally bots should **NOT** implement this route.
* However, if a bot is responding to a command and expects the computation to take a few seconds,
/**
* Trigger a typing indicator for the specified channel. Generally bots should **NOT** implement this route.
* However, if a bot is responding to a command and expects the computation to take a few seconds,
* this endpoint may be called to let the user know that the bot is processing their message.
*/
export async function startTyping(channelID: string) {
@@ -240,9 +240,13 @@ export async function sendMessage(
replied_user: content.mentions.repliedUser,
}
: undefined,
message_reference: {
message_id: content.replyMessageID,
},
...(content.replyMessageID
? {
message_reference: {
message_id: content.replyMessageID,
},
}
: {}),
},
) as MessageCreateOptions;

View File

@@ -1,2 +1,2 @@
export { serve } from "https://deno.land/std@0.87.0/http/server.ts";
export { verify } from "https://esm.sh/@evan/wasm@0.0.41/target/ed25519/deno.js";
export { verify } from "https://esm.sh/@evan/wasm@0.0.49/target/ed25519/deno.js";

View File

@@ -1 +1 @@
export * from "https://deno.land/std@0.87.0/http/server.ts";
export * from "https://deno.land/std@0.88.0/http/server.ts";

View File

@@ -1,173 +1,169 @@
import { delay } from "../util/utils.ts";
import { restCache } from "./cache.ts";
import { createRequestBody, processRequestHeaders } from "./request.ts";
import { HttpResponseCode } from "./types/mod.ts";
/** If the queue is not already processing, this will start processing the queue. */
export function startQueue() {
// IF ALREADY PROCESSING CANCEL
if (restCache.processingQueue) return;
// MARK AS PROCESSING
restCache.processingQueue = true;
processQueue();
}
/** Processes the queue by looping over each path separately until the queues are empty. */
export function processQueue() {
while (restCache.processingQueue) {
// FOR EVERY PATH WE WILL START ITS OWN LOOP.
restCache.pathQueues.forEach(async (queue) => {
// EACH PATH IS UNIQUE LIMITER
while (queue.length) {
// IF THE BOT IS GLOBALLY RATELIMITED TRY AGAIN
if (!restCache.globallyRateLimited) continue;
// SELECT THE FIRST ITEM FROM THIS QUEUE
const [queuedRequest] = queue;
// IF THIS DOESNT HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT.
if (!queuedRequest) return;
export async function processQueue(id: string) {
const queue = restCache.pathQueues.get(id);
if (!queue) return;
// IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
const urlResetIn = checkRateLimits(queuedRequest.payload.url);
if (urlResetIn) continue;
while (queue.length) {
// IF THE BOT IS GLOBALLY RATELIMITED TRY AGAIN
if (restCache.globallyRateLimited) {
setTimeout(() => processQueue(id), 1000);
// IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
const bucketResetIn = queuedRequest.payload.bucketID
? checkRateLimits(queuedRequest.payload.bucketID)
: false;
// THIS BUCKET IS STILL RATELIMITED, RE-ADD TO QUEUE
if (bucketResetIn) continue;
break;
}
// SELECT THE FIRST ITEM FROM THIS QUEUE
const [queuedRequest] = queue;
// IF THIS DOESNT HAVE ANY ITEMS JUST CANCEL, THE CLEANER WILL REMOVE IT.
if (!queuedRequest) return;
// EXECUTE THE REQUEST
// IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
const urlResetIn = checkRateLimits(queuedRequest.request.url);
if (urlResetIn) {
// PAUSE FOR THIS SPECIFC REQUEST
await delay(urlResetIn);
continue;
} // IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
// IF THIS IS A GET REQUEST, CHANGE THE BODY TO QUERY PARAMETERS
const query =
queuedRequest.payload.method === "get" && queuedRequest.payload.body
? Object.entries(queuedRequest.payload.body).map(([key, value]) =>
`${encodeURIComponent(key)}=${
encodeURIComponent(value as string)
}`
)
.join("&")
: "";
const urlToUse = queuedRequest.payload.method === "get" && query
? `${queuedRequest.payload.url}?${query}`
: queuedRequest.payload.url;
const bucketResetIn = queuedRequest.payload.bucketID
? checkRateLimits(queuedRequest.payload.bucketID)
: false;
// THIS BUCKET IS STILL RATELIMITED, RE-ADD TO QUEUE
if (bucketResetIn) continue;
// CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE
restCache.eventHandlers.fetching(queuedRequest.payload);
// EXECUTE THE REQUEST
try {
const response = await fetch(
urlToUse,
createRequestBody(queuedRequest),
);
restCache.eventHandlers.fetched(queuedRequest.payload);
const bucketIDFromHeaders = processRequestHeaders(
queuedRequest.payload.url,
response.headers,
);
// IF THIS IS A GET REQUEST, CHANGE THE BODY TO QUERY PARAMETERS
const query =
queuedRequest.request.method === "GET" && queuedRequest.payload.body
? Object.entries(queuedRequest.payload.body).map(([key, value]) =>
`${encodeURIComponent(key)}=${encodeURIComponent(value as string)}`
)
.join("&")
: "";
const urlToUse = queuedRequest.request.method === "GET" && query
? `${queuedRequest.request.url}?${query}`
: queuedRequest.request.url;
if (response.status < 200 && response.status >= 400) {
restCache.eventHandlers.error(
"httpError",
queuedRequest.payload,
response,
);
// CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE
restCache.eventHandlers.fetching(queuedRequest.payload);
const error = response.status === HttpResponseCode.BadRequest
? "The request was improperly formatted, or the server couldn't understand it."
: response.status === HttpResponseCode.Unauthorized
? "The Authorization header was missing or invalid."
: response.status === HttpResponseCode.Forbidden
? "The Authorization token you passed did not have permission to the resource."
: response.status === HttpResponseCode.NotFound
? "The resource at the location specified doesn't exist."
: response.status === HttpResponseCode.MethodNotAllowed
? "The HTTP method used is not valid for the location specified."
: response.status === HttpResponseCode.GatewayUnavailable
? "There was not a gateway available to process your request. Wait a bit and retry."
: "REQUEST_UNKNOWN_ERROR";
try {
const response = await fetch(
urlToUse,
createRequestBody(queuedRequest),
);
restCache.eventHandlers.fetched(queuedRequest.payload);
const bucketIDFromHeaders = processRequestHeaders(
queuedRequest.request.url,
response.headers,
);
if (response.status < 200 && response.status >= 400) {
restCache.eventHandlers.error(
"httpError",
queuedRequest.payload,
response,
);
const error = response.status === HttpResponseCode.BadRequest
? "The request was improperly formatted, or the server couldn't understand it."
: response.status === HttpResponseCode.Unauthorized
? "The Authorization header was missing or invalid."
: response.status === HttpResponseCode.Forbidden
? "The Authorization token you passed did not have permission to the resource."
: response.status === HttpResponseCode.NotFound
? "The resource at the location specified doesn't exist."
: response.status === HttpResponseCode.MethodNotAllowed
? "The HTTP method used is not valid for the location specified."
: response.status === HttpResponseCode.GatewayUnavailable
? "There was not a gateway available to process your request. Wait a bit and retry."
: "REQUEST_UNKNOWN_ERROR";
queuedRequest.request.respond(
{ status: response.status, body: JSON.stringify({ error }) },
);
queue.shift();
continue;
}
// SOMETIMES DISCORD RETURNS AN EMPTY 204 RESPONSE THAT CAN'T BE MADE TO JSON
if (response.status === 204) {
restCache.eventHandlers.fetchSuccess(queuedRequest.payload);
queuedRequest.request.respond({ status: 204 });
} else {
// CONVERT THE RESPONSE TO JSON
const json = await response.json();
// IF THE RESPONSE WAS RATE LIMITED, HANDLE ACCORDINGLY
if (
json.retry_after ||
json.message === "You are being rate limited."
) {
// IF IT HAS MAXED RETRIES SOMETHING SERIOUSLY WRONG. CANCEL OUT.
if (
queuedRequest.payload.retryCount >=
queuedRequest.options.maxRetryCount
) {
restCache.eventHandlers.retriesMaxed(queuedRequest.payload);
queuedRequest.request.respond(
{ status: response.status, body: JSON.stringify({ error }) },
{
status: 200,
body: JSON.stringify(
{
error:
"The request was rate limited and it maxed out the retries limit.",
},
),
},
);
// REMOVE ITEM FROM QUEUE TO PREVENT RETRY
queue.shift();
continue;
}
// SOMETIMES DISCORD RETURNS AN EMPTY 204 RESPONSE THAT CAN'T BE MADE TO JSON
if (response.status === 204) {
restCache.eventHandlers.fetchSuccess(queuedRequest.payload);
return queuedRequest.request.respond({ status: 204 });
// SET THE BUCKET ID IF IT WAS PRESENT
if (bucketIDFromHeaders) {
queuedRequest.payload.bucketID = bucketIDFromHeaders;
}
// CONVERT THE RESPONSE TO JSON
const json = await response.json();
// IF THE RESPONSE WAS RATE LIMITED, HANDLE ACCORDINGLY
if (
json.retry_after ||
json.message === "You are being rate limited."
) {
// IF IT HAS MAXED RETRIES SOMETHING SERIOUSLY WRONG. CANCEL OUT.
if (
queuedRequest.payload.retryCount >=
queuedRequest.options.maxRetryCount
) {
restCache.eventHandlers.retriesMaxed(queuedRequest.payload);
queuedRequest.request.respond(
{
status: 200,
body: JSON.stringify(
{
error:
"The request was rate limited and it maxed out the retries limit.",
},
),
},
);
// REMOVE ITEM FROM QUEUE TO PREVENT RETRY
queue.shift();
continue;
}
// SET THE BUCKET ID IF IT WAS PRESENT
if (bucketIDFromHeaders) {
queuedRequest.payload.bucketID = bucketIDFromHeaders;
}
// SINCE IT WAS RATELIMITE, RETRY AGAIN
continue;
}
restCache.eventHandlers.fetchSuccess(queuedRequest.payload);
// REMOVE FROM QUEUE
queue.shift();
queuedRequest.request.respond(
{ status: 200, body: JSON.stringify(json) },
);
} catch (error) {
// SOMETHING WENT WRONG, LOG AND RESPOND WITH ERROR
restCache.eventHandlers.fetchFailed(queuedRequest.payload, error);
queuedRequest.request.respond(
{ status: 404, body: JSON.stringify({ error }) },
);
// REMOVE FROM QUEUE
queue.shift();
// SINCE IT WAS RATELIMITE, RETRY AGAIN
continue;
}
}
// ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP
cleanupQueues();
});
restCache.eventHandlers.fetchSuccess(queuedRequest.payload);
// REMOVE FROM QUEUE
queue.shift();
queuedRequest.request.respond(
{ status: 200, body: JSON.stringify(json) },
);
}
} catch (error) {
// SOMETHING WENT WRONG, LOG AND RESPOND WITH ERROR
restCache.eventHandlers.fetchFailed(queuedRequest.payload, error);
queuedRequest.request.respond(
{ status: 404, body: JSON.stringify({ error }) },
);
// REMOVE FROM QUEUE
queue.shift();
}
}
// ONCE QUEUE IS DONE, WE CAN TRY CLEANING UP
cleanupQueues();
}
/** Cleans up the queues by checking if there is nothing left and removing it. */
export function cleanupQueues() {
restCache.pathQueues.forEach((queue, key) => {
if (queue.length) return;
for (const [key, queue] of restCache.pathQueues) {
if (queue.length) continue;
// REMOVE IT FROM CACHE
restCache.pathQueues.delete(key);
});
}
// NO QUEUE LEFT, DISABLE THE QUEUE
if (!restCache.pathQueues.size) restCache.processingQueue = false;
}
/** Check the rate limits for a url or a bucket. */

View File

@@ -1,7 +1,7 @@
import { USER_AGENT } from "../util/constants.ts";
import { BASE_URL, USER_AGENT } from "../util/constants.ts";
import { restCache } from "./cache.ts";
import { ServerRequest } from "./deps.ts";
import { startQueue } from "./queue.ts";
import { processQueue } from "./queue.ts";
import {
QueuedRequest,
RestServerOptions,
@@ -14,12 +14,14 @@ export function processRequest(
payload: RunMethodOptions,
options: RestServerOptions,
) {
const route = payload.url.substring(payload.url.indexOf("api/"));
const route = request.url.substring(request.url.indexOf("api/"));
const parts = route.split("/");
// REMOVE THE API
parts.shift();
// REMOVES THE VERSION NUMBER
if (parts[0]?.startsWith("v")) parts.shift();
// SET THE NEW REQUEST URL
request.url = `${BASE_URL}/v${options.apiVersion || 8}/${parts.join("/")}`;
// REMOVE THE MAJOR PARAM
parts.shift();
@@ -31,21 +33,24 @@ export function processRequest(
queue.push({ request, payload, options });
} else {
// CREATES A NEW QUEUE
restCache.pathQueues.set(id, [{ request, payload, options }]);
restCache.pathQueues.set(id, [{
request,
payload,
options,
}]);
processQueue(id);
}
startQueue();
}
/** Creates the request body and headers that are necessary to send a request. Will handle different types of methods and everything necessary for discord. */
export function createRequestBody(queuedRequest: QueuedRequest) {
const headers: { [key: string]: string } = {
Authorization: queuedRequest.options.token,
Authorization: `Bot ${queuedRequest.options.token}`,
"User-Agent": USER_AGENT,
};
// GET METHODS SHOULD NOT HAVE A BODY
if (queuedRequest.payload.method === "get") {
if (queuedRequest.request.method === "GET") {
queuedRequest.payload.body = undefined;
}
@@ -71,7 +76,7 @@ export function createRequestBody(queuedRequest: QueuedRequest) {
queuedRequest.payload.body.file = form;
} else if (
queuedRequest.payload.body &&
!["get", "delete"].includes(queuedRequest.payload.method)
!["GET", "DELETE"].includes(queuedRequest.request.method)
) {
headers["Content-Type"] = "application/json";
}
@@ -80,7 +85,7 @@ export function createRequestBody(queuedRequest: QueuedRequest) {
headers,
body: queuedRequest.payload.body?.file ||
JSON.stringify(queuedRequest.payload.body),
method: queuedRequest.payload.method.toUpperCase(),
method: queuedRequest.request.method,
};
}

View File

@@ -222,16 +222,17 @@ function runMethod(
!url.startsWith(IMAGE_BASE_URL)
) {
return fetch(url, {
body: JSON.stringify({
url,
method,
...(body as Record<string, unknown> || {}),
}),
body: JSON.stringify(body || {}),
headers: {
authorization: restAuthorization,
},
method: method.toUpperCase(),
})
.then((res) => res.json())
.then((res) => {
if (res.status === 204) return undefined;
return res.json();
})
.catch((error) => {
console.error(error);
throw errorStack;

View File

@@ -23,36 +23,26 @@ async function handlePayload(
// INSTANTLY IGNORE ANY REQUESTS THAT DON'T HAVE THE SECRET AUTHORIZATION KEY
const authorization = request.headers.get("authorization");
if (authorization !== options.authorization) return;
// READ BUFFER AFTER AUTH CHECK
const buffer = await Deno.readAll(request.body);
try {
// CONVERT THE BODY TO JSON
const data = JSON.parse(new TextDecoder().decode(buffer));
if (!data.url) {
if (
!["GET", "POST", "PUT", "PATCH", "HEAD", "DELETE"].includes(
request.method,
)
) {
return request.respond(
{
status: 400,
body: JSON.stringify({ error: "No URL was provided." }),
},
);
}
if (!data.method) {
return request.respond(
{
status: 400,
body: JSON.stringify({ error: "No METHOD was provided." }),
body: JSON.stringify({ error: "Invalid METHOD." }),
},
);
}
// PROCESS THE REQUEST
await processRequest(
request,
{ method: data.method, url: data.url, body: data.body, retryCount: 0 },
options,
);
processRequest(request, { body: data, retryCount: 0 }, options);
} catch (error) {
restCache.eventHandlers.error("serverRequest", error);
}

View File

@@ -7,8 +7,6 @@ export type RequestMethods =
| "delete";
export interface RunMethodOptions {
method: RequestMethods;
url: string;
retryCount: number;
// deno-lint-ignore no-explicit-any
body?: any;

View File

@@ -9,6 +9,8 @@ export interface RestServerOptions {
token: string;
/** When a request is rate limited, how many times should it keep retrying the request. Recommended: 10 */
maxRetryCount: number;
/** The api version you would like to use */
apiVersion?: number;
}
export interface RestEventHandlers {

View File

@@ -1 +1 @@
export { decompress_with as decompressWith } from "https://esm.sh/@evan/wasm@0.0.41/target/zlib/deno.js";
export { decompress_with as decompressWith } from "https://esm.sh/@evan/wasm@0.0.49/target/zlib/deno.js";