Merge remote-tracking branch 'upstream/main' into perm-check-only-if-cached

This commit is contained in:
ITOH
2021-04-04 18:44:38 +02:00
48 changed files with 1080 additions and 625 deletions

View File

@@ -2,12 +2,11 @@ import { getGatewayBot } from "./helpers/misc/get_gateway_bot.ts";
import { DiscordGatewayIntents } from "./types/gateway/gateway_intents.ts";
import { DiscordGetGatewayBot } from "./types/gateway/get_gateway_bot.ts";
import { baseEndpoints, GATEWAY_VERSION } from "./util/constants.ts";
import { spawnShards } from "./ws/shard_manager.ts";
export let authorization = "";
export let restAuthorization = "";
export let botId = "";
export let applicationId = "";
export let secretKey = "";
export let botID = "";
export let applicationID = "";
export let eventHandlers: EventHandlers = {};
@@ -84,10 +83,9 @@ export async function startBigBrainBot(data: BigBrainBotConfig) {
authorization = `Bot ${data.token}`;
identifyPayload.token = `Bot ${data.token}`;
if (data.restAuthorization) restAuthorization = data.restAuthorization;
if (data.secretKey) secretKey = data.secretKey;
if (data.restURL) baseEndpoints.BASE_URL = data.restURL;
if (data.cdnURL) baseEndpoints.CDN_URL = data.cdnURL;
if (data.wsURL) proxyWSURL = data.wsURL;
if (data.eventHandlers) eventHandlers = data.eventHandlers;
if (data.compress) {
identifyPayload.compress = data.compress;
@@ -103,19 +101,21 @@ export async function startBigBrainBot(data: BigBrainBotConfig) {
0,
);
// Initial API connection to get info about bots connection
botGatewayData = await getGatewayBot();
if (!data.wsURL) proxyWSURL = botGatewayData.url;
await spawnShards(
botGatewayData,
identifyPayload,
data.firstShardId,
data.lastShardId ||
(botGatewayData.shards >= 25
? (data.firstShardId + 25)
: botGatewayData.shards),
);
// PROXY DOESNT NEED US SPAWNING SHARDS
if (!data.wsPort) {
// Initial API connection to get info about bots connection
botGatewayData = await getGatewayBot();
proxyWSURL = botGatewayData.url;
await spawnShards(
botGatewayData,
identifyPayload,
data.firstShardID,
data.lastShardID ||
(botGatewayData.shards >= 25
? (data.firstShardID + 25)
: botGatewayData.shards),
);
}
}
export interface BotConfig {
@@ -127,15 +127,15 @@ export interface BotConfig {
export interface BigBrainBotConfig extends BotConfig {
/** The first shard to start at for this worker. Use this to control which shards to run in each worker. */
firstShardId: number;
/** The last shard to start for this worker. By default it will be 25 + the firstShardId. */
lastShardId?: number;
/** This can be used to forward the ws handling to a proxy. */
wsURL?: string;
firstShardID: number;
/** The last shard to start for this worker. By default it will be 25 + the firstShardID. */
lastShardID?: number;
/** This can be used to forward the ws handling to a proxy. It will disable the sharding done by the bot side. */
wsPort?: number;
/** This can be used to forward the REST handling to a proxy. */
restURL?: string;
/** This can be used to forward the CDN handling to a proxy. */
cdnURL?: string;
/** This is the authorization header that your rest proxy will validate */
restAuthorization?: string;
/** This is the authorization header that your servers will send. Helpful to prevent DDOS attacks and such. */
secretKey?: string;
}

View File

@@ -23,7 +23,7 @@ export const cache = {
(
value:
| Collection<string, Member>
| PromiseLike<Collection<string, Member>>
| PromiseLike<Collection<string, Member>>,
) => void
>(),
executedSlashCommands: new Collection<string, string>(),
@@ -31,8 +31,8 @@ export const cache = {
return new Collection<string, Emoji>(
this.guilds.reduce(
(a, b) => [...a, ...b.emojis.map((e) => [e.id, e])],
[] as any[]
)
[] as any[],
),
);
},
};
@@ -78,32 +78,32 @@ export type TableName =
function set(
table: "guilds",
key: string,
value: Guild
value: Guild,
): Promise<Collection<string, Guild>>;
function set(
table: "channels",
key: string,
value: Channel
value: Channel,
): Promise<Collection<string, Channel>>;
function set(
table: "messages",
key: string,
value: Message
value: Message,
): Promise<Collection<string, Message>>;
function set(
table: "members",
key: string,
value: Member
value: Member,
): Promise<Collection<string, Member>>;
function set(
table: "presences",
key: string,
value: PresenceUpdatePayload
value: PresenceUpdatePayload,
): Promise<Collection<string, PresenceUpdatePayload>>;
function set(
table: "unavailableGuilds",
key: string,
value: number
value: number,
): Promise<Collection<string, number>>;
async function set(table: TableName, key: string, value: any) {
return cache[table].set(key, value);
@@ -115,11 +115,11 @@ function get(table: "messages", key: string): Promise<Message | undefined>;
function get(table: "members", key: string): Promise<Member | undefined>;
function get(
table: "presences",
key: string
key: string,
): Promise<PresenceUpdatePayload | undefined>;
function get(
table: "unavailableGuilds",
key: string
key: string,
): Promise<Guild | undefined>;
async function get(table: TableName, key: string) {
return cache[table].get(key);
@@ -127,54 +127,54 @@ async function get(table: TableName, key: string) {
function forEach(
table: "guilds",
callback: (value: Guild, key: string, map: Map<string, Guild>) => unknown
callback: (value: Guild, key: string, map: Map<string, Guild>) => unknown,
): void;
function forEach(
table: "unavailableGuilds",
callback: (value: Guild, key: string, map: Map<string, Guild>) => unknown
callback: (value: Guild, key: string, map: Map<string, Guild>) => unknown,
): void;
function forEach(
table: "channels",
callback: (value: Channel, key: string, map: Map<string, Channel>) => unknown
callback: (value: Channel, key: string, map: Map<string, Channel>) => unknown,
): void;
function forEach(
table: "messages",
callback: (value: Message, key: string, map: Map<string, Message>) => unknown
callback: (value: Message, key: string, map: Map<string, Message>) => unknown,
): void;
function forEach(
table: "members",
callback: (value: Member, key: string, map: Map<string, Member>) => unknown
callback: (value: Member, key: string, map: Map<string, Member>) => unknown,
): void;
function forEach(
table: TableName,
callback: (value: any, key: string, map: Map<string, any>) => unknown
callback: (value: any, key: string, map: Map<string, any>) => unknown,
) {
return cache[table].forEach(callback);
}
function filter(
table: "guilds",
callback: (value: Guild, key: string) => boolean
callback: (value: Guild, key: string) => boolean,
): Promise<Collection<string, Guild>>;
function filter(
table: "unavailableGuilds",
callback: (value: Guild, key: string) => boolean
callback: (value: Guild, key: string) => boolean,
): Promise<Collection<string, Guild>>;
function filter(
table: "channels",
callback: (value: Channel, key: string) => boolean
callback: (value: Channel, key: string) => boolean,
): Promise<Collection<string, Channel>>;
function filter(
table: "messages",
callback: (value: Message, key: string) => boolean
callback: (value: Message, key: string) => boolean,
): Promise<Collection<string, Message>>;
function filter(
table: "members",
callback: (value: Member, key: string) => boolean
callback: (value: Member, key: string) => boolean,
): Promise<Collection<string, Member>>;
async function filter(
table: TableName,
callback: (value: any, key: string) => boolean
callback: (value: any, key: string) => boolean,
) {
return cache[table].filter(callback);
}

View File

@@ -1,7 +1,6 @@
import { eventHandlers } from "../../bot.ts";
import { cache, cacheHandlers } from "../../cache.ts";
import { structures } from "../../structures/mod.ts";
import { basicShards } from "../../ws/shard.ts";
import { DiscordGatewayPayload } from "../../types/gateway/gateway_payload.ts";
import { DiscordGuild } from "../../types/guilds/guild.ts";

View File

@@ -1,6 +1,5 @@
import { eventHandlers } from "../../bot.ts";
import { cacheHandlers } from "../../cache.ts";
import { basicShards } from "../../ws/shard.ts";
import { DiscordGatewayPayload } from "../../types/gateway/gateway_payload.ts";
import { DiscordUnavailableGuild } from "../../types/guilds/unavailable_guild.ts";

View File

@@ -15,7 +15,7 @@ import {
export async function createChannel(
guildId: string,
name: string,
options?: CreateGuildChannel
options?: CreateGuildChannel,
) {
const requiredPerms: Set<PermissionStrings> = new Set(["MANAGE_CHANNELS"]);
@@ -39,7 +39,7 @@ export async function createChannel(
deny: calculateBits(perm.deny),
})),
type: options?.type || DiscordChannelTypes.GUILD_TEXT,
}
},
)) as DiscordChannel;
const channelStruct = await structures.createChannelStruct(result);

View File

@@ -1,18 +1,16 @@
import { Guild } from "../../structures/mod.ts";
import { DiscordImageFormat } from "../../types/misc/image_format.ts";
import { DiscordImageSize } from "../../types/misc/image_size.ts";
import { endpoints } from "../../util/constants.ts";
import { formatImageURL } from "../../util/utils.ts";
/** The full URL of the banner from Discords CDN. Undefined if no banner is set. */
export function guildBannerURL(
guild: Guild,
size: ImageSize = 128,
format?: ImageFormats,
id: string,
banner: string,
size: DiscordImageSize = 128,
format?: DiscordImageFormat,
) {
return guild.banner
? formatImageURL(
endpoints.GUILD_BANNER(guild.id, guild.banner),
size,
format,
)
return banner
? formatImageURL(endpoints.GUILD_BANNER(id, banner), size, format)
: undefined;
}

View File

@@ -1,14 +1,16 @@
import { Guild } from "../../structures/mod.ts";
import { DiscordImageFormat } from "../../types/misc/image_format.ts";
import { DiscordImageSize } from "../../types/misc/image_size.ts";
import { endpoints } from "../../util/constants.ts";
import { formatImageURL } from "../../util/utils.ts";
/** The full URL of the icon from Discords CDN. Undefined when no icon is set. */
export function guildIconURL(
guild: Guild,
size: ImageSize = 128,
format?: ImageFormats,
id: string,
icon: string,
size: DiscordImageSize = 128,
format?: DiscordImageFormat,
) {
return guild.icon
? formatImageURL(endpoints.GUILD_ICON(guild.id, guild.icon), size, format)
return icon
? formatImageURL(endpoints.GUILD_ICON(id, icon), size, format)
: undefined;
}

View File

@@ -1,16 +1,18 @@
import { Guild } from "../../structures/mod.ts";
import { DiscordImageFormat } from "../../types/misc/image_format.ts";
import { DiscordImageSize } from "../../types/misc/image_size.ts";
import { endpoints } from "../../util/constants.ts";
import { formatImageURL } from "../../util/utils.ts";
/** The full URL of the splash from Discords CDN. Undefined if no splash is set. */
export function guildSplashURL(
guild: Guild,
size: ImageSize = 128,
format?: ImageFormats,
id: string,
splash: string,
size: DiscordImageSize = 128,
format?: DiscordImageFormat,
) {
return guild.splash
return splash
? formatImageURL(
endpoints.GUILD_SPLASH(guild.id, guild.splash),
endpoints.GUILD_SPLASH(id, splash),
size,
format,
)

View File

@@ -1,3 +1,5 @@
import { DiscordImageFormat } from "../../types/misc/image_format.ts";
import { DiscordImageSize } from "../../types/misc/image_size.ts";
import { endpoints } from "../../util/constants.ts";
import { formatImageURL } from "../../util/utils.ts";
@@ -6,8 +8,8 @@ export function avatarURL(
userId: string,
discriminator: string,
avatar?: string | null,
size: ImageSize = 128,
format?: ImageFormats,
size: DiscordImageSize = 128,
format?: DiscordImageFormat,
) {
return avatar
? formatImageURL(endpoints.USER_AVATAR(userId, avatar), size, format)

View File

@@ -3,9 +3,6 @@ import { Member } from "../../structures/mod.ts";
import { DiscordGatewayIntents } from "../../types/gateway/gateway_intents.ts";
import { Errors } from "../../types/misc/errors.ts";
import { Collection } from "../../util/collection.ts";
import { requestAllMembers } from "../../ws/shard_manager.ts";
import { DiscordGatewayIntents } from "../../types/gateway/gateway_intents.ts";
import { Errors } from "../../types/misc/errors.ts";
/**
* ⚠️ BEGINNER DEVS!! YOU SHOULD ALMOST NEVER NEED THIS AND YOU CAN GET FROM cache.members.get()

View File

@@ -15,11 +15,10 @@ export async function getMember(
const guild = await cacheHandlers.get("guilds", guildId);
if (!guild && !options?.force) return;
const data =
(await rest.runMethod(
"get",
endpoints.GUILD_MEMBER(guildId, id),
)) as MemberCreatePayload;
const data = (await rest.runMethod(
"get",
endpoints.GUILD_MEMBER(guildId, id),
)) as MemberCreatePayload;
const memberStruct = await structures.createMemberStruct(data, guildId);
await cacheHandlers.set("members", memberStruct.id, memberStruct);

View File

@@ -1,7 +1,6 @@
import { cacheHandlers } from "../../cache.ts";
import { Member } from "../../structures/mod.ts";
import { Collection } from "../../util/collection.ts";
import { requestAllMembers } from "../../ws/shard_manager.ts";
/** Returns guild member objects for the specified user by their nickname/username.
*

View File

@@ -7,6 +7,7 @@ import { getBan } from "../helpers/guilds/get_ban.ts";
import { getBans } from "../helpers/guilds/get_bans.ts";
import { guildBannerURL } from "../helpers/guilds/guild_banner_url.ts";
import { guildIconURL } from "../helpers/guilds/guild_icon_url.ts";
import { guildSplashURL } from "../helpers/guilds/guild_splash_url.ts";
import { leaveGuild } from "../helpers/guilds/leave_guild.ts";
import { getInvites } from "../helpers/invites/get_invites.ts";
import { banMember } from "../helpers/members/ban_member.ts";
@@ -64,7 +65,10 @@ const baseGuild: Partial<GuildStruct> = {
return Boolean(this.features?.includes(DiscordGuildFeatures.VERIFIED));
},
bannerURL(size, format) {
return guildBannerURL(this as unknown as Guild, size, format);
return guildBannerURL(this.id!, this.banner!, size, format);
},
splashURL(size, format) {
return guildSplashURL(this.id!, this.splash!, size, format);
},
delete() {
return deleteServer(this.id!);
@@ -91,7 +95,7 @@ const baseGuild: Partial<GuildStruct> = {
return getInvites(this.id!);
},
iconURL(size, format) {
return guildIconURL(this as unknown as Guild, size, format);
return guildIconURL(this.id!, this.icon!, size, format);
},
leave() {
return leaveGuild(this.id!);
@@ -114,7 +118,7 @@ export async function createGuildStruct(
} = snakeKeysToCamelCase(data) as Guild;
const roles = await Promise.all(
data.roles.map((role) => structures.createRoleStruct(role))
data.roles.map((role) => structures.createRoleStruct(role)),
);
await Promise.all(channels.map(async (channel) => {
@@ -144,7 +148,7 @@ export async function createGuildStruct(
),
memberCount: createNewProp(memberCount),
emojis: createNewProp(
new Collection(emojis.map((emoji) => [emoji.id ?? emoji.name, emoji]))
new Collection(emojis.map((emoji) => [emoji.id ?? emoji.name, emoji])),
),
voiceStates: createNewProp(
new Collection(
@@ -161,11 +165,11 @@ export async function createGuildStruct(
members.map(async (member) => {
const memberStruct = await structures.createMemberStruct(
member,
guild.id
guild.id,
);
return cacheHandlers.set("members", memberStruct.id, memberStruct);
})
}),
);
}
@@ -217,6 +221,11 @@ export interface GuildStruct extends
size?: DiscordImageSize,
format?: DiscordImageFormat,
): string | undefined;
/** The splash url for this server */
splashURL(
size?: DiscordImageSize,
format?: DiscordImageFormat,
): string | undefined;
/** The full URL of the icon from Discords CDN. Undefined when no icon is set. */
iconURL(
size?: DiscordImageSize,

View File

@@ -9,7 +9,7 @@ import { removeAllReactions } from "../helpers/messages/remove_all_reactions.ts"
import { removeReaction } from "../helpers/messages/remove_reaction.ts";
import { removeReactionEmoji } from "../helpers/messages/remove_reaction_emoji.ts";
import { sendMessage } from "../helpers/messages/send_message.ts";
import { CHANNEL_MENTION_REGEX } from "../util/constants";
import { CHANNEL_MENTION_REGEX } from "../util/constants.ts";
import { createNewProp } from "../util/utils.ts";
const baseMessage: Partial<Message> = {
@@ -30,9 +30,8 @@ const baseMessage: Partial<Message> = {
return this.member?.guilds.get(this.guildId);
},
get link() {
return `https://discord.com/channels/${this.guildId || "@me"}/${
this.channelId
}/${this.id}`;
return `https://discord.com/channels/${this.guildId ||
"@me"}/${this.channelId}/${this.id}`;
},
get mentionedRoles() {
return this.mentionRoleIds?.map((id) => this.guild?.roles.get(id)) || [];
@@ -61,20 +60,19 @@ const baseMessage: Partial<Message> = {
return addReactions(this.channelId!, this.id!, reactions, ordered);
},
reply(content) {
const contentWithMention =
typeof content === "string"
? {
content,
mentions: { repliedUser: true },
replyMessageId: this.id,
failReplyIfNotExists: false,
}
: {
...content,
mentions: { ...(content.mentions || {}), repliedUser: true },
replyMessageId: this.id,
failReplyIfNotExists: content.failReplyIfNotExists === true,
};
const contentWithMention = typeof content === "string"
? {
content,
mentions: { repliedUser: true },
replyMessageId: this.id,
failReplyIfNotExists: false,
}
: {
...content,
mentions: { ...(content.mentions || {}), repliedUser: true },
replyMessageId: this.id,
failReplyIfNotExists: content.failReplyIfNotExists === true,
};
if (this.guildId) return sendMessage(this.channelId!, contentWithMention);
return sendDirectMessage(this.author!.id, contentWithMention);
@@ -132,8 +130,8 @@ export async function createMessageStruct(data: MessageCreateOptions) {
}
// Discord doesnt give guild id for getMessage() so this will fill it in
const guildIdFinal =
guildId || (await cacheHandlers.get("channels", channelId))?.guildId || "";
const guildIdFinal = guildId ||
(await cacheHandlers.get("channels", channelId))?.guildId || "";
const message = Object.create(baseMessage, {
...restProps,
@@ -150,16 +148,16 @@ export async function createMessageStruct(data: MessageCreateOptions) {
...mentionChannelIds,
// Add any other ids that can be validated in a channel mention format
...(rest.content.match(CHANNEL_MENTION_REGEX) || []).map((text) =>
// converts the <#123> into 123
// converts the <#123> into 123
text.substring(2, text.length - 1)
),
].map((m) => m.id)
].map((m) => m.id),
),
webhookId: createNewProp(webhookId),
messageReference: createNewProp(messageReference),
timestamp: createNewProp(Date.parse(data.timestamp)),
editedTimestamp: createNewProp(
editedTimestamp ? Date.parse(editedTimestamp) : undefined
editedTimestamp ? Date.parse(editedTimestamp) : undefined,
),
});

View File

@@ -1,12 +1,12 @@
import { Emoji } from "../emojis/emoji.ts";
import { SnakeCaseProps } from "../util.ts";
/** https://discord.com/developers/docs/topics/gateway#guild-emojis-update */
export interface GuildEmojisUpdate {
/** id of the guild */
guild_id: string;
guildId: string;
/** Array of emojis */
emojis: Emoji[];
}
/** https://discord.com/developers/docs/topics/gateway#guild-emojis-update */
export type DiscordGuildEmojisUpdate = SnakeCaseProps<GuildEmojisUpdate>;

View File

@@ -1,6 +1,6 @@
import { SnakeCaseProps } from "../util.ts";
import { IdentifyConnectionProperties } from "./identify_connection_properties.ts";
import { UpdateStatus } from "./update_status.ts";
import { StatusUpdate } from "./status_update.ts";
export interface Identify {
/** Authentication token */
@@ -12,9 +12,9 @@ export interface Identify {
/** Value between 50 and 250, total number of members where the gateway will stop sending offline members in the guild member list */
largeThreshold?: number;
/** Used for Guild Sharding */
shard?: [number, number];
shard?: [shardId: number, numberOfShards: number];
/** Presence structure for initial presence information */
presence?: UpdateStatus;
presence?: StatusUpdate;
/** Enables dispatching of guild subscription events (presence and typing events) */
guildSubscriptions?: boolean;
/** The Gateway Intents you wish to receive */

View File

@@ -1,4 +1,3 @@
/** https://discord.com/developers/docs/topics/gateway#identify-identify-connection-properties */
export interface IdentifyConnectionProperties {
/** Operating system */
$os: string;
@@ -8,4 +7,5 @@ export interface IdentifyConnectionProperties {
$device: string;
}
/** https://discord.com/developers/docs/topics/gateway#identify-identify-connection-properties */
export type DiscordIdentifyConnectionProperties = IdentifyConnectionProperties;

View File

@@ -1,11 +1,11 @@
/** https://discord.com/developers/docs/topics/gateway#resume */
export interface Resume {
/** Session token */
token: string;
/** Session id */
session_id: string;
sessionId: string;
/** Last sequence number received */
seq: number;
}
/** https://discord.com/developers/docs/topics/gateway#resume */
export type DiscordResume = Resume;

View File

@@ -2,7 +2,7 @@ import { Activity } from "../misc/activity.ts";
import { SnakeCaseProps } from "../util.ts";
import { DiscordStatusTypes } from "./status_types.ts";
export interface UpdateStatus {
export interface StatusUpdate {
/** Unix time (in milliseconds) of when the client went idle, or null if the client is not idle */
since: number | null;
/** null, or the user's activities */
@@ -14,4 +14,4 @@ export interface UpdateStatus {
}
/** https://discord.com/developers/docs/topics/gateway#update-status */
export type DiscordUpdateStatus = SnakeCaseProps<UpdateStatus>;
export type DiscordStatusUpdate = SnakeCaseProps<StatusUpdate>;

View File

@@ -1,7 +1,7 @@
import { User } from "../users/user.ts";
import { SnakeCaseProps } from "../util.ts";
export interface GuildBanAdd {
export interface GuildBanAddRemove {
/** id of the guild */
guildId: string;
/** The banned user */
@@ -9,4 +9,4 @@ export interface GuildBanAdd {
}
/** https://discord.com/developers/docs/topics/gateway#guild-ban-add */
export type DiscordGuildBanAdd = SnakeCaseProps<GuildBanAdd>;
export type DiscordGuildBanAddRemove = SnakeCaseProps<GuildBanAddRemove>;

View File

@@ -2,7 +2,7 @@ import { SnakeCaseProps } from "../util.ts";
export interface RequestGuildMembers {
/** id of the guild to get members for */
guild_id: string;
guildId: string;
/** String that username starts with, or an empty string to return all members */
query?: string;
/** Maximum number of members to send matching the query; a limit of 0 can be used with an empty string query to return all members */

View File

@@ -0,0 +1,13 @@
import { SnakeCaseProps } from "../util.ts";
import { ApplicationCommand } from "./application_command.ts";
export interface ApplicationCommandCreateUpdateDelete
extends ApplicationCommand {
/** Id of the guild the command is in */
guildId?: string;
}
/** https://discord.com/developers/docs/topics/gateway#application-command-delete-application-command-extra-fields */
export type DiscordApplicationCommandCreateUpdateDelete = SnakeCaseProps<
ApplicationCommandCreateUpdateDelete
>;

View File

@@ -1,4 +1,5 @@
import { Emoji } from "../emojis/emoji.ts";
import { GuildMember } from "../guilds/guild_member.ts";
import { SnakeCaseProps } from "../util.ts";
export interface MessageReactionAdd {

View File

@@ -14,11 +14,11 @@ export interface Activity {
/** Stream url, is validated when type is 1 */
url?: string | null;
/** Unix timestamp of when the activity was added to the user's session */
created_at: number;
createdAt: number;
/** Unix timestamps for start and/or end of the game */
timestamps?: ActivityTimestamps;
/** Application id for the game */
application_id?: string;
applicationId?: string;
/** What the player is currently doing */
details?: string | null;
/** The user's current party status */

View File

@@ -7,7 +7,7 @@ export interface PresenceUpdate {
/** The user presence is being updated for */
user: User;
/** id of the guild */
guild_id: string;
guildId: string;
/** Either "idle", "dnd", "online", or "offline" */
status: "idle" | "dnd" | "online" | "offline";
/** User's current activities */

View File

@@ -46,7 +46,7 @@ export * from "./gateway/ready.ts";
export * from "./gateway/resume.ts";
export * from "./gateway/session_start_limit.ts";
export * from "./gateway/status_types.ts";
export * from "./gateway/update_status.ts";
export * from "./gateway/status_update.ts";
export * from "./guilds/ban.ts";
export * from "./guilds/begin_guild_prune.ts";
export * from "./guilds/create_guild.ts";

View File

@@ -1,6 +1,6 @@
import { SnakeCaseProps } from "../util.ts";
import { DiscordVisibilityTypes } from "./visibility_types.ts";
import { Integration } from "../guilds/integration.ts"
import { Integration } from "../guilds/integration.ts";
export interface Connection {
/** id of the connection account */

View File

@@ -1,11 +1,11 @@
import { SnakeCaseProps } from "../util.ts";
export interface WebhooksUpdate {
export interface WebhookUpdate {
/** id of the guild */
guildId: string;
/** id of the channel */
channelId: string;
}
/** https://discord.com/developers/docs/topics/gateway#webhooks-update */
export type DiscordWebhooksUpdate = SnakeCaseProps<WebhooksUpdate>;
/** https://discord.com/developers/docs/topics/gateway#webhooks-update-webhook-update-event-fields */
export type DiscordWebhookUpdate = SnakeCaseProps<WebhookUpdate>;

View File

@@ -177,4 +177,4 @@ export const endpoints = {
};
export const SLASH_COMMANDS_NAME_REGEX = /^[\w-]{1,32}$/;
export const CHANNEL_MENTION_REGEX = /<#[0-9]+>/g;
export const CHANNEL_MENTION_REGEX = /<#[0-9]+>/g;

View File

@@ -1,7 +1,8 @@
import { encode } from "../../deps.ts";
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { Errors } from "../types/misc/errors.ts";
import { basicShards, sendWS } from "../ws/shard.ts";
import { DiscordImageFormat } from "../types/misc/image_format.ts";
import { DiscordImageSize } from "../types/misc/image_size.ts";
import { SLASH_COMMANDS_NAME_REGEX } from "./constants.ts";
export const sleep = (timeout: number) => {
@@ -46,8 +47,8 @@ export function delay(ms: number): Promise<void> {
export const formatImageURL = (
url: string,
size: ImageSize = 128,
format?: ImageFormats,
size: DiscordImageSize = 128,
format?: DiscordImageFormat,
) => {
return `${url}.${format ||
(url.includes("/a_") ? "gif" : "jpg")}?size=${size}`;

204
src/ws/README.md Normal file
View File

@@ -0,0 +1,204 @@
# Standalone WS / Proxy WS
This WS service is meant for ADVANCED DEVELOPERS ONLY!
## Benefits
- **Zero Downtime Updates**:
- Your bot can be updated in a matter of seconds. With normal sharding, you
have to restart which also has to process identifying all your shards with a
1/~5s rate limit. With WS handling moved to a proxy process, this allows you
to instantly get the bot code restarted without any concerns of delays. If
you have a bot on 200,000 servers normally this would mean a 20 minute delay
to restart your bot if you made a small change and restarted.
- **Zero Downtime Resharding**:
- Discord stops letting your bot get added to new servers at certain points in
time. For example, suppose you had 150,000 servers running 150 shards. The
maximum amount of servers your shards could hold is 150 * 2500 = 375,000. If
your bot reaches this, it can no longer join new servers until it re-shards.
- DD proxy provides 2 types of re-sharding. Automated and manual. You can also
have both.
- `Automated`: This system will automatically begin a Zero-downtime
resharding process behind the scenes when you reach 80% of your maximum
servers allowed by your shards. For example, since 375,000 was the max, at
300,000 we would begin re-sharding behind the scenes with `ZERO DOWNTIME`.
- 80% of maximum servers reached (The % of 80% is customizable.)
- Identify limits have room to allow re-sharding. (Also customizable)
- `Manual`: You can also trigger this manually should you choose.
- **Horizontal Scaling**:
- The proxy system allows you to scale the bot horizontally. When you reach a
huge size, you can either keep spending more money to keep beefing up your
server or you can buy several cheaper servers and scale horizontally. The
proxy means you can have WS handling on a completely separate system.
- **No Loss Restarts**:
- When you restart a bot without the proxy system, normally you would lose
many events. Users may be using commands or messages are sent that will not
be filtered. As your bot's grow this number rises dramatically. Users may
join who wont get the auto-roles or any other actions your bot should take.
With the proxy system, you can keep restarting your bot and never lose any
events. Events will be put into a queue while your bot is down(max size of
queue is customizable), once the bot is available the queue will begin
processing all events.
- **Controllers**:
- The controller aspect gives you full control over everything inside the
proxy. You can provide a function to simply override the handler. For
example, if you would like a certain function to do something different,
instead of having to fork and maintain your fork, you can just provide a
function to override.
- **Clustering With Workers**:
- Take full advantage of all your CPU cores by using workers to spread the
load. Control how many shards per worker and how many workers to maximize
efficiency!
## Usage
```ts
startGateway({
/** The bot token. */
token: "BOT_TOKEN_HERE",
/** Whether or not to use compression for gateway payloads. */
compress: true,
/** The intents you would like to enable. */
intents: ["GUILDS", "GUILD_MESSAGES"],
/** The max amount of shards used for identifying. This can be useful for zero-downtime updates or resharding. */
maxShards: 885,
/** The first shard ID for this group of shards. */
firstShardID: 100,
/** The last shard ID for this group. If none is provided, it will default to loading all shards. */
lastShardID: 124,
/** The url to forward all payloads to. */
url: "http://urlToYourServerHere",
/** The amount of shards per cluster. By default this is 25. Use this to spread the load from shards to different CPU cores. */
shardsPerCluster: 25,
/** The maximum amount of clusters available. By default this is 4. Another way to think of cluster is how many CPU cores does your server/machine have. */
maxClusters: 46,
/** Whether or not you want to allow automated sharding. By default this is true. */
reshard: true;
});
```
## API / Docs
```ts
// CONTROLLER LIKE INTERFACE FOR WS HANDLING
export const ws = {
/** The url that all discord payloads for the dispatch type should be sent to. */
url: "",
/** Whether or not to automatically reshard. */
reshard: true,
/** The percentage at which resharding should occur. */
reshardPercentage: 80,
/** The maximum shard ID number. Useful for zero-downtime updates or resharding. */
maxShards: 1,
/** The amount of shards to load per cluster */
shardsPerCluster: 25,
/** The maximum amount of clusters to use for your bot. */
maxClusters: 4,
/** The first shard ID to start spawning. */
firstShardID: 0,
/** The last shard ID for this cluster. */
lastShardID: 1,
/** This prop decides whether Discord allows our next shard to be started. When 1 starts, this is set to false until it is ready for the next one. */
createNextShard: true,
/** The identify payload holds the necessary data to connect and stay connected with Discords WSS. */
identifyPayload: {
token: "",
compress: false,
properties: {
$os: "linux",
$browser: "Discordeno",
$device: "Discordeno",
},
intents: 0,
shard: [0, 0],
},
botGatewayData: {
/** The WSS URL that can be used for connecting to the gateway. */
url: "wss://gateway.discord.gg/?v=8&encoding=json",
/** The recommended number of shards to use when connecting. */
shards: 1,
/** Info on the current start limit. */
sessionStartLimit: {
/** The total number of session starts the current user is allowed. */
total: 1000,
/** The remaining number of session starts the current user is allowed. */
remaining: 1000,
/** Milliseconds left until limit is reset. */
resetAfter: 0,
/** The number of identify requests allowed per 5 seconds.
* So, if you had a max concurrency of 16, and 16 shards for example, you could start them all up at the same time.
* Whereas if you had 32 shards, if you tried to start up shard 0 and 16 at the same time for example, it would not work. You can start shards 0-15 concurrently, then 16-31...
*/
maxConcurrency: 1,
},
},
shards: new Collection<number, DiscordenoShard>(),
loadingShards: new Collection<
number,
{
shardID: number;
resolve: (value: unknown) => void;
reject: (reason?: unknown) => void;
startedAt: number;
}
>(),
utf8decoder: new TextDecoder(),
// METHODS
/** The handler function that starts the gateway. */
startGateway,
/** The handler for spawning ALL the shards. */
spawnShards,
/** Create the websocket and adds the proper handlers to the websocket. */
createShard,
/** Begins identification of the shard to discord */
identify,
/** Begins heartbeating of the shard to keep it alive */
heartbeat,
/** Sends the discord payload to another server. */
handleDiscordPayload,
/** Tell the cluster/worker to begin identifying this shard */
tellClusterToIdentify,
/** Handle the different logs. Used for debugging. */
log,
/** Handles resharding the bot when necessary. */
resharder,
/** Cleanups loading shards that were unable to load. */
cleanupLoadingShards,
};
export interface DiscordenoShard {
/** The shard id number */
id: number;
/** The websocket for this shard */
ws: WebSocket;
/** The amount of milliseconds to wait between heartbeats */
resumeInterval: number;
/** The session id important for resuming connections. */
sessionID: string;
/** The previous sequence number, important for resuming connections. */
previousSequenceNumber: number | null;
/** Whether the shard is currently resuming. */
resuming: boolean;
heartbeat: {
/** The exact timestamp the last heartbeat was sent */
lastSentAt: number;
/** The timestamp the last heartbeat ACK was received from discord. */
lastReceivedAt: number;
/** Whether or not the heartbeat was acknowledged by discord in time. */
acknowledged: boolean;
/** Whether or not to keep heartbeating. Useful for when needing to stop heartbeating. */
keepAlive: boolean;
/** The interval between heartbeats requested by discord. */
interval: number;
/** The id of the interval, useful for stopping the interval if ws closed. */
intervalID: number;
};
}
```

View File

@@ -0,0 +1,24 @@
import { delay } from "../util/utils.ts";
import { ws } from "./ws.ts";
/** The handler to clean up shards that identified but never received a READY. */
export async function cleanupLoadingShards() {
while (ws.loadingShards.size) {
const now = Date.now();
ws.loadingShards.forEach((loadingShard) => {
console.log(
now > loadingShard.startedAt + 60000,
now,
loadingShard.startedAt,
);
// Not a minute yet. Max should be few seconds but do a minute to be safe.
if (now < loadingShard.startedAt + 60000) return;
loadingShard.reject(
`[Identify Failure] Shard ${loadingShard.shardID} has not received READY event in over a minute.`,
);
});
await delay(1000);
}
}

48
src/ws/create_shard.ts Normal file
View File

@@ -0,0 +1,48 @@
import { identify } from "./identify.ts";
import { resume } from "./resume.ts";
import { ws } from "./ws.ts";
// deno-lint-ignore require-await
export async function createShard(shardID: number) {
const socket = new WebSocket(ws.botGatewayData.url);
socket.binaryType = "arraybuffer";
socket.onerror = (errorEvent) => {
ws.log("ERROR", { shardID, error: errorEvent });
};
socket.onmessage = ({ data: message }) => handleOnMessage(message, shardID);
socket.onclose = (event) => {
ws.log("CLOSED", { shardID, payload: event });
// TODO: ENUM FOR THESE CODES?
switch (event.code) {
case 4001:
case 4002:
case 4004:
case 4005:
case 4010:
case 4011:
case 4012:
case 4013:
case 4014:
throw new Error(
event.reason || "Discord gave no reason! GG! You broke Discord!",
);
// THESE ERRORS CAN NO BE RESUMED! THEY MUST RE-IDENTIFY!
case 4003:
case 4007:
case 4008:
case 4009:
ws.log("CLOSED_RECONNECT", { shardID, payload: event });
identify(shardID, ws.maxShards);
break;
default:
resume(shardID);
break;
}
};
return socket;
}

62
src/ws/events.ts Normal file
View File

@@ -0,0 +1,62 @@
import { DiscordenoShard } from "./ws.ts";
/** The handler for logging different actions happening inside the ws. User can override and put custom handling per event. */
export function log(
type: "CLOSED",
data: { shardID: number; payload: CloseEvent },
): unknown;
export function log(
type: "CLOSED_RECONNECT",
data: { shardID: number; payload: CloseEvent },
): unknown;
export function log(
type: "ERROR",
data: Record<string, unknown> & { shardID: number },
): unknown;
export function log(
type: "HEARTBEATING",
data: { shardID: number; shard: DiscordenoShard },
): unknown;
export function log(
type: "HEARTBEATING_CLOSED",
data: { shardID: number; shard: DiscordenoShard },
): unknown;
export function log(
type: "HEARTBEATING_DETAILS",
data: { shardID: number; interval: number; shard: DiscordenoShard },
): unknown;
export function log(
type: "HEARTBEATING_STARTED",
data: { shardID: number; interval: number },
): unknown;
export function log(
type: "IDENTIFYING",
data: { shardID: number; maxShards: number },
): unknown;
export function log(
type: "INVALID_SESSION",
data: { shardID: number; payload: DiscordPayload },
): unknown;
export function log(type: "RAW", data: Record<string, unknown>): unknown;
export function log(type: "RECONNECT", data: { shardID: number }): unknown;
export function log(type: "RESUMED", data: { shardID: number }): unknown;
export function log(type: "RESUMING", data: { shardID: number }): unknown;
export function log(
type:
| "CLOSED"
| "CLOSED_RECONNECT"
| "ERROR"
| "HEARTBEATING"
| "HEARTBEATING_CLOSED"
| "HEARTBEATING_DETAILS"
| "HEARTBEATING_STARTED"
| "IDENTIFYING"
| "INVALID_SESSION"
| "RAW"
| "RECONNECT"
| "RESUMED"
| "RESUMING",
data: unknown,
) {
console.log(type, data);
}

View File

@@ -0,0 +1,18 @@
import { ws } from "./ws.ts";
/** Handler for processing all dispatch payloads that should be sent/forwarded to another server/vps/process. */
export async function handleDiscordPayload(
data: DiscordPayload,
shardID: number,
) {
await fetch(ws.url, {
headers: {
authorization: ws.secretKey,
},
method: "post",
body: JSON.stringify({
shardID,
data,
}),
}).catch(console.error);
}

View File

@@ -0,0 +1,96 @@
import { identify } from "./identify.ts";
import { resume } from "./resume.ts";
import { ws } from "./ws.ts";
import { decompressWith } from "./deps.ts";
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { DiscordReady } from "../types/gateway/ready.ts";
/** Handler for handling every message event from websocket. */
// deno-lint-ignore no-explicit-any
export function handleOnMessage(message: any, shardID: number) {
if (message instanceof ArrayBuffer) {
message = new Uint8Array(message);
}
if (message instanceof Uint8Array) {
message = decompressWith(
message,
0,
(slice: Uint8Array) => ws.utf8decoder.decode(slice),
);
}
if (typeof message !== "string") return;
const messageData = JSON.parse(message);
ws.log("RAW", messageData);
switch (messageData.op) {
case DiscordGatewayOpcodes.Hello:
ws.heartbeat(
shardID,
(messageData.d as DiscordHeartbeat).heartbeat_interval,
);
break;
case DiscordGatewayOpcodes.HeartbeatACK:
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.heartbeat.acknowledged = true;
}
break;
case DiscordGatewayOpcodes.Reconnect:
ws.log("RECONNECT", { shardID });
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = true;
}
resume(shardID);
break;
case DiscordGatewayOpcodes.InvalidSession:
ws.log("INVALID_SESSION", { shardID, payload: messageData });
// When d is false we need to reidentify
if (!messageData.d) {
identify(shardID, ws.maxShards);
break;
}
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = true;
}
resume(shardID);
break;
default:
if (messageData.t === "RESUMED") {
ws.log("RESUMED", { shardID });
if (ws.shards.has(shardID)) {
ws.shards.get(shardID)!.resuming = false;
}
break;
}
// Important for RESUME
if (messageData.t === "READY") {
const shard = ws.shards.get(shardID);
if (shard) {
shard.sessionID = (messageData.d as DiscordReady).session_id;
}
ws.loadingShards.get(shardID)?.resolve(true);
ws.loadingShards.delete(shardID);
}
// Update the sequence number if it is present
if (messageData.s) {
const shard = ws.shards.get(shardID);
if (shard) {
shard.previousSequenceNumber = messageData.s;
}
}
ws.handleDiscordPayload(messageData, shardID);
break;
}
}

40
src/ws/heartbeat.ts Normal file
View File

@@ -0,0 +1,40 @@
import { ws } from "./ws.ts";
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
export function heartbeat(shardID: number, interval: number) {
ws.log("HEARTBEATING_STARTED", { shardID, interval });
const shard = ws.shards.get(shardID);
if (!shard) return;
ws.log("HEARTBEATING_DETAILS", { shardID, interval, shard });
shard.heartbeat.keepAlive = true;
shard.heartbeat.acknowledged = false;
shard.heartbeat.lastSentAt = Date.now();
shard.heartbeat.interval = interval;
shard.heartbeat.intervalID = setInterval(() => {
const currentShard = ws.shards.get(shardID);
if (!currentShard) return;
ws.log("HEARTBEATING", { shardID, shard: currentShard });
if (
currentShard.ws.readyState === WebSocket.CLOSED ||
!currentShard.heartbeat.keepAlive
) {
ws.log("HEARTBEATING_CLOSED", { shardID, shard: currentShard });
// STOP THE HEARTBEAT
return clearInterval(currentShard.heartbeat.intervalID);
}
currentShard.ws.send(
JSON.stringify({
op: DiscordGatewayOpcodes.Heartbeat,
d: currentShard.previousSequenceNumber,
}),
);
}, interval);
}

47
src/ws/identify.ts Normal file
View File

@@ -0,0 +1,47 @@
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { ws } from "./ws.ts";
export async function identify(shardID: number, maxShards: number) {
ws.log("IDENTIFYING", { shardID, maxShards });
// CREATE A SHARD
const socket = await ws.createShard(shardID);
// Identify can just set/reset the settings for the shard
ws.shards.set(shardID, {
id: shardID,
ws: socket,
resumeInterval: 0,
sessionID: "",
previousSequenceNumber: 0,
resuming: false,
heartbeat: {
lastSentAt: 0,
lastReceivedAt: 0,
acknowledged: false,
keepAlive: false,
interval: 0,
intervalID: 0,
},
});
socket.onopen = () => {
socket.send(
JSON.stringify({
op: DiscordGatewayOpcodes.Identify,
d: { ...ws.identifyPayload, shard: [shardID, maxShards] },
}),
);
};
return new Promise((resolve, reject) => {
ws.loadingShards.set(shardID, {
shardID,
resolve,
reject,
startedAt: Date.now(),
});
ws.cleanupLoadingShards();
});
}

View File

@@ -1,2 +1,14 @@
export * from "./shard.ts";
export * from "./shard_manager.ts";
export * from "./cleanup_loading_shards.ts";
export * from "./create_shard.ts";
export * from "./events.ts";
export * from "./handle_discord_payload.ts";
export * from "./handle_on_message.ts";
export * from "./heartbeat.ts";
export * from "./identify.ts";
export * from "./resharder.ts";
export * from "./resume.ts";
export * from "./spawn_shards.ts";
export * from "./start_gateway_options.ts";
export * from "./start_gateway.ts";
export * from "./tell_cluster_to_identify.ts";
export * from "./ws.ts";

30
src/ws/resharder.ts Normal file
View File

@@ -0,0 +1,30 @@
import { ws } from "./ws.ts";
import { getGatewayBot } from "../helpers/misc/get_gateway_bot.ts";
/** The handler to automatically reshard when necessary. */
export async function resharder() {
const data = await getGatewayBot();
const percentage = (data.shards - ws.maxShards) / ws.maxShards * 100;
// Less than necessary% being used so do nothing
if (percentage < ws.reshardPercentage) return;
// Don't have enough identify rate limits to reshard
if (data.session_start_limit.remaining < data.shards) return;
// Begin resharding
ws.maxShards = data.shards;
// TODO: ALL THE FOLLOWING CAN BE REPLACED BY THIS 1 LINE
// ws.botGatewayData = snakeToCamel(await getGatewayBot())
ws.botGatewayData.sessionStartLimit.total = data.session_start_limit.total;
ws.botGatewayData.sessionStartLimit.resetAfter =
data.session_start_limit.reset_after;
ws.botGatewayData.sessionStartLimit.remaining =
data.session_start_limit.remaining;
ws.botGatewayData.sessionStartLimit.maxConcurrency =
data.session_start_limit.max_concurrency;
ws.botGatewayData.shards = data.shards;
ws.botGatewayData.url = data.url;
ws.spawnShards(ws.firstShardID);
}

54
src/ws/resume.ts Normal file
View File

@@ -0,0 +1,54 @@
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { ws } from "./ws.ts";
export async function resume(shardID: number) {
ws.log("RESUMING", { shardID });
// CREATE A SHARD
const socket = await ws.createShard(shardID);
// NOW WE HANDLE RESUMING THIS SHARD
// Get the old data for this shard necessary for resuming
const oldShard = ws.shards.get(shardID);
if (oldShard) {
// HOW TO CLOSE OLD SHARD SOCKET!!!
oldShard.ws.close(4009, "Resuming the shard, closing old shard.");
// STOP OLD HEARTBEAT
clearInterval(oldShard.heartbeat.intervalID);
}
const sessionID = oldShard?.sessionID || "";
const previousSequenceNumber = oldShard?.previousSequenceNumber || 0;
ws.shards.set(shardID, {
id: shardID,
ws: socket,
resumeInterval: 0,
sessionID,
previousSequenceNumber,
resuming: false,
heartbeat: {
lastSentAt: 0,
lastReceivedAt: 0,
acknowledged: false,
keepAlive: false,
interval: 0,
intervalID: 0,
},
});
// Resume on open
socket.onopen = () => {
socket.send(
JSON.stringify({
op: DiscordGatewayOpcodes.Resume,
d: {
token: ws.identifyPayload.token,
session_id: sessionID,
seq: previousSequenceNumber,
},
}),
);
};
}

View File

@@ -1,383 +0,0 @@
import { botGatewayData, eventHandlers, proxyWSURL } from "../bot.ts";
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { Collection } from "../util/collection.ts";
import { delay } from "../util/utils.ts";
import { decompressWith } from "./deps.ts";
import { handleDiscordPayload } from "./shard_manager.ts";
export const basicShards = new Collection<number, BasicShard>();
const heartbeating = new Map<number, boolean>();
const utf8decoder = new TextDecoder();
const RequestMembersQueue: RequestMemberQueuedRequest[] = [];
let processQueue = false;
export function createShard(
data: DiscordBotGatewayData,
identifyPayload: DiscordIdentify,
resuming = false,
shardId = 0,
) {
const oldShard = basicShards.get(shardId);
const ws = new WebSocket(proxyWSURL);
ws.binaryType = "arraybuffer";
const basicShard: BasicShard = {
id: shardId,
ws,
resumeInterval: 0,
sessionId: oldShard?.sessionId || "",
previousSequenceNumber: oldShard?.previousSequenceNumber || 0,
needToResume: false,
ready: false,
unavailableGuildIds: new Set<string>(),
};
basicShards.set(basicShard.id, basicShard);
ws.onopen = () => {
if (!resuming) {
// Initial identify with the gateway
identify(basicShard, identifyPayload);
} else {
resume(basicShard, identifyPayload);
}
};
ws.onerror = (errorEvent) => {
eventHandlers.debug?.({
type: "wsError",
data: { shardId: basicShard.id, ...errorEvent },
});
};
ws.onmessage = async ({ data: message }) => {
if (message instanceof ArrayBuffer) {
message = new Uint8Array(message);
}
if (message instanceof Uint8Array) {
message = decompressWith(
message,
0,
(slice: Uint8Array) => utf8decoder.decode(slice),
);
}
if (typeof message === "string") {
const messageData = JSON.parse(message);
if (!messageData.t) eventHandlers.rawGateway?.(messageData);
switch (messageData.op) {
case DiscordGatewayOpcodes.Hello:
if (!heartbeating.has(basicShard.id)) {
await heartbeat(
basicShard,
(messageData.d as DiscordHello).heartbeat_interval,
identifyPayload,
data,
);
}
break;
case DiscordGatewayOpcodes.HeartbeatACK:
heartbeating.set(shardId, true);
break;
case DiscordGatewayOpcodes.Reconnect:
eventHandlers.debug?.(
{ type: "gatewayReconnect", data: { shardId: basicShard.id } },
);
basicShard.needToResume = true;
await resumeConnection(data, identifyPayload, basicShard.id);
break;
case DiscordGatewayOpcodes.InvalidSession:
eventHandlers.debug?.(
{
type: "gatewayInvalidSession",
data: { shardId: basicShard.id, data },
},
);
// When d is false we need to reidentify
if (!messageData.d) {
createShard(data, identifyPayload, false, shardId);
break;
}
basicShard.needToResume = true;
await resumeConnection(data, identifyPayload, basicShard.id);
break;
default:
if (messageData.t === "RESUMED") {
eventHandlers.debug?.(
{ type: "gatewayResumed", data: { shardId: basicShard.id } },
);
basicShard.needToResume = false;
break;
}
// Important for RESUME
if (messageData.t === "READY") {
basicShard.sessionId = (messageData.d as ReadyPayload).session_id;
}
// Update the sequence number if it is present
if (messageData.s) basicShard.previousSequenceNumber = messageData.s;
await handleDiscordPayload(messageData, basicShard.id);
break;
}
}
};
ws.onclose = async ({ reason, code, wasClean }) => {
eventHandlers.debug?.(
{
type: "wsClose",
data: { shardId: basicShard.id, code, reason, wasClean },
},
);
if ([4001, 4002, 4004, 4005, 4010, 4011, 4012, 4013, 4014].includes(code)) {
throw new Error(reason);
} else if ([4000, 4003, 4007, 4008, 4009].includes(code)) {
eventHandlers.debug?.({
type: "wsReconnect",
data: { shardId: basicShard.id, code, reason, wasClean },
});
createShard(data, identifyPayload, false, shardId);
} else if (code === 3069 && reason === "[discordeno] requested closure") {
return;
} else {
basicShard.needToResume = true;
await resumeConnection(botGatewayData, identifyPayload, shardId);
}
};
}
function identify(shard: BasicShard, payload: DiscordIdentify) {
eventHandlers.debug?.(
{
type: "gatewayIdentify",
data: {
shardId: shard.id,
},
},
);
sendWS({
op: DiscordGatewayOpcodes.Identify,
d: { ...payload, shard: [shard.id, payload.shard[1]] },
}, shard.id);
}
function resume(shard: BasicShard, payload: DiscordIdentify) {
sendWS({
op: DiscordGatewayOpcodes.Resume,
d: {
token: payload.token,
session_id: shard.sessionId,
seq: shard.previousSequenceNumber,
},
}, shard.id);
}
async function heartbeat(
shard: BasicShard,
interval: number,
payload: DiscordIdentify,
data: DiscordGetGatewayBot,
) {
// We lost socket connection between heartbeats, resume connection
if (shard.ws.readyState === WebSocket.CLOSED) {
shard.needToResume = true;
await resumeConnection(data, payload, shard.id);
heartbeating.delete(shard.id);
return;
}
if (heartbeating.has(shard.id)) {
const receivedACK = heartbeating.get(shard.id);
// If a ACK response was not received since last heartbeat, issue invalid session close
if (!receivedACK) {
eventHandlers.debug?.(
{
type: "gatewayHeartbeatStopped",
data: {
interval,
previousSequenceNumber: shard.previousSequenceNumber,
shardId: shard.id,
},
},
);
return shard.ws.close(4009, "Session timed out");
}
}
// Set it to false as we are issuing a new heartbeat
heartbeating.set(shard.id, false);
sendWS(
{ op: DiscordGatewayOpcodes.Heartbeat, d: shard.previousSequenceNumber },
shard.id,
);
eventHandlers.debug?.(
{
type: "gatewayHeartbeat",
data: {
interval,
previousSequenceNumber: shard.previousSequenceNumber,
shardId: shard.id,
},
},
);
await delay(interval);
await heartbeat(shard, interval, payload, data);
}
async function resumeConnection(
data: DiscordGetGatewayBot,
payload: DiscordIdentify,
shardId: number,
) {
const shard = basicShards.get(shardId);
if (!shard) {
eventHandlers.debug?.(
{ type: "missingShard", data: { shardId: shardId } },
);
return;
}
if (!shard.needToResume) return;
eventHandlers.debug?.({ type: "gatewayResume", data: { shardId: shard.id } });
// Run it once
createShard(data, payload, true, shard.id);
// Then retry every 15 seconds
await delay(1000 * 15);
if (shard.needToResume) await resumeConnection(data, payload, shardId);
}
export async function requestGuildMembers(
guildId: string,
shardId: number,
nonce: string,
options?: FetchMembersOptions,
queuedRequest = false,
) {
const shard = basicShards.get(shardId);
// This request was not from this queue so we add it to queue first
if (!queuedRequest) {
RequestMembersQueue.push({
guildId,
shardId,
nonce,
options,
});
if (!processQueue) {
processQueue = true;
return processGatewayQueue();
}
return;
}
// If its closed add back to queue to redo on resume
if (shard?.ws.readyState === WebSocket.CLOSED) {
await requestGuildMembers(guildId, shardId, nonce, options);
return;
}
sendWS({
op: DiscordGatewayOpcodes.RequestGuildMembers,
d: {
guild_id: guildId,
// If a query is provided use it, OR if a limit is NOT provided use ""
query: options?.query || (options?.limit ? undefined : ""),
limit: options?.limit || 0,
presences: options?.presences || false,
user_ids: options?.userIds,
nonce,
},
}, shard?.id);
}
async function processGatewayQueue() {
if (!RequestMembersQueue.length) {
processQueue = false;
return;
}
await Promise.all(basicShards.map(async (shard) => {
const index = RequestMembersQueue.findIndex((q) => q.shardId === shard.id);
// 2 events per second is the rate limit.
const request = RequestMembersQueue[index];
if (request) {
eventHandlers.debug?.(
{
type: "requestMembersProcessing",
data: {
remaining: RequestMembersQueue.length,
request,
},
},
);
await requestGuildMembers(
request.guildId,
request.shardId,
request.nonce,
request.options,
true,
);
// Remove item from queue
RequestMembersQueue.splice(index, 1);
const secondIndex = RequestMembersQueue.findIndex((q) =>
q.shardId === shard.id
);
const secondRequest = RequestMembersQueue[secondIndex];
if (secondRequest) {
eventHandlers.debug?.(
{
type: "requestMembersProcessing",
data: {
remaining: RequestMembersQueue.length,
request,
},
},
);
await requestGuildMembers(
secondRequest.guildId,
secondRequest.shardId,
secondRequest.nonce,
secondRequest.options,
true,
);
// Remove item from queue
RequestMembersQueue.splice(secondIndex, 1);
}
}
}));
await delay(1500);
await processGatewayQueue();
}
/** Enqueues the specified data to be transmitted to the server over the WebSocket connection, */
export function sendWS(payload: DiscordGatewayPayload, shardId = 0) {
const shard = basicShards.get(shardId);
if (!shard) return false;
const serialized = JSON.stringify(payload);
shard.ws.send(serialized);
return true;
}
/** Closes the WebSocket connection or connection attempt */
export function closeWS(shardId = 0) {
const shard = basicShards.get(shardId);
if (!shard) return false;
shard.ws.close(3069, "[discordeno] requested closure");
return true;
}

View File

@@ -1,100 +0,0 @@
import { eventHandlers } from "../bot.ts";
import { cache } from "../cache.ts";
import { handlers } from "../handlers/mod.ts";
import { Member } from "../structures/mod.ts";
import { DiscordGatewayOpcodes } from "../types/codes/gateway_opcodes.ts";
import { Collection } from "../util/collection.ts";
import { delay } from "../util/utils.ts";
import { createShard, requestGuildMembers } from "./mod.ts";
let createNextShard = true;
/** This function is meant to be used on the ready event to alert the library to start the next shard. */
export function allowNextShard(enabled = true) {
createNextShard = enabled;
}
export async function spawnShards(
data: DiscordBotGatewayData,
payload: DiscordIdentify,
shardId: number,
lastShardId: number,
skipChecks?: number,
) {
// All shards on this worker have started! Cancel out.
if (shardId >= lastShardId) return;
if (skipChecks) {
payload.shard = [
shardId,
data.shards > lastShardId ? data.shards : lastShardId,
];
// Start The shard
createShard(data, payload, false, shardId);
// Spawn next shard
await spawnShards(
data,
payload,
shardId + 1,
lastShardId,
skipChecks - 1,
);
return;
}
// Make sure we can create a shard or we are waiting for shards to connect still.
if (createNextShard) {
createNextShard = false;
// Start the next few shards based on max concurrency
await spawnShards(
data,
payload,
shardId,
lastShardId,
data.session_start_limit.max_concurrency,
);
return;
}
await delay(1000);
await spawnShards(data, payload, shardId, lastShardId, skipChecks);
}
export async function handleDiscordPayload(
data: DiscordPayload,
shardId: number,
) {
eventHandlers.raw?.(data);
await eventHandlers.dispatchRequirements?.(data, shardId);
switch (data.op) {
case DiscordGatewayOpcodes.HeartbeatACK:
// In case the user wants to listen to heartbeat responses
return eventHandlers.heartbeat?.();
case DiscordGatewayOpcodes.Dispatch:
if (!data.t) return;
// Run the appropriate handler for this event.
return handlers[data.t]?.(data, shardId);
default:
return;
}
}
export async function requestAllMembers(
guildId: string,
shardId: number,
resolve: (
value: Collection<string, Member> | PromiseLike<Collection<string, Member>>,
) => void,
options?: FetchMembersOptions,
) {
const nonce = `${guildId}-${Date.now()}`;
cache.fetchAllMembersProcessingRequests.set(nonce, resolve);
await requestGuildMembers(
guildId,
shardId,
nonce,
options,
);
}

52
src/ws/spawn_shards.ts Normal file
View File

@@ -0,0 +1,52 @@
import { Collection } from "../util/collection.ts";
import { ws } from "./ws.ts";
/** Begin spawning shards. */
export function spawnShards(firstShardID = 0) {
/** Stored as bucketID: [clusterID, [ShardIDs]] */
const buckets = new Collection<number, number[][]>();
const maxShards = ws.maxShards || ws.botGatewayData.shards;
let cluster = 0;
for (
let index = firstShardID;
index < ws.botGatewayData.sessionStartLimit.maxConcurrency;
index++
) {
// ORGANIZE ALL SHARDS INTO THEIR OWN BUCKETS
for (let i = 0; i < maxShards; i++) {
const bucketID = i % ws.botGatewayData.sessionStartLimit.maxConcurrency;
const bucket = buckets.get(bucketID);
if (!bucket) {
// Create the bucket since it doesnt exist
buckets.set(bucketID, [[cluster, i]]);
if (cluster + 1 <= ws.maxClusters) cluster++;
} else {
// FIND A QUEUE IN THIS BUCKET THAT HAS SPACE
const queue = bucket.find((q) => q.length < ws.shardsPerCluster + 1);
if (queue) {
// IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE
queue.push(i);
} else {
if (cluster + 1 <= ws.maxClusters) cluster++;
// ADD A NEW QUEUE FOR THIS SHARD
bucket.push([cluster, i]);
}
}
}
}
// SPREAD THIS OUT TO DIFFERENT CLUSTERS TO BEGIN STARTING UP
buckets.forEach(async (bucket, bucketID) => {
for (const [clusterID, ...queue] of bucket) {
let shardID = queue.shift();
while (shardID !== undefined) {
await ws.tellClusterToIdentify(clusterID as number, shardID, bucketID);
shardID = queue.shift();
}
}
});
}

55
src/ws/start_gateway.ts Normal file
View File

@@ -0,0 +1,55 @@
import { StartGatewayOptions } from "./start_gateway_options.ts";
import { DiscordGatewayIntents } from "../types/gateway/gateway_intents.ts";
import { ws } from "./ws.ts";
/** ADVANCED DEVS ONLY!!!!!!
* Starts the standalone gateway.
* This will require starting the bot separately.
*/
export async function startGateway(options: StartGatewayOptions) {
ws.identifyPayload.token = `Bot ${options.token}`;
ws.secretKey = options.secretKey;
ws.firstShardID = options.firstShardID;
ws.url = options.url;
if (options.shardsPerCluster) ws.shardsPerCluster = options.shardsPerCluster;
if (options.maxClusters) ws.maxClusters = options.maxClusters;
if (options.compress) {
ws.identifyPayload.compress = options.compress;
}
if (options.reshard) ws.reshard = options.reshard;
// Once an hour check if resharding is necessary
setInterval(ws.resharder, 1000 * 60 * 60);
ws.identifyPayload.intents = options.intents.reduce(
(
bits,
next,
) => (bits |= typeof next === "string"
? DiscordGatewayIntents[next]
: next),
0,
);
const data = (await fetch(`https://discord.com/api/gateway/bot`, {
headers: { Authorization: ws.identifyPayload.token },
}).then((res) => res.json())) as DiscordBotGatewayData;
ws.maxShards = options.maxShards || data.shards;
ws.lastShardID = options.lastShardID || data.shards - 1;
// TODO: ALL THE FOLLOWING CAN BE REPLACED BY THIS 1 LINE
// ws.botGatewayData = snakeToCamel(await getGatewayBot())
ws.botGatewayData.sessionStartLimit.total = data.session_start_limit.total;
ws.botGatewayData.sessionStartLimit.resetAfter =
data.session_start_limit.reset_after;
ws.botGatewayData.sessionStartLimit.remaining =
data.session_start_limit.remaining;
ws.botGatewayData.sessionStartLimit.maxConcurrency =
data.session_start_limit.max_concurrency;
ws.botGatewayData.shards = data.shards;
ws.botGatewayData.url = data.url;
ws.spawnShards(ws.firstShardID);
ws.cleanupLoadingShards();
}

View File

@@ -0,0 +1,26 @@
import { DiscordGatewayIntents } from "../types/gateway/gateway_intents.ts";
export interface StartGatewayOptions {
/** The bot token. */
token: string;
/** Whether or not to use compression for gateway payloads. */
compress?: boolean;
/** The intents you would like to enable. */
intents: (DiscordGatewayIntents | keyof typeof DiscordGatewayIntents)[];
/** The max amount of shards used for identifying. This can be useful for zero-downtime updates or resharding. */
maxShards?: number;
/** The first shard ID for this group of shards. */
firstShardID: number;
/** The last shard ID for this group. If none is provided, it will default to loading all shards. */
lastShardID?: number;
/** The url to forward all payloads to. */
url: string;
/** The amount of shards per cluster. By default this is 25. Use this to spread the load from shards to different CPU cores. */
shardsPerCluster?: number;
/** The maximum amount of clusters available. By default this is 4. Another way to think of cluster is how many CPU cores does your server/machine have. */
maxClusters?: number;
/** Whether or not you want to allow automated sharding. By default this is true. */
reshard?: boolean;
/** The authorization key that the bot http server will expect. */
secretKey: string;
}

View File

@@ -0,0 +1,18 @@
import { ws } from "./ws.ts";
/** Allows users to hook in and change to communicate to different clusters across different servers or anything they like. For example using redis pubsub to talk to other servers. */
export async function tellClusterToIdentify(
workerID: number,
shardID: number,
bucketID: number,
) {
// When resharding this may exist already
const oldShard = ws.shards.get(shardID);
// TODO: Use workers
await ws.identify(shardID, ws.maxShards);
if (oldShard) {
oldShard.ws.close(4009, "Resharded!");
}
}

133
src/ws/ws.ts Normal file
View File

@@ -0,0 +1,133 @@
import { Collection } from "../util/collection.ts";
import { log } from "./events.ts";
import { resharder } from "./resharder.ts";
import { startGateway } from "./start_gateway.ts";
import { spawnShards } from "./spawn_shards.ts";
import { createShard } from "./create_shard.ts";
import { identify } from "./identify.ts";
import { heartbeat } from "./heartbeat.ts";
import { handleDiscordPayload } from "./handle_discord_payload.ts";
import { tellClusterToIdentify } from "./tell_cluster_to_identify.ts";
import { cleanupLoadingShards } from "./cleanup_loading_shards.ts";
import { handleOnMessage } from "./handle_on_message.ts";
// CONTROLLER LIKE INTERFACE FOR WS HANDLING
export const ws = {
/** The secret key authorization header the bot will expect when sending payloads */
secretKey: "",
/** The url that all discord payloads for the dispatch type should be sent to. */
url: "",
/** Whether or not to automatically reshard. */
reshard: true,
/** The percentage at which resharding should occur. */
reshardPercentage: 80,
/** The maximum shard ID number. Useful for zero-downtime updates or resharding. */
maxShards: 1,
/** The amount of shards to load per cluster */
shardsPerCluster: 25,
/** The maximum amount of clusters to use for your bot. */
maxClusters: 4,
/** The first shard ID to start spawning. */
firstShardID: 0,
/** The last shard ID for this cluster. */
lastShardID: 1,
/** This prop decides whether Discord allows our next shard to be started. When 1 starts, this is set to false until it is ready for the next one. */
createNextShard: true,
/** The identify payload holds the necessary data to connect and stay connected with Discords WSS. */
identifyPayload: {
token: "",
compress: false,
properties: {
$os: "linux",
$browser: "Discordeno",
$device: "Discordeno",
},
intents: 0,
shard: [0, 0],
},
botGatewayData: {
/** The WSS URL that can be used for connecting to the gateway. */
url: "wss://gateway.discord.gg/?v=8&encoding=json",
/** The recommended number of shards to use when connecting. */
shards: 1,
/** Info on the current start limit. */
sessionStartLimit: {
/** The total number of session starts the current user is allowed. */
total: 1000,
/** The remaining number of session starts the current user is allowed. */
remaining: 1000,
/** Milliseconds left until limit is reset. */
resetAfter: 0,
/** The number of identify requests allowed per 5 seconds.
* So, if you had a max concurrency of 16, and 16 shards for example, you could start them all up at the same time.
* Whereas if you had 32 shards, if you tried to start up shard 0 and 16 at the same time for example, it would not work. You can start shards 0-15 concurrently, then 16-31...
*/
maxConcurrency: 1,
},
},
shards: new Collection<number, DiscordenoShard>(),
loadingShards: new Collection<
number,
{
shardID: number;
resolve: (value: unknown) => void;
reject: (reason?: unknown) => void;
startedAt: number;
}
>(),
utf8decoder: new TextDecoder(),
// METHODS
/** The handler function that starts the gateway. */
startGateway,
/** The handler for spawning ALL the shards. */
spawnShards,
/** Create the websocket and adds the proper handlers to the websocket. */
createShard,
/** Begins identification of the shard to discord */
identify,
/** Begins heartbeating of the shard to keep it alive */
heartbeat,
/** Sends the discord payload to another server. */
handleDiscordPayload,
/** Tell the cluster/worker to begin identifying this shard */
tellClusterToIdentify,
/** Handle the different logs. Used for debugging. */
log,
/** Handles resharding the bot when necessary. */
resharder,
/** Cleanups loading shards that were unable to load. */
cleanupLoadingShards,
/** Handles the message events from websocket */
handleOnMessage,
};
export interface DiscordenoShard {
/** The shard id number */
id: number;
/** The websocket for this shard */
ws: WebSocket;
/** The amount of milliseconds to wait between heartbeats */
resumeInterval: number;
/** The session id important for resuming connections. */
sessionID: string;
/** The previous sequence number, important for resuming connections. */
previousSequenceNumber: number | null;
/** Whether the shard is currently resuming. */
resuming: boolean;
heartbeat: {
/** The exact timestamp the last heartbeat was sent */
lastSentAt: number;
/** The timestamp the last heartbeat ACK was received from discord. */
lastReceivedAt: number;
/** Whether or not the heartbeat was acknowledged by discord in time. */
acknowledged: boolean;
/** Whether or not to keep heartbeating. Useful for when needing to stop heartbeating. */
keepAlive: boolean;
/** The interval between heartbeats requested by discord. */
interval: number;
/** The id of the interval, useful for stopping the interval if ws closed. */
intervalID: number;
};
}