From 321175a72fcf2ed52e69f5470ea20df7366bf1a0 Mon Sep 17 00:00:00 2001 From: ayntee Date: Wed, 23 Dec 2020 11:03:06 +0400 Subject: [PATCH] Revert "chore: verbose imports" This reverts commit dfe6f371d35db49549fc12af08201689815803c1. --- src/api/controllers/cache.ts | 5 +- src/api/controllers/channels.ts | 6 +- src/api/controllers/guilds.ts | 4 +- src/api/controllers/interactions.ts | 10 +- src/api/controllers/members.ts | 8 +- src/api/controllers/messages.ts | 9 +- src/api/controllers/misc.ts | 12 +- src/api/controllers/reactions.ts | 6 +- src/api/controllers/roles.ts | 6 +- src/api/handlers/channel.ts | 12 +- src/api/handlers/guild.ts | 37 +-- src/api/handlers/member.ts | 7 +- src/api/handlers/message.ts | 8 +- src/api/handlers/webhook.ts | 8 +- src/api/structures/structures.ts | 7 + src/rest/mod.ts | 419 +++++++++++++++++++++++++- src/rest/request_manager.ts | 418 -------------------------- src/util/cache.ts | 10 +- src/util/permissions.ts | 3 +- src/ws/mod.ts | 438 +++++++++++++++++++++++++++- src/ws/shard.ts | 436 --------------------------- src/ws/shard_manager.ts | 4 +- 22 files changed, 937 insertions(+), 936 deletions(-) create mode 100644 src/api/structures/structures.ts delete mode 100644 src/rest/request_manager.ts delete mode 100644 src/ws/shard.ts diff --git a/src/api/controllers/cache.ts b/src/api/controllers/cache.ts index 370d80ce9..10edc18ba 100644 --- a/src/api/controllers/cache.ts +++ b/src/api/controllers/cache.ts @@ -1,10 +1,7 @@ +import { Channel, Guild, Member, Message } from "../structures/structures.ts"; import { PresenceUpdatePayload } from "../../types/types.ts"; import { cache } from "../../util/cache.ts"; import { Collection } from "../../util/collection.ts"; -import { Channel } from "../structures/channel.ts"; -import { Guild } from "../structures/guild.ts"; -import { Member } from "../structures/member.ts"; -import { Message } from "../structures/message.ts"; export type TableName = | "guilds" diff --git a/src/api/controllers/channels.ts b/src/api/controllers/channels.ts index eb7dbc72b..f967c1830 100644 --- a/src/api/controllers/channels.ts +++ b/src/api/controllers/channels.ts @@ -1,17 +1,17 @@ import { eventHandlers } from "../../bot.ts"; +import { structures } from "../structures/structures.ts"; import { ChannelCreatePayload, ChannelTypes, DiscordPayload, } from "../../types/types.ts"; -import { createChannel } from "../structures/channel.ts"; import { cacheHandlers } from "./cache.ts"; export async function handleInternalChannelCreate(data: DiscordPayload) { if (data.t !== "CHANNEL_CREATE") return; const payload = data.d as ChannelCreatePayload; - const channel = await createChannel(payload); + const channel = await structures.createChannel(payload); await cacheHandlers.set("channels", channel.id, channel); eventHandlers.channelCreate?.(channel); @@ -57,7 +57,7 @@ export async function handleInternalChannelUpdate(data: DiscordPayload) { const payload = data.d as ChannelCreatePayload; const cachedChannel = await cacheHandlers.get("channels", payload.id); - const channel = await createChannel(payload); + const channel = await structures.createChannel(payload); cacheHandlers.set("channels", channel.id, channel); if (!cachedChannel) return; diff --git a/src/api/controllers/guilds.ts b/src/api/controllers/guilds.ts index 4ab3009b6..c8c2d41fe 100644 --- a/src/api/controllers/guilds.ts +++ b/src/api/controllers/guilds.ts @@ -1,4 +1,5 @@ import { eventHandlers } from "../../bot.ts"; +import { structures } from "../structures/structures.ts"; import { CreateGuildPayload, DiscordPayload, @@ -8,7 +9,6 @@ import { UpdateGuildPayload, } from "../../types/types.ts"; import { cache } from "../../util/cache.ts"; -import { createGuild } from "../structures/guild.ts"; import { cacheHandlers } from "./cache.ts"; export async function handleInternalGuildCreate( @@ -21,7 +21,7 @@ export async function handleInternalGuildCreate( // When shards resume they emit GUILD_CREATE again. if (await cacheHandlers.has("guilds", payload.id)) return; - const guild = await createGuild( + const guild = await structures.createGuild( data.d as CreateGuildPayload, shardID, ); diff --git a/src/api/controllers/interactions.ts b/src/api/controllers/interactions.ts index c3e3dbfc4..7e99e3a5b 100644 --- a/src/api/controllers/interactions.ts +++ b/src/api/controllers/interactions.ts @@ -1,9 +1,7 @@ +import { DiscordPayload } from "../../types/types.ts"; import { eventHandlers } from "../../bot.ts"; -import { - DiscordPayload, - InteractionCommandPayload, -} from "../../types/types.ts"; -import { createMember } from "../structures/member.ts"; +import { structures } from "../structures/mod.ts"; +import { InteractionCommandPayload } from "../../types/types.ts"; export async function handleInternalInteractionsCreate(data: DiscordPayload) { if (data.t !== "INTERACTION_CREATE") return; @@ -13,7 +11,7 @@ export async function handleInternalInteractionsCreate(data: DiscordPayload) { eventHandlers.interactionCreate?.( { ...payload, - member: await createMember(payload.member, payload.guild_id), + member: await structures.createMember(payload.member, payload.guild_id), }, ); } diff --git a/src/api/controllers/members.ts b/src/api/controllers/members.ts index aff083985..1e7983ca8 100644 --- a/src/api/controllers/members.ts +++ b/src/api/controllers/members.ts @@ -1,4 +1,5 @@ import { eventHandlers } from "../../bot.ts"; +import { structures } from "../structures/structures.ts"; import { DiscordPayload, GuildBanPayload, @@ -7,7 +8,6 @@ import { GuildMemberUpdatePayload, } from "../../types/types.ts"; import { cache } from "../../util/cache.ts"; -import { createMember } from "../structures/member.ts"; import { cacheHandlers } from "./cache.ts"; export async function handleInternalGuildMemberAdd(data: DiscordPayload) { @@ -18,7 +18,7 @@ export async function handleInternalGuildMemberAdd(data: DiscordPayload) { if (!guild) return; guild.memberCount++; - const member = await createMember( + const member = await structures.createMember( payload, payload.guild_id, ); @@ -60,7 +60,7 @@ export async function handleInternalGuildMemberUpdate(data: DiscordPayload) { mute: guildMember?.mute || false, roles: payload.roles, }; - const member = await createMember( + const member = await structures.createMember( newMemberData, payload.guild_id, ); @@ -98,7 +98,7 @@ export async function handleInternalGuildMembersChunk(data: DiscordPayload) { if (!guild) return; await Promise.all( - payload.members.map((member) => createMember(member, guild.id)), + payload.members.map((member) => structures.createMember(member, guild.id)), ); // Check if its necessary to resolve the fetchmembers promise for this chunk or if more chunks will be coming diff --git a/src/api/controllers/messages.ts b/src/api/controllers/messages.ts index 613e5b144..5a2d62ae6 100644 --- a/src/api/controllers/messages.ts +++ b/src/api/controllers/messages.ts @@ -1,12 +1,11 @@ import { eventHandlers } from "../../bot.ts"; +import { structures } from "../structures/structures.ts"; import { DiscordPayload, MessageCreateOptions, MessageDeleteBulkPayload, MessageDeletePayload, } from "../../types/types.ts"; -import { createMember } from "../structures/member.ts"; -import { createMessage } from "../structures/message.ts"; import { cacheHandlers } from "./cache.ts"; export async function handleInternalMessageCreate(data: DiscordPayload) { @@ -22,7 +21,7 @@ export async function handleInternalMessageCreate(data: DiscordPayload) { if (payload.member && guild) { // If in a guild cache the author as a member - await createMember( + await structures.createMember( { ...payload.member, user: payload.author }, guild.id, ); @@ -31,14 +30,14 @@ export async function handleInternalMessageCreate(data: DiscordPayload) { payload.mentions.forEach((mention) => { // Cache the member if its a valid member if (mention.member && guild) { - createMember( + structures.createMember( { ...mention.member, user: mention }, guild.id, ); } }); - const message = await createMessage(payload); + const message = await structures.createMessage(payload); // Cache the message cacheHandlers.set("messages", payload.id, message); diff --git a/src/api/controllers/misc.ts b/src/api/controllers/misc.ts index 50bc4f18e..bf23d8966 100644 --- a/src/api/controllers/misc.ts +++ b/src/api/controllers/misc.ts @@ -1,4 +1,9 @@ +import { + initialMemberLoadQueue, + structures, +} from "../structures/structures.ts"; import { eventHandlers, setBotID } from "../../bot.ts"; +import { allowNextShard } from "../../ws/shard_manager.ts"; import { DiscordPayload, PresenceUpdatePayload, @@ -10,9 +15,6 @@ import { } from "../../types/types.ts"; import { cache } from "../../util/cache.ts"; import { delay } from "../../util/utils.ts"; -import { allowNextShard } from "../../ws/shard_manager.ts"; -import { initialMemberLoadQueue } from "../structures/guild.ts"; -import { createMember } from "../structures/member.ts"; import { cacheHandlers } from "./cache.ts"; export async function handleInternalReady( @@ -35,7 +37,7 @@ export async function handleInternalReady( // All the members that came in on guild creates should now be processed 1 by 1 for (const [guildID, members] of initialMemberLoadQueue.entries()) { await Promise.all( - members.map((member) => createMember(member, guildID)), + members.map((member) => structures.createMember(member, guildID)), ); } } @@ -85,7 +87,7 @@ export async function handleInternalVoiceStateUpdate(data: DiscordPayload) { if (!guild) return; const member = payload.member - ? await createMember(payload.member, guild.id) + ? await structures.createMember(payload.member, guild.id) : await cacheHandlers.get("members", payload.user_id); if (!member) return; diff --git a/src/api/controllers/reactions.ts b/src/api/controllers/reactions.ts index 51b8e0b27..d0a19fc9d 100644 --- a/src/api/controllers/reactions.ts +++ b/src/api/controllers/reactions.ts @@ -1,11 +1,11 @@ import { botID, eventHandlers } from "../../bot.ts"; +import { structures } from "../structures/structures.ts"; import { BaseMessageReactionPayload, DiscordPayload, MessageReactionPayload, MessageReactionRemoveEmojiPayload, } from "../../types/types.ts"; -import { createMember } from "../structures/member.ts"; import { cacheHandlers } from "./cache.ts"; export async function handleInternalMessageReactionAdd(data: DiscordPayload) { @@ -40,7 +40,7 @@ export async function handleInternalMessageReactionAdd(data: DiscordPayload) { if (payload.member && payload.guild_id) { const guild = await cacheHandlers.get("guilds", payload.guild_id); if (guild) { - await createMember(payload.member, guild.id); + await structures.createMember(payload.member, guild.id); } } @@ -93,7 +93,7 @@ export async function handleInternalMessageReactionRemove( if (payload.member && payload.guild_id) { const guild = await cacheHandlers.get("guilds", payload.guild_id); if (guild) { - await createMember( + await structures.createMember( payload.member, guild.id, ); diff --git a/src/api/controllers/roles.ts b/src/api/controllers/roles.ts index a1f03facf..95a8f439a 100644 --- a/src/api/controllers/roles.ts +++ b/src/api/controllers/roles.ts @@ -1,10 +1,10 @@ import { eventHandlers } from "../../bot.ts"; +import { structures } from "../structures/structures.ts"; import { DiscordPayload, GuildRoleDeletePayload, GuildRolePayload, } from "../../types/types.ts"; -import { createRole } from "../structures/role.ts"; import { cacheHandlers } from "./cache.ts"; export async function handleInternalGuildRoleCreate(data: DiscordPayload) { @@ -14,7 +14,7 @@ export async function handleInternalGuildRoleCreate(data: DiscordPayload) { const guild = await cacheHandlers.get("guilds", payload.guild_id); if (!guild) return; - const role = await createRole(payload.role); + const role = await structures.createRole(payload.role); const roles = guild.roles.set(payload.role.id, role); guild.roles = roles; return eventHandlers.roleCreate?.(guild, role); @@ -55,7 +55,7 @@ export async function handleInternalGuildRoleUpdate(data: DiscordPayload) { const cachedRole = guild.roles.get(payload.role.id); if (!cachedRole) return; - const role = await createRole(payload.role); + const role = await structures.createRole(payload.role); guild.roles.set(payload.role.id, role); eventHandlers.roleUpdate?.(guild, role, cachedRole); } diff --git a/src/api/handlers/channel.ts b/src/api/handlers/channel.ts index 02b174dda..205882d00 100644 --- a/src/api/handlers/channel.ts +++ b/src/api/handlers/channel.ts @@ -1,4 +1,6 @@ +import { cacheHandlers } from "../controllers/cache.ts"; import { RequestManager } from "../../rest/mod.ts"; +import { structures } from "../structures/structures.ts"; import { ChannelEditOptions, ChannelTypes, @@ -21,8 +23,6 @@ import { botHasChannelPermissions, calculateBits, } from "../../util/permissions.ts"; -import { cacheHandlers } from "../controllers/cache.ts"; -import { createMessage } from "../structures/message.ts"; /** Checks if a channel overwrite for a user id or a role id has permission in this channel */ export function channelOverwriteHasPermission( @@ -73,7 +73,7 @@ export async function getMessage( const result = await RequestManager.get( endpoints.CHANNEL_MESSAGE(channelID, id), ) as MessageCreateOptions; - return createMessage(result); + return structures.createMessage(result); } /** Fetches between 2-100 messages. Requires VIEW_CHANNEL and READ_MESSAGE_HISTORY */ @@ -111,7 +111,7 @@ export async function getMessages( endpoints.CHANNEL_MESSAGES(channelID), options, )) as MessageCreateOptions[]; - return Promise.all(result.map((res) => createMessage(res))); + return Promise.all(result.map((res) => structures.createMessage(res))); } /** Get pinned messages in this channel. */ @@ -119,7 +119,7 @@ export async function getPins(channelID: string) { const result = (await RequestManager.get( endpoints.CHANNEL_PINS(channelID), )) as MessageCreateOptions[]; - return Promise.all(result.map((res) => createMessage(res))); + return Promise.all(result.map((res) => structures.createMessage(res))); } /** Send a message to the channel. Requires SEND_MESSAGES permission. */ @@ -227,7 +227,7 @@ export async function sendMessage( }, ); - return createMessage(result as MessageCreateOptions); + return structures.createMessage(result as MessageCreateOptions); } /** Delete messages from the channel. 2-100. Requires the MANAGE_MESSAGES permission */ diff --git a/src/api/handlers/guild.ts b/src/api/handlers/guild.ts index 78a1ab0d5..69a1f3193 100644 --- a/src/api/handlers/guild.ts +++ b/src/api/handlers/guild.ts @@ -1,5 +1,13 @@ +import { cacheHandlers } from "../controllers/cache.ts"; import { identifyPayload } from "../../bot.ts"; import { RequestManager } from "../../rest/mod.ts"; +import { requestAllMembers } from "../../ws/shard_manager.ts"; +import { + Guild, + Member, + structures, + Template, +} from "../structures/structures.ts"; import { AuditLogs, BannedUser, @@ -37,13 +45,6 @@ import { Collection } from "../../util/collection.ts"; import { endpoints } from "../../util/constants.ts"; import { botHasPermission, calculateBits } from "../../util/permissions.ts"; import { urlToBase64 } from "../../util/utils.ts"; -import { requestAllMembers } from "../../ws/shard_manager.ts"; -import { cacheHandlers } from "../controllers/cache.ts"; -import { createChannel } from "../structures/channel.ts"; -import { createGuild, Guild } from "../structures/guild.ts"; -import { createMember, Member } from "../structures/member.ts"; -import { createRole } from "../structures/role.ts"; -import { createTemplate, Template } from "../structures/template.ts"; /** Create a new guild. Returns a guild object on success. Fires a Guild Create Gateway event. This endpoint can be used only by bots in less than 10 guilds. */ export async function createServer(options: CreateServerOptions) { @@ -51,7 +52,7 @@ export async function createServer(options: CreateServerOptions) { endpoints.GUILDS, options, ) as CreateGuildPayload; - return createGuild(guild, 0); + return structures.createGuild(guild, 0); } /** Delete a guild permanently. User must be owner. Returns 204 No Content on success. Fires a Guild Delete Gateway event. @@ -135,7 +136,7 @@ export async function createGuildChannel( type: options?.type || ChannelTypes.GUILD_TEXT, })) as ChannelCreatePayload; - const channel = await createChannel(result); + const channel = await structures.createChannel(result); return channel; } @@ -165,7 +166,7 @@ export async function getChannels(guildID: string, addToCache = true) { endpoints.GUILD_CHANNELS(guildID), ) as ChannelCreatePayload[]; return Promise.all(result.map(async (res) => { - const channel = await createChannel(res, guildID); + const channel = await structures.createChannel(res, guildID); if (addToCache) { cacheHandlers.set("channels", channel.id, channel); } @@ -181,7 +182,7 @@ export async function getChannel(channelID: string, addToCache = true) { const result = await RequestManager.get( endpoints.GUILD_CHANNEL(channelID), ) as ChannelCreatePayload; - const channel = await createChannel(result, result.guild_id); + const channel = await structures.createChannel(result, result.guild_id); if (addToCache) cacheHandlers.set("channels", channel.id, channel); return channel; } @@ -216,7 +217,7 @@ export async function getMember( endpoints.GUILD_MEMBER(guildID, id), ) as MemberCreatePayload; - const member = await createMember(data, guildID); + const member = await structures.createMember(data, guildID); return member; } @@ -322,7 +323,7 @@ export async function createGuildRole( ); const roleData = result as RoleData; - const role = await createRole(roleData); + const role = await structures.createRole(roleData); const guild = await cacheHandlers.get("guilds", guildID); guild?.roles.set(role.id, role); return role; @@ -712,7 +713,7 @@ export async function getGuildTemplates(guildID: string) { const templates = await RequestManager.get( endpoints.GUILD_TEMPLATES(guildID), ) as GuildTemplate[]; - return templates.map((template) => createTemplate(template)); + return templates.map((template) => structures.createTemplate(template)); } /** @@ -729,7 +730,7 @@ export async function deleteGuildTemplate( const deletedTemplate = await RequestManager.delete( `${endpoints.GUILD_TEMPLATES(guildID)}/${templateCode}`, ) as GuildTemplate; - return createTemplate(deletedTemplate); + return structures.createTemplate(deletedTemplate); } /** @@ -760,7 +761,7 @@ export async function createGuildTemplate( endpoints.GUILD_TEMPLATES(guildID), data, ) as GuildTemplate; - return createTemplate(template); + return structures.createTemplate(template); } /** @@ -774,7 +775,7 @@ export async function syncGuildTemplate(guildID: string, templateCode: string) { const template = await RequestManager.put( `${endpoints.GUILD_TEMPLATES(guildID)}/${templateCode}`, ) as GuildTemplate; - return createTemplate(template); + return structures.createTemplate(template); } /** @@ -804,5 +805,5 @@ export async function editGuildTemplate( `${endpoints.GUILD_TEMPLATES(guildID)}/${templateCode}`, data, ) as GuildTemplate; - return createTemplate(template); + return structures.createTemplate(template); } diff --git a/src/api/handlers/member.ts b/src/api/handlers/member.ts index e74c18a87..7abb1840b 100644 --- a/src/api/handlers/member.ts +++ b/src/api/handlers/member.ts @@ -1,5 +1,7 @@ +import { cacheHandlers } from "../controllers/cache.ts"; import { botID } from "../../bot.ts"; import { RequestManager } from "../../rest/mod.ts"; +import { Member, structures } from "../structures/structures.ts"; import { DMChannelCreatePayload, EditMemberOptions, @@ -16,9 +18,6 @@ import { highestRole, } from "../../util/permissions.ts"; import { urlToBase64 } from "../../util/utils.ts"; -import { cacheHandlers } from "../controllers/cache.ts"; -import { createChannel } from "../structures/channel.ts"; -import { Member } from "../structures/member.ts"; import { sendMessage } from "./channel.ts"; /** The users custom avatar or the default avatar if you don't have a member object. */ @@ -124,7 +123,7 @@ export async function sendDirectMessage( ) as DMChannelCreatePayload; // Channel create event will have added this channel to the cache cacheHandlers.delete("channels", dmChannelData.id); - const channel = await createChannel(dmChannelData); + const channel = await structures.createChannel(dmChannelData); // Recreate the channel and add it undert he users id cacheHandlers.set("channels", memberID, channel); dmChannel = channel; diff --git a/src/api/handlers/message.ts b/src/api/handlers/message.ts index 47883b2a0..855266069 100644 --- a/src/api/handlers/message.ts +++ b/src/api/handlers/message.ts @@ -1,5 +1,7 @@ +import { cacheHandlers } from "../controllers/cache.ts"; import { botID } from "../../bot.ts"; import { RequestManager } from "../../rest/mod.ts"; +import { Message, structures } from "../structures/structures.ts"; import { Errors, MessageContent, @@ -9,8 +11,6 @@ import { import { endpoints } from "../../util/constants.ts"; import { botHasChannelPermissions } from "../../util/permissions.ts"; import { delay } from "../../util/utils.ts"; -import { cacheHandlers } from "../controllers/cache.ts"; -import { createMessage, Message } from "../structures/message.ts"; /** Delete a message with the channel id and message id only. */ export async function deleteMessageByID( @@ -274,7 +274,7 @@ export async function editMessage( endpoints.CHANNEL_MESSAGE(message.channelID, message.id), content, ); - return createMessage(result as MessageCreateOptions); + return structures.createMessage(result as MessageCreateOptions); } export async function publishMessage(channelID: string, messageID: string) { @@ -282,5 +282,5 @@ export async function publishMessage(channelID: string, messageID: string) { endpoints.CHANNEL_MESSAGE_CROSSPOST(channelID, messageID), ) as MessageCreateOptions; - return createMessage(data); + return structures.createMessage(data); } diff --git a/src/api/handlers/webhook.ts b/src/api/handlers/webhook.ts index d225e8ea4..ce42683c0 100644 --- a/src/api/handlers/webhook.ts +++ b/src/api/handlers/webhook.ts @@ -1,5 +1,5 @@ -import { botID } from "../../bot.ts"; import { RequestManager } from "../../rest/mod.ts"; +import { structures } from "../structures/structures.ts"; import { CreateSlashCommandOptions, EditSlashCommandOptions, @@ -13,11 +13,11 @@ import { WebhookCreateOptions, WebhookPayload, } from "../../types/types.ts"; -import { cache } from "../../util/cache.ts"; import { endpoints } from "../../util/constants.ts"; import { botHasChannelPermissions } from "../../util/permissions.ts"; import { urlToBase64 } from "../../util/utils.ts"; -import { createMessage } from "../structures/message.ts"; +import { botID } from "../../bot.ts"; +import { cache } from "../../util/cache.ts"; /** Create a new webhook. Requires the MANAGE_WEBHOOKS permission. Returns a webhook object on success. Webhook names follow our naming restrictions that can be found in our Usernames and Nicknames documentation, with the following additional stipulations: * @@ -110,7 +110,7 @@ export async function executeWebhook( ); if (!options.wait) return; - return createMessage(result as MessageCreateOptions); + return structures.createMessage(result as MessageCreateOptions); } export function getWebhook(webhookID: string) { diff --git a/src/api/structures/structures.ts b/src/api/structures/structures.ts new file mode 100644 index 000000000..5c2bdd66c --- /dev/null +++ b/src/api/structures/structures.ts @@ -0,0 +1,7 @@ +export * from "./channel.ts"; +export * from "./guild.ts"; +export * from "./member.ts"; +export * from "./message.ts"; +export * from "./mod.ts"; +export * from "./role.ts"; +export * from "./template.ts"; diff --git a/src/rest/mod.ts b/src/rest/mod.ts index bc5c8ff99..e722f9a9b 100644 --- a/src/rest/mod.ts +++ b/src/rest/mod.ts @@ -1 +1,418 @@ -export * from "./request_manager.ts"; +import { Errors, HttpResponseCode, RequestMethods } from "../types/types.ts"; +import { baseEndpoints, discordAPIURLS } from "../util/constants.ts"; +import { delay } from "../util/utils.ts"; +import { authorization, eventHandlers } from "../bot.ts"; + +const pathQueues: { [key: string]: QueuedRequest[] } = {}; +const ratelimitedPaths = new Map(); +let globallyRateLimited = false; +let queueInProcess = false; + +export interface QueuedRequest { + callback: () => Promise< + void | { + rateLimited: any; + beforeFetch: boolean; + bucketID?: string | null; + } + >; + bucketID?: string | null; + url: string; +} + +export interface RateLimitedPath { + url: string; + resetTimestamp: number; + bucketID: string | null; +} + +async function processRateLimitedPaths() { + const now = Date.now(); + ratelimitedPaths.forEach((value, key) => { + if (value.resetTimestamp > now) return; + ratelimitedPaths.delete(key); + if (key === "global") globallyRateLimited = false; + }); + + await delay(1000); + processRateLimitedPaths(); +} + +function addToQueue(request: QueuedRequest) { + const route = request.url.substring(baseEndpoints.BASE_URL.length + 1); + const parts = route.split("/"); + // Remove the major param + parts.shift(); + const [id] = parts; + + if (pathQueues[id]) { + pathQueues[id].push(request); + } else { + pathQueues[id] = [request]; + } +} + +async function cleanupQueues() { + Object.entries(pathQueues).map(([key, value]) => { + if (!value.length) { + // Remove it entirely + delete pathQueues[key]; + } + }); +} + +async function processQueue() { + while (queueInProcess) { + if ( + (Object.keys(pathQueues).length) && !globallyRateLimited + ) { + await Promise.allSettled( + Object.values(pathQueues).map(async (pathQueue) => { + const request = pathQueue.shift(); + if (!request) return; + + const rateLimitedURLResetIn = await checkRatelimits(request.url); + + if (request.bucketID) { + const rateLimitResetIn = await checkRatelimits(request.bucketID); + if (rateLimitResetIn) { + // This request is still rate limited readd to queue + addToQueue(request); + } else if (rateLimitedURLResetIn) { + // This URL is rate limited readd to queue + addToQueue(request); + } else { + // This request is not rate limited so it should be run + const result = await request.callback(); + if (result && result.rateLimited) { + addToQueue( + { ...request, bucketID: result.bucketID || request.bucketID }, + ); + } + } + } else { + if (rateLimitedURLResetIn) { + // This URL is rate limited readd to queue + addToQueue(request); + } else { + // This request has no bucket id so it should be processed + const result = await request.callback(); + if (request && result && result.rateLimited) { + addToQueue( + { ...request, bucketID: result.bucketID || request.bucketID }, + ); + } + } + } + }), + ); + } + + if (Object.keys(pathQueues).length) { + cleanupQueues(); + } else queueInProcess = false; + } +} + +processRateLimitedPaths(); + +export const RequestManager = { + get: async (url: string, body?: unknown) => { + return runMethod("get", url, body); + }, + post: (url: string, body?: unknown) => { + return runMethod("post", url, body); + }, + delete: (url: string, body?: unknown) => { + return runMethod("delete", url, body); + }, + patch: (url: string, body?: unknown) => { + return runMethod("patch", url, body); + }, + put: (url: string, body?: unknown) => { + return runMethod("put", url, body); + }, +}; + +function createRequestBody(body: any, method: RequestMethods) { + const headers: { [key: string]: string } = { + Authorization: authorization, + "User-Agent": + `DiscordBot (https://github.com/skillz4killz/discordeno, v10)`, + }; + + if (method === "get") body = undefined; + + if (body?.reason) { + headers["X-Audit-Log-Reason"] = encodeURIComponent(body.reason); + } + + if (body?.file) { + const form = new FormData(); + form.append("file", body.file.blob, body.file.name); + form.append("payload_json", JSON.stringify({ ...body, file: undefined })); + body.file = form; + } else if ( + body && !["get", "delete"].includes(method) + ) { + headers["Content-Type"] = "application/json"; + } + + return { + headers, + body: body?.file || JSON.stringify(body), + method: method.toUpperCase(), + }; +} + +async function checkRatelimits(url: string) { + const ratelimited = ratelimitedPaths.get(url); + const global = ratelimitedPaths.get("global"); + const now = Date.now(); + + if (ratelimited && now < ratelimited.resetTimestamp) { + return ratelimited.resetTimestamp - now; + } + if (global && now < global.resetTimestamp) { + return global.resetTimestamp - now; + } + + return false; +} + +async function runMethod( + method: RequestMethods, + url: string, + body?: unknown, + retryCount = 0, + bucketID?: string | null, +) { + eventHandlers.debug?.( + { + type: "requestManager", + data: { method, url, body, retryCount, bucketID }, + }, + ); + + const errorStack = new Error("Location:"); + Error.captureStackTrace(errorStack); + + // For proxies we don't need to do any of the legwork so we just forward the request + if ( + !url.startsWith(discordAPIURLS.BASE_URL) && + !url.startsWith(discordAPIURLS.CDN_URL) + ) { + return fetch(url, { method, body: body ? JSON.stringify(body) : undefined }) + .then((res) => res.json()) + .catch((error) => { + console.error(error); + throw errorStack; + }); + } + + // No proxy so we need to handl all rate limiting and such + return new Promise((resolve, reject) => { + const callback = async () => { + try { + const rateLimitResetIn = await checkRatelimits(url); + if (rateLimitResetIn) { + return { rateLimited: rateLimitResetIn, beforeFetch: true, bucketID }; + } + + const query = method === "get" && body + ? Object.entries(body as any).map(([key, value]) => + `${encodeURIComponent(key)}=${encodeURIComponent(value as any)}` + ) + .join("&") + : ""; + const urlToUse = method === "get" && query ? `${url}?${query}` : url; + + eventHandlers.debug?.( + { + type: "requestManagerFetching", + data: { method, url, body, retryCount, bucketID }, + }, + ); + const response = await fetch(urlToUse, createRequestBody(body, method)); + eventHandlers.debug?.( + { + type: "requestManagerFetched", + data: { method, url, body, retryCount, bucketID, response }, + }, + ); + const bucketIDFromHeaders = processHeaders(url, response.headers); + handleStatusCode(response, errorStack); + + // Sometimes Discord returns an empty 204 response that can't be made to JSON. + if (response.status === 204) return resolve(undefined); + + const json = await response.json(); + if ( + json.retry_after || + json.message === "You are being rate limited." + ) { + if (retryCount > 10) { + eventHandlers.debug?.( + { + type: "error", + data: { method, url, body, retryCount, bucketID, errorStack }, + }, + ); + throw new Error(Errors.RATE_LIMIT_RETRY_MAXED); + } + + return { + rateLimited: json.retry_after, + beforeFetch: false, + bucketID: bucketIDFromHeaders, + }; + } + + eventHandlers.debug?.( + { + type: "requestManagerSuccess", + data: { method, url, body, retryCount, bucketID }, + }, + ); + return resolve(json); + } catch (error) { + eventHandlers.debug?.( + { + type: "error", + data: { method, url, body, retryCount, bucketID, errorStack }, + }, + ); + return reject(error); + } + }; + + addToQueue({ + callback, + bucketID, + url, + }); + if (!queueInProcess) { + queueInProcess = true; + processQueue(); + } + }); +} + +async function logErrors(response: Response, errorStack?: unknown) { + try { + const error = await response.json(); + console.error(error); + + eventHandlers.debug?.({ type: "error", data: { errorStack, error } }); + } catch { + eventHandlers.debug?.( + { + type: "error", + data: { errorStack }, + }, + ); + console.error(response); + } +} + +function handleStatusCode(response: Response, errorStack?: unknown) { + const status = response.status; + + if ( + (status >= 200 && status < 400) || + status === HttpResponseCode.TooManyRequests + ) { + return true; + } + + logErrors(response, errorStack); + + switch (status) { + case HttpResponseCode.BadRequest: + console.error( + "The request was improperly formatted, or the server couldn't understand it.", + ); + throw errorStack; + case HttpResponseCode.Unauthorized: + console.error("The Authorization header was missing or invalid."); + throw errorStack; + case HttpResponseCode.Forbidden: + console.error( + "The Authorization token you passed did not have permission to the resource.", + ); + throw errorStack; + case HttpResponseCode.NotFound: + console.error("The resource at the location specified doesn't exist."); + throw errorStack; + case HttpResponseCode.MethodNotAllowed: + console.error( + "The HTTP method used is not valid for the location specified.", + ); + throw errorStack; + case HttpResponseCode.GatewayUnavailable: + console.error( + "There was not a gateway available to process your request. Wait a bit and retry.", + ); + throw errorStack; + // left are all unknown + default: + console.error(Errors.REQUEST_UNKNOWN_ERROR); + throw errorStack; + } +} + +function processHeaders(url: string, headers: Headers) { + let ratelimited = false; + + // Get all useful headers + const remaining = headers.get("x-ratelimit-remaining"); + const resetTimestamp = headers.get("x-ratelimit-reset"); + const retryAfter = headers.get("retry-after"); + const global = headers.get("x-ratelimit-global"); + const bucketID = headers.get("x-ratelimit-bucket"); + + // If there is no remaining rate limit for this endpoint, we save it in cache + if (remaining && remaining === "0") { + ratelimited = true; + + ratelimitedPaths.set(url, { + url, + resetTimestamp: Number(resetTimestamp) * 1000, + bucketID, + }); + + if (bucketID) { + ratelimitedPaths.set(bucketID, { + url, + resetTimestamp: Number(resetTimestamp) * 1000, + bucketID, + }); + } + } + + // If there is no remaining global limit, we save it in cache + if (global) { + const reset = Date.now() + (Number(retryAfter) * 1000); + eventHandlers.debug?.( + { type: "globallyRateLimited", data: { url, reset } }, + ); + globallyRateLimited = true; + ratelimited = true; + + ratelimitedPaths.set("global", { + url: "global", + resetTimestamp: reset, + bucketID, + }); + + if (bucketID) { + ratelimitedPaths.set(bucketID, { + url: "global", + resetTimestamp: reset, + bucketID, + }); + } + } + + return ratelimited ? bucketID : undefined; +} diff --git a/src/rest/request_manager.ts b/src/rest/request_manager.ts deleted file mode 100644 index e722f9a9b..000000000 --- a/src/rest/request_manager.ts +++ /dev/null @@ -1,418 +0,0 @@ -import { Errors, HttpResponseCode, RequestMethods } from "../types/types.ts"; -import { baseEndpoints, discordAPIURLS } from "../util/constants.ts"; -import { delay } from "../util/utils.ts"; -import { authorization, eventHandlers } from "../bot.ts"; - -const pathQueues: { [key: string]: QueuedRequest[] } = {}; -const ratelimitedPaths = new Map(); -let globallyRateLimited = false; -let queueInProcess = false; - -export interface QueuedRequest { - callback: () => Promise< - void | { - rateLimited: any; - beforeFetch: boolean; - bucketID?: string | null; - } - >; - bucketID?: string | null; - url: string; -} - -export interface RateLimitedPath { - url: string; - resetTimestamp: number; - bucketID: string | null; -} - -async function processRateLimitedPaths() { - const now = Date.now(); - ratelimitedPaths.forEach((value, key) => { - if (value.resetTimestamp > now) return; - ratelimitedPaths.delete(key); - if (key === "global") globallyRateLimited = false; - }); - - await delay(1000); - processRateLimitedPaths(); -} - -function addToQueue(request: QueuedRequest) { - const route = request.url.substring(baseEndpoints.BASE_URL.length + 1); - const parts = route.split("/"); - // Remove the major param - parts.shift(); - const [id] = parts; - - if (pathQueues[id]) { - pathQueues[id].push(request); - } else { - pathQueues[id] = [request]; - } -} - -async function cleanupQueues() { - Object.entries(pathQueues).map(([key, value]) => { - if (!value.length) { - // Remove it entirely - delete pathQueues[key]; - } - }); -} - -async function processQueue() { - while (queueInProcess) { - if ( - (Object.keys(pathQueues).length) && !globallyRateLimited - ) { - await Promise.allSettled( - Object.values(pathQueues).map(async (pathQueue) => { - const request = pathQueue.shift(); - if (!request) return; - - const rateLimitedURLResetIn = await checkRatelimits(request.url); - - if (request.bucketID) { - const rateLimitResetIn = await checkRatelimits(request.bucketID); - if (rateLimitResetIn) { - // This request is still rate limited readd to queue - addToQueue(request); - } else if (rateLimitedURLResetIn) { - // This URL is rate limited readd to queue - addToQueue(request); - } else { - // This request is not rate limited so it should be run - const result = await request.callback(); - if (result && result.rateLimited) { - addToQueue( - { ...request, bucketID: result.bucketID || request.bucketID }, - ); - } - } - } else { - if (rateLimitedURLResetIn) { - // This URL is rate limited readd to queue - addToQueue(request); - } else { - // This request has no bucket id so it should be processed - const result = await request.callback(); - if (request && result && result.rateLimited) { - addToQueue( - { ...request, bucketID: result.bucketID || request.bucketID }, - ); - } - } - } - }), - ); - } - - if (Object.keys(pathQueues).length) { - cleanupQueues(); - } else queueInProcess = false; - } -} - -processRateLimitedPaths(); - -export const RequestManager = { - get: async (url: string, body?: unknown) => { - return runMethod("get", url, body); - }, - post: (url: string, body?: unknown) => { - return runMethod("post", url, body); - }, - delete: (url: string, body?: unknown) => { - return runMethod("delete", url, body); - }, - patch: (url: string, body?: unknown) => { - return runMethod("patch", url, body); - }, - put: (url: string, body?: unknown) => { - return runMethod("put", url, body); - }, -}; - -function createRequestBody(body: any, method: RequestMethods) { - const headers: { [key: string]: string } = { - Authorization: authorization, - "User-Agent": - `DiscordBot (https://github.com/skillz4killz/discordeno, v10)`, - }; - - if (method === "get") body = undefined; - - if (body?.reason) { - headers["X-Audit-Log-Reason"] = encodeURIComponent(body.reason); - } - - if (body?.file) { - const form = new FormData(); - form.append("file", body.file.blob, body.file.name); - form.append("payload_json", JSON.stringify({ ...body, file: undefined })); - body.file = form; - } else if ( - body && !["get", "delete"].includes(method) - ) { - headers["Content-Type"] = "application/json"; - } - - return { - headers, - body: body?.file || JSON.stringify(body), - method: method.toUpperCase(), - }; -} - -async function checkRatelimits(url: string) { - const ratelimited = ratelimitedPaths.get(url); - const global = ratelimitedPaths.get("global"); - const now = Date.now(); - - if (ratelimited && now < ratelimited.resetTimestamp) { - return ratelimited.resetTimestamp - now; - } - if (global && now < global.resetTimestamp) { - return global.resetTimestamp - now; - } - - return false; -} - -async function runMethod( - method: RequestMethods, - url: string, - body?: unknown, - retryCount = 0, - bucketID?: string | null, -) { - eventHandlers.debug?.( - { - type: "requestManager", - data: { method, url, body, retryCount, bucketID }, - }, - ); - - const errorStack = new Error("Location:"); - Error.captureStackTrace(errorStack); - - // For proxies we don't need to do any of the legwork so we just forward the request - if ( - !url.startsWith(discordAPIURLS.BASE_URL) && - !url.startsWith(discordAPIURLS.CDN_URL) - ) { - return fetch(url, { method, body: body ? JSON.stringify(body) : undefined }) - .then((res) => res.json()) - .catch((error) => { - console.error(error); - throw errorStack; - }); - } - - // No proxy so we need to handl all rate limiting and such - return new Promise((resolve, reject) => { - const callback = async () => { - try { - const rateLimitResetIn = await checkRatelimits(url); - if (rateLimitResetIn) { - return { rateLimited: rateLimitResetIn, beforeFetch: true, bucketID }; - } - - const query = method === "get" && body - ? Object.entries(body as any).map(([key, value]) => - `${encodeURIComponent(key)}=${encodeURIComponent(value as any)}` - ) - .join("&") - : ""; - const urlToUse = method === "get" && query ? `${url}?${query}` : url; - - eventHandlers.debug?.( - { - type: "requestManagerFetching", - data: { method, url, body, retryCount, bucketID }, - }, - ); - const response = await fetch(urlToUse, createRequestBody(body, method)); - eventHandlers.debug?.( - { - type: "requestManagerFetched", - data: { method, url, body, retryCount, bucketID, response }, - }, - ); - const bucketIDFromHeaders = processHeaders(url, response.headers); - handleStatusCode(response, errorStack); - - // Sometimes Discord returns an empty 204 response that can't be made to JSON. - if (response.status === 204) return resolve(undefined); - - const json = await response.json(); - if ( - json.retry_after || - json.message === "You are being rate limited." - ) { - if (retryCount > 10) { - eventHandlers.debug?.( - { - type: "error", - data: { method, url, body, retryCount, bucketID, errorStack }, - }, - ); - throw new Error(Errors.RATE_LIMIT_RETRY_MAXED); - } - - return { - rateLimited: json.retry_after, - beforeFetch: false, - bucketID: bucketIDFromHeaders, - }; - } - - eventHandlers.debug?.( - { - type: "requestManagerSuccess", - data: { method, url, body, retryCount, bucketID }, - }, - ); - return resolve(json); - } catch (error) { - eventHandlers.debug?.( - { - type: "error", - data: { method, url, body, retryCount, bucketID, errorStack }, - }, - ); - return reject(error); - } - }; - - addToQueue({ - callback, - bucketID, - url, - }); - if (!queueInProcess) { - queueInProcess = true; - processQueue(); - } - }); -} - -async function logErrors(response: Response, errorStack?: unknown) { - try { - const error = await response.json(); - console.error(error); - - eventHandlers.debug?.({ type: "error", data: { errorStack, error } }); - } catch { - eventHandlers.debug?.( - { - type: "error", - data: { errorStack }, - }, - ); - console.error(response); - } -} - -function handleStatusCode(response: Response, errorStack?: unknown) { - const status = response.status; - - if ( - (status >= 200 && status < 400) || - status === HttpResponseCode.TooManyRequests - ) { - return true; - } - - logErrors(response, errorStack); - - switch (status) { - case HttpResponseCode.BadRequest: - console.error( - "The request was improperly formatted, or the server couldn't understand it.", - ); - throw errorStack; - case HttpResponseCode.Unauthorized: - console.error("The Authorization header was missing or invalid."); - throw errorStack; - case HttpResponseCode.Forbidden: - console.error( - "The Authorization token you passed did not have permission to the resource.", - ); - throw errorStack; - case HttpResponseCode.NotFound: - console.error("The resource at the location specified doesn't exist."); - throw errorStack; - case HttpResponseCode.MethodNotAllowed: - console.error( - "The HTTP method used is not valid for the location specified.", - ); - throw errorStack; - case HttpResponseCode.GatewayUnavailable: - console.error( - "There was not a gateway available to process your request. Wait a bit and retry.", - ); - throw errorStack; - // left are all unknown - default: - console.error(Errors.REQUEST_UNKNOWN_ERROR); - throw errorStack; - } -} - -function processHeaders(url: string, headers: Headers) { - let ratelimited = false; - - // Get all useful headers - const remaining = headers.get("x-ratelimit-remaining"); - const resetTimestamp = headers.get("x-ratelimit-reset"); - const retryAfter = headers.get("retry-after"); - const global = headers.get("x-ratelimit-global"); - const bucketID = headers.get("x-ratelimit-bucket"); - - // If there is no remaining rate limit for this endpoint, we save it in cache - if (remaining && remaining === "0") { - ratelimited = true; - - ratelimitedPaths.set(url, { - url, - resetTimestamp: Number(resetTimestamp) * 1000, - bucketID, - }); - - if (bucketID) { - ratelimitedPaths.set(bucketID, { - url, - resetTimestamp: Number(resetTimestamp) * 1000, - bucketID, - }); - } - } - - // If there is no remaining global limit, we save it in cache - if (global) { - const reset = Date.now() + (Number(retryAfter) * 1000); - eventHandlers.debug?.( - { type: "globallyRateLimited", data: { url, reset } }, - ); - globallyRateLimited = true; - ratelimited = true; - - ratelimitedPaths.set("global", { - url: "global", - resetTimestamp: reset, - bucketID, - }); - - if (bucketID) { - ratelimitedPaths.set(bucketID, { - url: "global", - resetTimestamp: reset, - bucketID, - }); - } - } - - return ratelimited ? bucketID : undefined; -} diff --git a/src/util/cache.ts b/src/util/cache.ts index 2d894c312..fe6f7fe03 100644 --- a/src/util/cache.ts +++ b/src/util/cache.ts @@ -1,7 +1,9 @@ -import { Channel } from "../api/structures/channel.ts"; -import { Guild } from "../api/structures/guild.ts"; -import { Member } from "../api/structures/member.ts"; -import { Message } from "../api/structures/message.ts"; +import { + Channel, + Guild, + Member, + Message, +} from "../api/structures/structures.ts"; import { PresenceUpdatePayload } from "../types/types.ts"; import { Collection } from "./collection.ts"; diff --git a/src/util/permissions.ts b/src/util/permissions.ts index 94b73e869..919464584 100644 --- a/src/util/permissions.ts +++ b/src/util/permissions.ts @@ -1,7 +1,6 @@ import { cacheHandlers } from "../api/controllers/cache.ts"; -import { Guild } from "../api/structures/guild.ts"; -import { Role } from "../api/structures/role.ts"; import { botID } from "../bot.ts"; +import { Guild, Role } from "../api/structures/structures.ts"; import { Permission, Permissions, RawOverwrite } from "../types/types.ts"; /** Checks if the member has this permission. If the member is an owner or has admin perms it will always be true. */ diff --git a/src/ws/mod.ts b/src/ws/mod.ts index 907ca87b3..035b198b9 100644 --- a/src/ws/mod.ts +++ b/src/ws/mod.ts @@ -1,2 +1,436 @@ -export * from "./shard.ts"; -export * from "./shard_manager.ts"; +import { botGatewayData, eventHandlers } from "../bot.ts"; +import { + DiscordBotGatewayData, + DiscordHeartbeatPayload, + FetchMembersOptions, + GatewayOpcode, + ReadyPayload, +} from "../types/types.ts"; +import { BotStatusRequest, delay } from "../util/utils.ts"; +import { IdentifyPayload, proxyWSURL } from "../bot.ts"; +import { handleDiscordPayload } from "./shard_manager.ts"; +import { decompressWith } from "./deps.ts"; + +const basicShards = new Map(); +const heartbeating = new Map(); +const utf8decoder = new TextDecoder(); +const RequestMembersQueue: RequestMemberQueuedRequest[] = []; +let processQueue = false; + +export interface BasicShard { + id: number; + socket: WebSocket; + resumeInterval: number; + sessionID: string; + previousSequenceNumber: number | null; + needToResume: boolean; +} + +interface RequestMemberQueuedRequest { + guildID: string; + shardID: number; + nonce: string; + options?: FetchMembersOptions; +} + +export async function createShard( + data: DiscordBotGatewayData, + identifyPayload: IdentifyPayload, + resuming = false, + shardID = 0, +) { + const oldShard = basicShards.get(shardID); + + const socket = new WebSocket(proxyWSURL); + socket.binaryType = "arraybuffer"; + const basicShard: BasicShard = { + id: shardID, + socket, + resumeInterval: 0, + sessionID: oldShard?.sessionID || "", + previousSequenceNumber: oldShard?.previousSequenceNumber || 0, + needToResume: false, + }; + + basicShards.set(basicShard.id, basicShard); + + socket.onopen = async () => { + if (!resuming) { + // Initial identify with the gateway + await identify(basicShard, identifyPayload); + } else { + await resume(basicShard, identifyPayload); + } + }; + + socket.onerror = ({ timeStamp }) => { + eventHandlers.debug?.({ type: "wsError", data: { timeStamp } }); + }; + + socket.onmessage = ({ data: message }) => { + if (message instanceof ArrayBuffer) { + message = new Uint8Array(message); + } + + if (message instanceof Uint8Array) { + message = decompressWith( + message, + 0, + (slice: Uint8Array) => utf8decoder.decode(slice), + ); + } + + if (typeof message === "string") { + const data = JSON.parse(message); + if (!data.t) eventHandlers.rawGateway?.(data); + switch (data.op) { + case GatewayOpcode.Hello: + if (!heartbeating.has(basicShard.id)) { + heartbeat( + basicShard, + (data.d as DiscordHeartbeatPayload).heartbeat_interval, + identifyPayload, + data, + ); + } + break; + case GatewayOpcode.HeartbeatACK: + heartbeating.set(shardID, true); + break; + case GatewayOpcode.Reconnect: + eventHandlers.debug?.( + { type: "reconnect", data: { shardID: basicShard.id } }, + ); + basicShard.needToResume = true; + resumeConnection(data, identifyPayload, basicShard.id); + break; + case GatewayOpcode.InvalidSession: + eventHandlers.debug?.( + { type: "invalidSession", data: { shardID: basicShard.id, data } }, + ); + // When d is false we need to reidentify + if (!data.d) { + createShard(data, identifyPayload, false, shardID); + break; + } + basicShard.needToResume = true; + resumeConnection(data, identifyPayload, basicShard.id); + break; + default: + if (data.t === "RESUMED") { + eventHandlers.debug?.( + { type: "resumed", data: { shardID: basicShard.id } }, + ); + + basicShard.needToResume = false; + break; + } + // Important for RESUME + if (data.t === "READY") { + basicShard.sessionID = (data.d as ReadyPayload).session_id; + } + + // Update the sequence number if it is present + if (data.s) basicShard.previousSequenceNumber = data.s; + + handleDiscordPayload(data, basicShard.id); + break; + } + } + }; + + // TODO(ayntee): better ws* event names + socket.onclose = ({ reason, code, wasClean }) => { + eventHandlers.debug?.( + { + type: "wsClose", + data: { shardID: basicShard.id, code, reason, wasClean }, + }, + ); + + switch (code) { + case 4001: + throw new Error( + "[Unknown opcode] Sent an invalid Gateway opcode or an invalid payload for an opcode.", + ); + case 4002: + throw new Error("[Decode error] Sent an invalid payload to API."); + case 4004: + throw new Error( + "[Authentication failed] The account token sent with your identify payload is incorrect.", + ); + case 4005: + throw new Error( + "[Already authenticated] Sent more than one identify payload.", + ); + case 4010: + throw new Error( + "[Invalid shard] Sent an invalid shard when identifying.", + ); + case 4011: + throw new Error( + "[Sharding required] The session would have handled too many guilds - you are required to shard your connection in order to connect.", + ); + case 4012: + throw new Error( + "[Invalid API version] Sent an invalid version for the gateway.", + ); + case 4013: + throw new Error( + "[Invalid intent(s)] Sent an invalid intent for a Gateway Intent.", + ); + case 4014: + throw new Error( + "[Disallowed intent(s)] Sent a disallowed intent for a Gateway Intent. You may have tried to specify an intent that you have not enabled or are not whitelisted for.", + ); + case 4003: + case 4007: + case 4008: + case 4009: + eventHandlers.debug?.({ + type: "wsReconnect", + data: { shardID: basicShard.id, code, reason, wasClean }, + }); + createShard(data, identifyPayload, false, shardID); + break; + default: + basicShard.needToResume = true; + resumeConnection(botGatewayData, identifyPayload, shardID); + break; + } + }; +} + +function identify(shard: BasicShard, payload: IdentifyPayload) { + eventHandlers.debug?.( + { + type: "identifying", + data: { + shardID: shard.id, + }, + }, + ); + + return shard.socket.send( + JSON.stringify( + { + op: GatewayOpcode.Identify, + d: { ...payload, shard: [shard.id, payload.shard[1]] }, + }, + ), + ); +} + +function resume(shard: BasicShard, payload: IdentifyPayload) { + return shard.socket.send(JSON.stringify({ + op: GatewayOpcode.Resume, + d: { + token: payload.token, + session_id: shard.sessionID, + seq: shard.previousSequenceNumber, + }, + })); +} + +async function heartbeat( + shard: BasicShard, + interval: number, + payload: IdentifyPayload, + data: DiscordBotGatewayData, +) { + // We lost socket connection between heartbeats, resume connection + if (shard.socket.readyState === WebSocket.CLOSED) { + shard.needToResume = true; + resumeConnection(data, payload, shard.id); + heartbeating.delete(shard.id); + return; + } + + if (heartbeating.has(shard.id)) { + const receivedACK = heartbeating.get(shard.id); + // If a ACK response was not received since last heartbeat, issue invalid session close + if (!receivedACK) { + eventHandlers.debug?.( + { + type: "heartbeatStopped", + data: { + interval, + previousSequenceNumber: shard.previousSequenceNumber, + shardID: shard.id, + }, + }, + ); + return shard.socket.send(JSON.stringify({ op: 4009 })); + } + } + + // Set it to false as we are issuing a new heartbeat + heartbeating.set(shard.id, false); + + shard.socket.send( + JSON.stringify( + { op: GatewayOpcode.Heartbeat, d: shard.previousSequenceNumber }, + ), + ); + eventHandlers.debug?.( + { + type: "heartbeat", + data: { + interval, + previousSequenceNumber: shard.previousSequenceNumber, + shardID: shard.id, + }, + }, + ); + await delay(interval); + heartbeat(shard, interval, payload, data); +} + +async function resumeConnection( + data: DiscordBotGatewayData, + payload: IdentifyPayload, + shardID: number, +) { + const shard = basicShards.get(shardID); + if (!shard) { + eventHandlers.debug?.( + { type: "missingShard", data: { shardID: shardID } }, + ); + return; + } + + if (!shard.needToResume) return; + + eventHandlers.debug?.({ type: "resuming", data: { shardID: shard.id } }); + // Run it once + createShard(data, payload, true, shard.id); + // Then retry every 15 seconds + await delay(1000 * 15); + if (shard.needToResume) resumeConnection(data, payload, shardID); +} + +export function requestGuildMembers( + guildID: string, + shardID: number, + nonce: string, + options?: FetchMembersOptions, + queuedRequest = false, +) { + const shard = basicShards.get(shardID); + + // This request was not from this queue so we add it to queue first + if (!queuedRequest) { + RequestMembersQueue.push({ + guildID, + shardID, + nonce, + options, + }); + + if (!processQueue) { + processQueue = true; + processGatewayQueue(); + } + return; + } + + // If its closed add back to queue to redo on resume + if (shard?.socket.readyState === WebSocket.CLOSED) { + requestGuildMembers(guildID, shardID, nonce, options); + return; + } + + shard?.socket.send(JSON.stringify({ + op: GatewayOpcode.RequestGuildMembers, + d: { + guild_id: guildID, + // If a query is provided use it, OR if a limit is NOT provided use "" + query: options?.query || (options?.limit ? undefined : ""), + limit: options?.limit || 0, + presences: options?.presences || false, + user_ids: options?.userIDs, + nonce, + }, + })); +} + +async function processGatewayQueue() { + if (!RequestMembersQueue.length) { + processQueue = false; + return; + } + + basicShards.forEach((shard) => { + const index = RequestMembersQueue.findIndex((q) => q.shardID === shard.id); + // 2 events per second is the rate limit. + const request = RequestMembersQueue[index]; + if (request) { + eventHandlers.debug?.( + { + type: "requestMembersProcessing", + data: { + remaining: RequestMembersQueue.length, + request, + }, + }, + ); + requestGuildMembers( + request.guildID, + request.shardID, + request.nonce, + request.options, + true, + ); + // Remove item from queue + RequestMembersQueue.splice(index, 1); + + const secondIndex = RequestMembersQueue.findIndex((q) => + q.shardID === shard.id + ); + const secondRequest = RequestMembersQueue[secondIndex]; + if (secondRequest) { + eventHandlers.debug?.( + { + type: "requestMembersProcessing", + data: { + remaining: RequestMembersQueue.length, + request, + }, + }, + ); + requestGuildMembers( + secondRequest.guildID, + secondRequest.shardID, + secondRequest.nonce, + secondRequest.options, + true, + ); + // Remove item from queue + RequestMembersQueue.splice(secondIndex, 1); + } + } + }); + + await delay(1500); + + processGatewayQueue(); +} + +export function botGatewayStatusRequest(payload: BotStatusRequest) { + basicShards.forEach((shard) => { + shard.socket.send(JSON.stringify({ + op: GatewayOpcode.StatusUpdate, + d: { + since: null, + game: payload.game.name + ? { + name: payload.game.name, + type: payload.game.type, + } + : null, + status: payload.status, + afk: false, + }, + })); + }); +} diff --git a/src/ws/shard.ts b/src/ws/shard.ts deleted file mode 100644 index 035b198b9..000000000 --- a/src/ws/shard.ts +++ /dev/null @@ -1,436 +0,0 @@ -import { botGatewayData, eventHandlers } from "../bot.ts"; -import { - DiscordBotGatewayData, - DiscordHeartbeatPayload, - FetchMembersOptions, - GatewayOpcode, - ReadyPayload, -} from "../types/types.ts"; -import { BotStatusRequest, delay } from "../util/utils.ts"; -import { IdentifyPayload, proxyWSURL } from "../bot.ts"; -import { handleDiscordPayload } from "./shard_manager.ts"; -import { decompressWith } from "./deps.ts"; - -const basicShards = new Map(); -const heartbeating = new Map(); -const utf8decoder = new TextDecoder(); -const RequestMembersQueue: RequestMemberQueuedRequest[] = []; -let processQueue = false; - -export interface BasicShard { - id: number; - socket: WebSocket; - resumeInterval: number; - sessionID: string; - previousSequenceNumber: number | null; - needToResume: boolean; -} - -interface RequestMemberQueuedRequest { - guildID: string; - shardID: number; - nonce: string; - options?: FetchMembersOptions; -} - -export async function createShard( - data: DiscordBotGatewayData, - identifyPayload: IdentifyPayload, - resuming = false, - shardID = 0, -) { - const oldShard = basicShards.get(shardID); - - const socket = new WebSocket(proxyWSURL); - socket.binaryType = "arraybuffer"; - const basicShard: BasicShard = { - id: shardID, - socket, - resumeInterval: 0, - sessionID: oldShard?.sessionID || "", - previousSequenceNumber: oldShard?.previousSequenceNumber || 0, - needToResume: false, - }; - - basicShards.set(basicShard.id, basicShard); - - socket.onopen = async () => { - if (!resuming) { - // Initial identify with the gateway - await identify(basicShard, identifyPayload); - } else { - await resume(basicShard, identifyPayload); - } - }; - - socket.onerror = ({ timeStamp }) => { - eventHandlers.debug?.({ type: "wsError", data: { timeStamp } }); - }; - - socket.onmessage = ({ data: message }) => { - if (message instanceof ArrayBuffer) { - message = new Uint8Array(message); - } - - if (message instanceof Uint8Array) { - message = decompressWith( - message, - 0, - (slice: Uint8Array) => utf8decoder.decode(slice), - ); - } - - if (typeof message === "string") { - const data = JSON.parse(message); - if (!data.t) eventHandlers.rawGateway?.(data); - switch (data.op) { - case GatewayOpcode.Hello: - if (!heartbeating.has(basicShard.id)) { - heartbeat( - basicShard, - (data.d as DiscordHeartbeatPayload).heartbeat_interval, - identifyPayload, - data, - ); - } - break; - case GatewayOpcode.HeartbeatACK: - heartbeating.set(shardID, true); - break; - case GatewayOpcode.Reconnect: - eventHandlers.debug?.( - { type: "reconnect", data: { shardID: basicShard.id } }, - ); - basicShard.needToResume = true; - resumeConnection(data, identifyPayload, basicShard.id); - break; - case GatewayOpcode.InvalidSession: - eventHandlers.debug?.( - { type: "invalidSession", data: { shardID: basicShard.id, data } }, - ); - // When d is false we need to reidentify - if (!data.d) { - createShard(data, identifyPayload, false, shardID); - break; - } - basicShard.needToResume = true; - resumeConnection(data, identifyPayload, basicShard.id); - break; - default: - if (data.t === "RESUMED") { - eventHandlers.debug?.( - { type: "resumed", data: { shardID: basicShard.id } }, - ); - - basicShard.needToResume = false; - break; - } - // Important for RESUME - if (data.t === "READY") { - basicShard.sessionID = (data.d as ReadyPayload).session_id; - } - - // Update the sequence number if it is present - if (data.s) basicShard.previousSequenceNumber = data.s; - - handleDiscordPayload(data, basicShard.id); - break; - } - } - }; - - // TODO(ayntee): better ws* event names - socket.onclose = ({ reason, code, wasClean }) => { - eventHandlers.debug?.( - { - type: "wsClose", - data: { shardID: basicShard.id, code, reason, wasClean }, - }, - ); - - switch (code) { - case 4001: - throw new Error( - "[Unknown opcode] Sent an invalid Gateway opcode or an invalid payload for an opcode.", - ); - case 4002: - throw new Error("[Decode error] Sent an invalid payload to API."); - case 4004: - throw new Error( - "[Authentication failed] The account token sent with your identify payload is incorrect.", - ); - case 4005: - throw new Error( - "[Already authenticated] Sent more than one identify payload.", - ); - case 4010: - throw new Error( - "[Invalid shard] Sent an invalid shard when identifying.", - ); - case 4011: - throw new Error( - "[Sharding required] The session would have handled too many guilds - you are required to shard your connection in order to connect.", - ); - case 4012: - throw new Error( - "[Invalid API version] Sent an invalid version for the gateway.", - ); - case 4013: - throw new Error( - "[Invalid intent(s)] Sent an invalid intent for a Gateway Intent.", - ); - case 4014: - throw new Error( - "[Disallowed intent(s)] Sent a disallowed intent for a Gateway Intent. You may have tried to specify an intent that you have not enabled or are not whitelisted for.", - ); - case 4003: - case 4007: - case 4008: - case 4009: - eventHandlers.debug?.({ - type: "wsReconnect", - data: { shardID: basicShard.id, code, reason, wasClean }, - }); - createShard(data, identifyPayload, false, shardID); - break; - default: - basicShard.needToResume = true; - resumeConnection(botGatewayData, identifyPayload, shardID); - break; - } - }; -} - -function identify(shard: BasicShard, payload: IdentifyPayload) { - eventHandlers.debug?.( - { - type: "identifying", - data: { - shardID: shard.id, - }, - }, - ); - - return shard.socket.send( - JSON.stringify( - { - op: GatewayOpcode.Identify, - d: { ...payload, shard: [shard.id, payload.shard[1]] }, - }, - ), - ); -} - -function resume(shard: BasicShard, payload: IdentifyPayload) { - return shard.socket.send(JSON.stringify({ - op: GatewayOpcode.Resume, - d: { - token: payload.token, - session_id: shard.sessionID, - seq: shard.previousSequenceNumber, - }, - })); -} - -async function heartbeat( - shard: BasicShard, - interval: number, - payload: IdentifyPayload, - data: DiscordBotGatewayData, -) { - // We lost socket connection between heartbeats, resume connection - if (shard.socket.readyState === WebSocket.CLOSED) { - shard.needToResume = true; - resumeConnection(data, payload, shard.id); - heartbeating.delete(shard.id); - return; - } - - if (heartbeating.has(shard.id)) { - const receivedACK = heartbeating.get(shard.id); - // If a ACK response was not received since last heartbeat, issue invalid session close - if (!receivedACK) { - eventHandlers.debug?.( - { - type: "heartbeatStopped", - data: { - interval, - previousSequenceNumber: shard.previousSequenceNumber, - shardID: shard.id, - }, - }, - ); - return shard.socket.send(JSON.stringify({ op: 4009 })); - } - } - - // Set it to false as we are issuing a new heartbeat - heartbeating.set(shard.id, false); - - shard.socket.send( - JSON.stringify( - { op: GatewayOpcode.Heartbeat, d: shard.previousSequenceNumber }, - ), - ); - eventHandlers.debug?.( - { - type: "heartbeat", - data: { - interval, - previousSequenceNumber: shard.previousSequenceNumber, - shardID: shard.id, - }, - }, - ); - await delay(interval); - heartbeat(shard, interval, payload, data); -} - -async function resumeConnection( - data: DiscordBotGatewayData, - payload: IdentifyPayload, - shardID: number, -) { - const shard = basicShards.get(shardID); - if (!shard) { - eventHandlers.debug?.( - { type: "missingShard", data: { shardID: shardID } }, - ); - return; - } - - if (!shard.needToResume) return; - - eventHandlers.debug?.({ type: "resuming", data: { shardID: shard.id } }); - // Run it once - createShard(data, payload, true, shard.id); - // Then retry every 15 seconds - await delay(1000 * 15); - if (shard.needToResume) resumeConnection(data, payload, shardID); -} - -export function requestGuildMembers( - guildID: string, - shardID: number, - nonce: string, - options?: FetchMembersOptions, - queuedRequest = false, -) { - const shard = basicShards.get(shardID); - - // This request was not from this queue so we add it to queue first - if (!queuedRequest) { - RequestMembersQueue.push({ - guildID, - shardID, - nonce, - options, - }); - - if (!processQueue) { - processQueue = true; - processGatewayQueue(); - } - return; - } - - // If its closed add back to queue to redo on resume - if (shard?.socket.readyState === WebSocket.CLOSED) { - requestGuildMembers(guildID, shardID, nonce, options); - return; - } - - shard?.socket.send(JSON.stringify({ - op: GatewayOpcode.RequestGuildMembers, - d: { - guild_id: guildID, - // If a query is provided use it, OR if a limit is NOT provided use "" - query: options?.query || (options?.limit ? undefined : ""), - limit: options?.limit || 0, - presences: options?.presences || false, - user_ids: options?.userIDs, - nonce, - }, - })); -} - -async function processGatewayQueue() { - if (!RequestMembersQueue.length) { - processQueue = false; - return; - } - - basicShards.forEach((shard) => { - const index = RequestMembersQueue.findIndex((q) => q.shardID === shard.id); - // 2 events per second is the rate limit. - const request = RequestMembersQueue[index]; - if (request) { - eventHandlers.debug?.( - { - type: "requestMembersProcessing", - data: { - remaining: RequestMembersQueue.length, - request, - }, - }, - ); - requestGuildMembers( - request.guildID, - request.shardID, - request.nonce, - request.options, - true, - ); - // Remove item from queue - RequestMembersQueue.splice(index, 1); - - const secondIndex = RequestMembersQueue.findIndex((q) => - q.shardID === shard.id - ); - const secondRequest = RequestMembersQueue[secondIndex]; - if (secondRequest) { - eventHandlers.debug?.( - { - type: "requestMembersProcessing", - data: { - remaining: RequestMembersQueue.length, - request, - }, - }, - ); - requestGuildMembers( - secondRequest.guildID, - secondRequest.shardID, - secondRequest.nonce, - secondRequest.options, - true, - ); - // Remove item from queue - RequestMembersQueue.splice(secondIndex, 1); - } - } - }); - - await delay(1500); - - processGatewayQueue(); -} - -export function botGatewayStatusRequest(payload: BotStatusRequest) { - basicShards.forEach((shard) => { - shard.socket.send(JSON.stringify({ - op: GatewayOpcode.StatusUpdate, - d: { - since: null, - game: payload.game.name - ? { - name: payload.game.name, - type: payload.game.type, - } - : null, - status: payload.status, - afk: false, - }, - })); - }); -} diff --git a/src/ws/shard_manager.ts b/src/ws/shard_manager.ts index dec07be08..8236d561a 100644 --- a/src/ws/shard_manager.ts +++ b/src/ws/shard_manager.ts @@ -1,6 +1,5 @@ import { controllers } from "../api/controllers/mod.ts"; -import { Guild } from "../api/structures/guild.ts"; -import { eventHandlers, IdentifyPayload } from "../bot.ts"; +import { Guild } from "../api/structures/structures.ts"; import { DiscordBotGatewayData, DiscordPayload, @@ -14,6 +13,7 @@ import { createShard, requestGuildMembers, } from "./mod.ts"; +import { eventHandlers, IdentifyPayload } from "../bot.ts"; let createNextShard = true;