fix(gateway): rate-limit handling (#2079)

* fix: Closes Gateway Ratelimiting  #1886

* fix: fmt

* fix: invalid import
This commit is contained in:
Skillz4Killz
2022-02-28 13:39:45 -05:00
committed by GitHub
parent ac5f68fcf9
commit 4aac2246ad
9 changed files with 42 additions and 15 deletions
+11 -1
View File
@@ -1,5 +1,6 @@
import { GatewayIntents, GatewayPayload, StatusUpdate } from "../types/mod.ts"; import { GatewayIntents, GatewayPayload, StatusUpdate } from "../types/mod.ts";
import { Collection } from "../util/collection.ts"; import { Collection } from "../util/collection.ts";
import { safeRequestsPerShard } from "./safeRequestsPerShard.ts";
import { closeWS } from "./closeWs.ts"; import { closeWS } from "./closeWs.ts";
import { createShard } from "./createShard.ts"; import { createShard } from "./createShard.ts";
import { handleOnMessage } from "./handleOnMessage.ts"; import { handleOnMessage } from "./handleOnMessage.ts";
@@ -18,7 +19,7 @@ import { resume } from "./resume.ts";
import { sendShardMessage } from "./sendShardMessage.ts"; import { sendShardMessage } from "./sendShardMessage.ts";
import { prepareBuckets, spawnShards } from "./spawnShards.ts"; import { prepareBuckets, spawnShards } from "./spawnShards.ts";
import { tellWorkerToIdentify } from "./tellWorkerToIdentify.ts"; import { tellWorkerToIdentify } from "./tellWorkerToIdentify.ts";
import { DiscordenoShard } from "./ws.ts"; import { DiscordenoShard } from "./shard.ts";
/** Create a new Gateway Manager. /** Create a new Gateway Manager.
* *
@@ -30,6 +31,8 @@ export function createGatewayManager(
options: Partial<GatewayManager> & Pick<GatewayManager, "handleDiscordPayload">, options: Partial<GatewayManager> & Pick<GatewayManager, "handleDiscordPayload">,
): GatewayManager { ): GatewayManager {
return { return {
queueResetInterval: 60000,
maxRequestsPerInterval: 120,
cache: { cache: {
guildIds: new Set(), guildIds: new Set(),
loadingGuildIds: new Set(), loadingGuildIds: new Set(),
@@ -87,6 +90,7 @@ export function createGatewayManager(
closeWS: options.closeWS ?? closeWS, closeWS: options.closeWS ?? closeWS,
sendShardMessage: options.sendShardMessage ?? sendShardMessage, sendShardMessage: options.sendShardMessage ?? sendShardMessage,
resume: options.resume ?? resume, resume: options.resume ?? resume,
safeRequestsPerShard: options.safeRequestsPerShard ?? safeRequestsPerShard,
handleDiscordPayload: options.handleDiscordPayload, handleDiscordPayload: options.handleDiscordPayload,
}; };
} }
@@ -155,6 +159,10 @@ export interface GatewayManager {
} }
>; >;
utf8decoder: TextDecoder; utf8decoder: TextDecoder;
/** The amount of milliseconds the gateway rate limit will reset in. By default 60000 or 1 minute. */
queueResetInterval: number;
/** The maximum amount of requests that the gateway can make before being rate limited. By default 120. */
maxRequestsPerInterval: number;
cache: { cache: {
guildIds: Set<bigint>; guildIds: Set<bigint>;
@@ -205,4 +213,6 @@ export interface GatewayManager {
sendShardMessage: typeof sendShardMessage; sendShardMessage: typeof sendShardMessage;
/** Properly resume an old shards session. */ /** Properly resume an old shards session. */
resume: typeof resume; resume: typeof resume;
/** Calculates the number of requests in a shard that are safe to be used. */
safeRequestsPerShard: typeof safeRequestsPerShard;
} }
+2
View File
@@ -47,6 +47,8 @@ export async function handleOnMessage(gateway: GatewayManager, message: any, sha
break; break;
case GatewayOpcodes.Hello: case GatewayOpcodes.Hello:
gateway.heartbeat(gateway, shardId, (messageData.d as DiscordHello).heartbeat_interval); gateway.heartbeat(gateway, shardId, (messageData.d as DiscordHello).heartbeat_interval);
// UPDATES THE SAFE AMOUNT OF SHARDS BASED ON THE INTERVAL
if (shard) shard.safeRequestsPerShard = gateway.safeRequestsPerShard(gateway, shard);
break; break;
case GatewayOpcodes.HeartbeatACK: case GatewayOpcodes.HeartbeatACK:
if (gateway.shards.has(shardId)) { if (gateway.shards.has(shardId)) {
+2
View File
@@ -36,6 +36,8 @@ export function identify(gateway: GatewayManager, shardId: number, maxShards: nu
processingQueue: false, processingQueue: false,
queueStartedAt: Date.now(), queueStartedAt: Date.now(),
queueCounter: 0, queueCounter: 0,
// BY DEFAULT SET TO 120. EDIT IN HELLO
safeRequestsPerShard: 120,
}); });
socket.onopen = () => { socket.onopen = () => {
+2 -1
View File
@@ -10,5 +10,6 @@ export * from "./spawnShards.ts";
export * from "./sendShardMessage.ts"; export * from "./sendShardMessage.ts";
export * from "./startGatewayOptions.ts"; export * from "./startGatewayOptions.ts";
export * from "./tellWorkerToIdentify.ts"; export * from "./tellWorkerToIdentify.ts";
export * from "./ws.ts"; export * from "./shard.ts";
export * from "./gateway_manager.ts"; export * from "./gateway_manager.ts";
export * from "./safeRequestsPerShard.ts";
+12 -8
View File
@@ -15,7 +15,7 @@ export async function processGatewayQueue(gateway: GatewayManager, id: number) {
} }
const now = Date.now(); const now = Date.now();
if (now - shard.queueStartedAt >= 60000) { if (now - shard.queueStartedAt >= gateway.queueResetInterval) {
shard.queueStartedAt = now; shard.queueStartedAt = now;
shard.queueCounter = 0; shard.queueCounter = 0;
} }
@@ -28,16 +28,20 @@ export async function processGatewayQueue(gateway: GatewayManager, id: number) {
shard.ws.send(JSON.stringify(request)); shard.ws.send(JSON.stringify(request));
// Counter is useful for preventing 120/m requests. // Counter is useful for preventing max requests.
shard.queueCounter++; shard.queueCounter++;
// Handle if the requests have been maxed // Handle if the requests have been maxed
if (shard.queueCounter >= 116) { if (shard.queueCounter >= shard.safeRequestsPerShard) {
gateway.debug("GW MAX_REQUESTS", { const remaining = shard.queueStartedAt + gateway.queueResetInterval - Date.now();
message: "Max gateway requests per minute reached setting timeout for one minute", if (remaining > 0) {
shardId: shard.id, gateway.debug("GW MAX REQUESTS", {
}); message: `Max gateway requests per minute reached setting timeout for ${remaining}ms`,
await delay(60000); shardId: shard.id,
});
await delay(remaining);
}
shard.queueCounter = 0; shard.queueCounter = 0;
continue; continue;
} }
+1
View File
@@ -43,6 +43,7 @@ export function resume(gateway: GatewayManager, shardId: number) {
processingQueue: false, processingQueue: false,
queueStartedAt: Date.now(), queueStartedAt: Date.now(),
queueCounter: 0, queueCounter: 0,
safeRequestsPerShard: oldShard.safeRequestsPerShard || 120,
}); });
// Resume on open // Resume on open
+9
View File
@@ -0,0 +1,9 @@
import { GatewayManager } from "./gateway_manager.ts";
import { DiscordenoShard } from "./shard.ts";
export function safeRequestsPerShard(gateway: GatewayManager, shard: DiscordenoShard) {
// * 2 adds extra safety layer for discords OP 1 requests that we need to respond to
const safeRequests = gateway.maxRequestsPerInterval -
Math.ceil(gateway.queueResetInterval / shard.heartbeat.interval) * 2;
return safeRequests > 0 ? safeRequests : 0;
}
+1 -1
View File
@@ -1,5 +1,5 @@
import { GatewayManager } from "./gateway_manager.ts"; import { GatewayManager } from "./gateway_manager.ts";
import { DiscordenoShard, WebSocketRequest } from "./ws.ts"; import { DiscordenoShard, WebSocketRequest } from "./shard.ts";
export function sendShardMessage( export function sendShardMessage(
gateway: GatewayManager, gateway: GatewayManager,
+2 -4
View File
@@ -40,13 +40,11 @@ export interface DiscordenoShard {
queueStartedAt: number; queueStartedAt: number;
/** The request counter of the queue. */ /** The request counter of the queue. */
queueCounter: number; queueCounter: number;
/** The safe number of requests that can be made while preserving some for required things like heartbeating. */
safeRequestsPerShard: number;
} }
export interface WebSocketRequest { export interface WebSocketRequest {
op: GatewayOpcodes; op: GatewayOpcodes;
d: unknown; d: unknown;
// guildId: bigint;
// shardId: number;
// nonce?: bigint;
// options?: Record<string, unknown>;
} }