docs(big-bot-guide): Updated to dd v16, fixed bugs, added missing code (#2501)

* docs(big-bot): Updated to dd v16, fixed bugs with it (cache), added missing code for events listener in bot part

* Add dd server link to big bot guide docs

* Fix: typo
This commit is contained in:
Awesome Stickz
2022-10-08 02:46:21 +05:30
committed by GitHub
parent a0f130db50
commit a7d6529fe8
5 changed files with 744 additions and 288 deletions

View File

@@ -16,7 +16,7 @@ play, when deciding where to keep your cache. Another reason to use this is, whe
gateway process.
- Start rest process
- Start event handler process(bot)
- Start event handler process (bot)
- Start gateway process.
- Guild create events arrive providing all the data needed to cache in the bot process.
- Restart event handler process(maybe for an update/reboot)
@@ -54,6 +54,8 @@ CREATE TABLE IF NOT EXISTS "users" (
)
```
Note that you can cache only properties you want and leave all other properties that you won't use.
Now that we have this schema ready for our users cache. Go ahead and repeat this for all other cache tables.
Cache Tables:

View File

@@ -31,84 +31,12 @@ import { psql } from "./cache/mod.ts";
export const bot = createBot({
token: DISCORD_TOKEN,
botId: 270010330782892032n,
// applicationId: 270010330782892032,
intents: Intents.Guilds | Intents.GuildMessages,
events: {
messageCreate: function (bot, message) {
console.log("message arrived");
},
},
cache: {
isAsync: true,
customTableCreator: function (table) {
const tables = {
users: "users",
channels: "channels",
guilds: "guilds",
messages: "messages",
presences: "presences",
threads: "threads",
unavailableGuilds: "unavailableGuilds",
members: "members",
};
if (!tables[table]) throw new Error("I HACKED ITOH!");
return {
/** Get a single item from the table */
async get(key) {
return await psql`SELECT * FROM ${
psql(
tables[table],
)
} WHERE "id" = ${psql.types.bigint(key)}`;
},
/** Completely empty this table. */
async clear() {
await psql`TRUNCATE TABLE ${psql(tables[table])}`;
},
/** Delete the data related to this key from table. */
async delete(key) {
await psql`DELETE FROM ${
psql(
tables[table],
)
} WHERE "id" = ${psql.types.bigint(key)}`;
return true;
},
/** Check if there is data assigned to this key. */
async has(key) {
return Boolean(
await psql`SELECT 1 FROM ${
psql(
tables[table],
)
} WHERE "id" = ${psql.types.bigint(key)}`,
);
},
/** Check how many items are stored in this table. */
async size() {
return (await psql`SELECT COUNT("id") FROM ${psql(tables[table])}`)
.count;
},
/** Store new data to this table. */
async set(key, data) {
await psql`INSERT INTO ${psql(tables[table])} ${
psql(
data,
...Object.keys(data),
)
}`;
return true;
},
// THESE TWO ARE USELESS FOR CUSTOM CACHE BUT NEED TO SHUT UP TS ERRORS
async forEach(callback) {},
async filter(callback) {
return new Collection();
},
};
},
},
});
```
@@ -131,50 +59,66 @@ Alright that was a lot of code. Now let's break it down little by little.
- `events`: These are your event handler functions. When a MESSAGE_CREATE event arrives from Discord it will be
processed here. We will set up the routing to run these functions later in the guide but for now you can see how to
set it up. Note, you can create these functions in separate files and just import them here as you wish.
- `cache`: This is going to be the cache part. We will discuss this more below.
### Understanding Cache Option
## Using Your Cache
Since we are using a standalone gateway a custom cache is essentially required as explained in step 3 of this guide.
Please remember, to mark the cache as `async`
Since we are using a standalone gateway, a custom cache is essentially required as explained in step 3 of this guide.
Here we'll have some basic functions to make use of the cache we created in step 3.
```ts
cache: {
isAsync: true,
}
```
When you opt into the async cache, you must also provide a table creator function. This will not actually create any
tables but it will create an object with methods to manage your "tables". Man we need a better name for this. Please
send recommendations to @Skillz4Killz in discord. Thanks. Until then, please blame wolf for the terrible name. :)
Alrighty, now let's dig deeper into this function.
```ts
const tables = {
users: "users",
channels: "channels",
guilds: "guilds",
messages: "messages",
presences: "presences",
threads: "threads",
unavailableGuilds: "unavailableGuilds",
members: "members",
const cache = {
/** Get a single item from the table */
async get(key) {
return await psql`SELECT * FROM ${
psql(
tables[table],
)
} WHERE "id" = ${psql.types.bigint(key)}`;
},
/** Completely empty this table. */
async clear() {
await psql`TRUNCATE TABLE ${psql(tables[table])}`;
},
/** Delete the data related to this key from table. */
async delete(key) {
await psql`DELETE FROM ${
psql(
tables[table],
)
} WHERE "id" = ${psql.types.bigint(key)}`;
return true;
},
/** Check if there is data assigned to this key. */
async has(key) {
return Boolean(
await psql`SELECT 1 FROM ${
psql(
tables[table],
)
} WHERE "id" = ${psql.types.bigint(key)}`,
);
},
/** Check how many items are stored in this table. */
async size() {
return (await psql`SELECT COUNT("id") FROM ${psql(tables[table])}`)
.count;
},
/** Store new data to this table. */
async set(key, data) {
await psql`INSERT INTO ${psql(tables[table])} ${
psql(
data,
...Object.keys(data),
)
}`;
return true;
},
// THESE TWO ARE USELESS FOR CUSTOM CACHE BUT NEED TO SHUT UP TS ERRORS
async forEach(callback) {},
async filter(callback) {
return new Collection();
},
};
if (!tables[table]) throw new Error("I HACKED ITOH!");
```
This part of the code is only going to make sense if you are used to PGSQL. To prevent any attacks here we will forcibly
control which table will be used.
This function must return an object with several methods on it. You can see the methods above.
```ts
/** Get a single item from the table */
async get(key) {
// WHATEVER CODE YOU WOULD LIKE TO USE HERE
}
```
You can insert any code you desire for your cache system here. Since we were using PGSQL, we used sql queries to make
@@ -260,3 +204,57 @@ yourself a headache trying to maintain your fork with updates.
Remember, this is a separate process we need to make sure we are listening to incoming events from our gateway
instances. Since we used http in our Gateway step, we can create an http listener here as well.
Create a file in a path like `src/bot/gatewayEventsListener.ts`
Now we should create a http listener, check for authorization in headers, run `bot.events.raw` and `bot.handlers[event]`
```ts
import { DiscordGatewayPayload } from "discordeno";
import { EVENT_HANDLER_PORT, REST_AUTHORIZATION } from "../../configs.ts";
const server = Deno.listen({ port: EVENT_HANDLER_PORT });
// Connections to the server will be yielded up as an async iterable.
for await (const conn of server) {
// In order to not be blocking, we need to handle each connection individually
// in its own async function.
handleRequest(conn);
}
async function handleRequest(conn: Deno.Conn) {
// This "upgrades" a network connection into an HTTP connection.
const httpConn = Deno.serveHttp(conn);
// Each request sent over the HTTP connection will be yielded as an async
// iterator from the HTTP connection.
for await (const requestEvent of httpConn) {
if (!REST_AUTHORIZATION || REST_AUTHORIZATION !== requestEvent.request.headers.get("AUTHORIZATION")) {
return requestEvent.respondWith(
new Response(JSON.stringify({ error: "Invalid authorization key." }), {
status: 401,
}),
);
}
const json = (await requestEvent.request.json()) as {
message: DiscordGatewayPayload;
shardId: number;
};
// Run raw event.
bot.events.raw(bot, json.message, json.shardId);
if (json.message.t && json.message.t !== "RESUMED") {
// When a guild or something isn't in cache this will fetch it before doing anything else.
if (!["READY", "GUILD_LOADED_DD"].includes(json.message.t)) {
await bot.events.dispatchRequirements(bot, json.message, json.shardId);
}
// Run event function provided in bot.events
bot.handlers[json.message.t]?.(bot, json.message, json.shardId);
}
new Response(undefined, { status: 200 });
}
}
```

View File

@@ -84,6 +84,8 @@ Now we need to use this rest manager to call the api to get information about ho
your bot.
```ts
import { routes } from "../../deps.ts";
const rest = createRestManager({
token: DISCORD_TOKEN,
secretKey: REST_AUTHORIZATION,
@@ -91,7 +93,7 @@ const rest = createRestManager({
});
// CALL THE REST PROCESS TO GET GATEWAY DATA
const result = await rest.runMethod(rest, "GET", endpoints.GATEWAY_BOT).then((res) => ({
const gatewayBot = await rest.runMethod(rest, "GET", routes.GATEWAY_BOT()).then((res) => ({
url: res.url,
shards: res.shards,
sessionStartLimit: {
@@ -108,33 +110,20 @@ With this info, we can now create our gateway manager.
### Understanding Gateway Manager
```ts
import { INTENTS, SHARDS_PER_WORKER, TOTAL_WORKERS } from "../../configs.ts";
const gateway = createGatewayManager({
secretKey: EVENT_HANDLER_SECRET_KEY,
token: DISCORD_TOKEN,
intents: Intents.Guilds | Intents.GuildMessages,
shardsRecommended: result.shards,
sessionStartLimitTotal: result.sessionStartLimit.total,
sessionStartLimitRemaining: result.sessionStartLimit.remaining,
sessionStartLimitResetAfter: result.sessionStartLimit.resetAfter,
maxConcurrency: result.sessionStartLimit.maxConcurrency,
maxShards: result.shards,
lastShardId: result.shards,
// debug: console.log,
handleDiscordPayload: async function (_, data, shardId) {
await fetch(`${EVENT_HANDLER_URL}:${EVENT_HANDLER_PORT}`, {
headers: {
Authorization: gateway.secretKey,
method: "POST",
body: JSON.stringify({
shardId,
data,
}),
},
})
// BELOW IS FOR SOLVING DENO MEMORY LEAK. Node users do your thing.
.then((res) => res.text())
.catch(() => null);
gatewayBot,
gatewayConfig: {
token: DISCORD_TOKEN,
intents: INTENTS,
},
totalShards: gatewayBot.shards,
shardsPerWorker: SHARDS_PER_WORKER,
totalWorkers: TOTAL_WORKERS,
// debug: console.log,
// THIS WILL BE USED LATER IN WORKER SO LEAVE IT HERE
handleDiscordPayload: () => {},
});
```
@@ -143,34 +132,21 @@ const gateway = createGatewayManager({
- `EVENT_HANDLER_SECRET_KEY` is from your configs that will be used to make sure requests sent to your event handler
process are indeed from you.
- `DISCORD_TOKEN` if you can't figure this out, this guide isn't for you. Please find another.
- `intents` pass in a number or a string of intents. Autocomplete/type-safety is provided for strings :)
**Discord Data Keys**: These keys will be the data you got from the gateway request we made earlier.
- `shardsRecommended`
- `sessionStartLimitTotal`
- `sessionStartLimitRemaining`
- `sessionStartLimitResetAfter`
- `maxConcurrency`
- `INTENTS` pass in a number or a string of intents. Autocomplete/type-safety is provided for strings :)
**Powerful Keys**
If your bot is going to be run on one process, you can re-use the data that discord gave you to connect.
- `maxShards`: is the maximum number of shards you want to use for connecting your bot. Should you think Discord is not
smart enough to recommend a good amount, use this to override their choice. Highly recommend just using theirs.
- `totalShards`: is the maximum number of shards you want to use for connecting your bot. Should you think Discord is
not smart enough to recommend a good amount, use this to override their choice. Highly recommend just using theirs.
- `lastShardId`: is the last shard you want to connect in this process.
- Using a combination of `lastShardId` & `firstShardId`, you can create several processes or even several servers to
handle different amounts of shards should your bot get that big to require horizontal scaling. You can control how
many shards each gateway manager will be responsible for.
- `reshard`: Whether or not to automatically reshard the bot when necessary with zero downtime deployment strategy.
Default: true.
- `reshardPercentage`: The % of servers to trigger a reshard. Default: 80%.
- `spawnShardDelay`: The delay in milliseconds to wait before spawning next shard.
- `useOptimalLargeBotSharding`: Whether or not the resharder should automatically switch to LARGE BOT SHARDING when you
are above 100K servers.
- `shardsPerCluster`: The amount of shards to load per worker. Discussed in detail below.
- `maxClusters`: The maximum amount of workers to use for your bot.
- `shardsPerWorker`: The amount of shards to load per worker. Discussed in detail below.
- `totalWorkers`: The maximum amount of workers to use for your bot.
#### Gateway Cache
@@ -198,48 +174,420 @@ gateway.heartbeat = function (gateway, shardId, interval) {
};
```
### Handle Discord Payloads
## Workers
One of the big things we didn't cover yet is the handler for discord payloads. This is the main sauce of your gateway
process here. This is going to take the events that the gateway manager processed and send it to your event handler. How
you wish to communicate with your event handler is up to you. For this guide, we will use http, but you can replace that
Now, we should take a minute here to talk about workers. Workers are just Clusters in Node.js
When you have a big bot and you are processing millions of events, you need to speed up that processing. Keeping it in 1
thread is not very nice since JavaScript is single threaded. This means it can only process 1 event at a time. With
workers, you can make it process several events at the same time. We mentioned the `shardsPerWorker` earlier. This
option was added to allow you to choose how many shards should be managed by each worker.
When shards are spawned, they are triggered by a method on gateway: `tellWorkerToIdentify`, so we'll have to modify it
to create workers and send message:
```ts
gateway.tellWorkerToIdentify = async (_gateway, workerId, shardId, _bucketId) => {
let worker = workers.get(workerId);
if (!worker) {
worker = createWorker(workerId);
workers.set(workerId, worker);
}
// TYPE TYPE WorkerMessage IS FROM WORKER FILE, DISCUSSED IN DETAIL BELOW
const identify: WorkerMessage = {
type: "IDENTIFY_SHARD",
shardId,
};
worker.postMessage(identify);
};
```
You can choose to replace the handler with any desired functionality you like. For example, should should you want to
create a new worker for each new workerId that appears and have that worker trigger the identify functionaly. How you
choose to handler workers is left in your care.
Now that we've setup our initial gateway manager and added `tellWorkerToIdentify` to `gateway`, we need to do the rest
of the work: creating workers, spawning shards etc.
```ts
import { EVENT_HANDLER_SECRET_KEY, EVENT_HANDLER_URL } from "../../configs.ts";
import { Worker } from "worker_threads";
import { WorkerCreateData, WorkerGetShardInfo, WorkerMessage, WorkerShardInfo, WorkerShardPayload } from "./worker.js";
// A COLLECTION OF WORKERS
const workers = new Collection<number, Worker>();
const nonces = new Collection<string, (data: any) => void>();
function createWorker(workerId: number) {
const workerData: WorkerCreateData = {
intents: gateway.manager.gatewayConfig.intents ?? 0,
token: DISCORD_TOKEN,
// TODO: PUT THIS SEPARATELY. CAN USE MULTIPLE URLS IF YOU HAVE MULTIPLE BOT PROCESSES HANDLING DIFFERENT SHARDS' EVENTS
handlerUrls: [EVENT_HANDLER_URL],
handlerAuthorization: EVENT_HANDLER_SECRET_KEY,
path: "./worker.ts",
totalShards: gateway.manager.totalShards,
workerId,
};
const worker = new Worker("./worker.js", {
workerData,
});
worker.on("message", async (data: ManagerMessage) => {
switch (data.type) {
case "REQUEST_IDENTIFY": {
await gateway.manager.requestIdentify(data.shardId);
const allowIdentify: WorkerMessage = {
type: "ALLOW_IDENTIFY",
shardId: data.shardId,
};
worker.postMessage(allowIdentify);
break;
}
case "NONCE_REPLY": {
nonces.get(data.nonce)?.(data.data);
}
}
});
return worker;
}
// TYPES WE USE
export type ManagerMessage = ManagerRequestIdentify | ManagerNonceReply<WorkerShardInfo[]>;
export type ManagerRequestIdentify = {
type: "REQUEST_IDENTIFY";
shardId: number;
};
export type ManagerNonceReply<T> = {
type: "NONCE_REPLY";
nonce: string;
data: T;
};
```
## Spawning Shards
Once you are ready and the gateway has been created as you desired, we can begin spawning the shards.
```ts
gateway.spawnShards(gateway);
```
This code now handles creating gateway manager, creating workers, spawning shards and sending the info to each workers.
Next is creating a worker file to receive these info, connecting to gateway and sending the events to bot process.
Here's the full code of `src/gateway/mod.ts`:
```ts
import {
DISCORD_TOKEN,
EVENT_HANDLER_SECRET_KEY,
EVENT_HANDLER_URL,
INTENTS,
REST_AUTHORIZATION,
REST_PORT,
SHARDS_PER_WORKER,
TOTAL_WORKERS,
} from "../../configs.ts";
import { BASE_URL, createRestManager, routes } from "../../deps.ts";
import { Worker } from "worker_threads";
import { WorkerCreateData, WorkerGetShardInfo, WorkerMessage, WorkerShardInfo, WorkerShardPayload } from "./worker.ts";
const rest = createRestManager({
token: DISCORD_TOKEN,
secretKey: REST_AUTHORIZATION,
customUrl: `http://localhost:${REST_PORT}`,
});
const rest = createRestManager({
token: DISCORD_TOKEN,
secretKey: REST_AUTHORIZATION,
customUrl: `http://localhost:${REST_PORT}`,
});
// CALL THE REST PROCESS TO GET GATEWAY DATA
const gatewayBot = await rest.runMethod(rest, "GET", routes.GATEWAY_BOT()).then((res) => ({
url: res.url,
shards: res.shards,
sessionStartLimit: {
total: res.session_start_limit.total,
remaining: res.session_start_limit.remaining,
resetAfter: res.session_start_limit.reset_after,
maxConcurrency: res.session_start_limit.max_concurrency,
},
}));
const gateway = createGatewayManager({
gatewayBot,
gatewayConfig: {
token: DISCORD_TOKEN,
intents: INTENTS,
},
totalShards: gatewayBot.shards,
shardsPerWorker: SHARDS_PER_WORKER,
totalWorkers: TOTAL_WORKERS,
// debug: console.log,
handleDiscordPayload: () => {},
tellWorkerToIdentify: async (_gateway, workerId, shardId, _bucketId) => {
let worker = workers.get(workerId);
if (!worker) {
worker = createWorker(workerId);
workers.set(workerId, worker);
}
// TYPE TYPE WorkerMessage IS FROM WORKER FILE, DISCUSSED IN DETAIL BELOW
const identify: WorkerMessage = {
type: "IDENTIFY_SHARD",
shardId,
};
worker.postMessage(identify);
},
});
// A COLLECTION OF WORKERS
const workers = new Collection<number, Worker>();
const nonces = new Collection<string, (data: any) => void>();
function createWorker(workerId: number) {
const workerData: WorkerCreateData = {
intents: gateway.manager.gatewayConfig.intents ?? 0,
token: DISCORD_TOKEN,
handlerUrls: [EVENT_HANDLER_URL],
handlerAuthorization: EVENT_HANDLER_SECRET_KEY,
path: "./worker.ts",
totalShards: gateway.manager.totalShards,
workerId,
};
const worker = new Worker("./worker.ts", {
workerData,
});
worker.on("message", async (data: ManagerMessage) => {
switch (data.type) {
case "REQUEST_IDENTIFY": {
await gateway.manager.requestIdentify(data.shardId);
const allowIdentify: WorkerMessage = {
type: "ALLOW_IDENTIFY",
shardId: data.shardId,
};
worker.postMessage(allowIdentify);
break;
}
case "NONCE_REPLY": {
nonces.get(data.nonce)?.(data.data);
}
}
});
return worker;
}
// TYPES WE USE
export type ManagerMessage = ManagerRequestIdentify | ManagerNonceReply<WorkerShardInfo[]>;
export type ManagerRequestIdentify = {
type: "REQUEST_IDENTIFY";
shardId: number;
};
export type ManagerNonceReply<T> = {
type: "NONCE_REPLY";
nonce: string;
data: T;
};
// SPAWN SHARDS INTO WORKERS
gateway.spawnShards();
```
## Worker File
Now that we've handled creating gateway, workers, we need to create a worker file to identify, receive gateway events
and send them to bot process.
Create a file in a path like `src/gateway/worker.ts`.
Now we'll have to create a Shard Manager, this is what will handle identifying, receiving events.
```ts
import { createShardManager } from "discordeno";
import { parentPort, workerData } from "worker_threads";
if (!parentPort) {
throw new Error("Parent port is null");
}
// THE DATA WE GET FROM GATEWAY FILE
const script: WorkerCreateData = workerData;
const identifyPromises = new Map<number, () => void>();
const manager = createShardManager({
gatewayConfig: {
intents: script.intents,
token: script.token,
},
shardIds: [],
totalShards: script.totalShards,
// WE WILL COVER THESE TWO FUNCTIONS IN LATER PART OF THE GUIDE, FOR NOW, LEAVE IT THIS WAY
handleMessage: () => {},
requestIdentify: async () => {},
});
```
The above code only creates a shard manager, we now have 3 more things to do:
- Listening to gateway process, sending events received to respective shards in the manager.
- Handling Discord Payloads.
- Requesting Identify.
## Sending events from Gateway to Shards in Manager
In order for the shards to receive events and send to bot process, we need to receive the event payloads from gateway
first, we can do this by using `message` event in `parentPort` like shown below:
```ts
import { Shard } from "discordeno";
function buildShardInfo(shard: Shard): WorkerShardInfo {
return {
workerId: script.workerId,
shardId: shard.id,
rtt: shard.heart.rtt || -1,
state: shard.state,
};
}
parentPort.on("message", async (data: WorkerMessage) => {
switch (data.type) {
// Gateway sends IDENTIFY_SHARD in gateway.tellWorkerToIdentify
case "IDENTIFY_SHARD": {
await manager.identify(data.shardId);
break;
}
// Gateway sends ALLOW_IDENTIFY when worker requests to identify
case "ALLOW_IDENTIFY": {
identifyPromises.get(data.shardId)?.();
identifyPromises.delete(data.shardId);
break;
}
// Gateway sends SHARD_PAYLOAD for every events it receives from Discord
case "SHARD_PAYLOAD": {
manager.shards.get(data.shardId)?.send(data.data);
break;
}
// Send shard info if gateway sends GET_SHARD_INFO
case "GET_SHARD_INFO": {
const infos = manager.shards.map(buildShardInfo);
parentPort?.postMessage({ type: "NONCE_REPLY", nonce: data.nonce, data: infos });
}
}
});
```
Now TypeScript will error because of missing types, add these to your code:
```ts
import { ShardSocketRequest, ShardState } from "discordeno";
export type WorkerMessage = WorkerIdentifyShard | WorkerAllowIdentify | WorkerShardPayload | WorkerGetShardInfo;
export type WorkerIdentifyShard = {
type: "IDENTIFY_SHARD";
shardId: number;
};
export type WorkerAllowIdentify = {
type: "ALLOW_IDENTIFY";
shardId: number;
};
export type WorkerShardPayload = {
type: "SHARD_PAYLOAD";
shardId: number;
data: ShardSocketRequest;
};
export type WorkerGetShardInfo = {
type: "GET_SHARD_INFO";
nonce: string;
};
export type WorkerCreateData = {
intents: number;
token: string;
handlerUrls: string[];
handlerAuthorization: string;
path: string;
totalShards: number;
workerId: number;
};
export type WorkerShardInfo = {
workerId: number;
shardId: number;
rtt: number;
state: ShardState;
};
```
## Handling Discord Payloads
One of the big things we didn't cover yet is the handler for discord payloads. This is the main sauce of your worker
process here. This is going to take the events that the gateway manager sent and send it to your event handler. How you
wish to communicate with your event handler is up to you. For this guide, we will use http, but you can replace that
with anything you like.
```ts
handleDiscordPayload: async function (_, data, shardId) {
// CHANGE FROM SENDING THROUGH HTTP TO USING A WS FOR FASTER PROCESSING! OR HTTP3 OR WHATEVER!
await fetch(`${EVENT_HANDLER_URL}:${EVENT_HANDLER_PORT}`, {
headers: {
Authorization: gateway.secretKey,
method: 'POST',
body: JSON.stringify({
shardId,
data,
}),
},
})
// BELOW IS FOR SOLVING DENO MEMORY LEAK. Node users do your thing.
.then((res) => res.text())
.catch(() => null)
},
manager.createShardOptions.handleMessage = async (shard, message) => {
const url = script.handlerUrls[shard.id % script.handlerUrls.length];
if (!url) return console.error("ERROR: NO URL FOUND TO SEND MESSAGE");
await fetch(url, {
method: "POST",
body: JSON.stringify({ message, shardId: shard.id }),
headers: { "Content-Type": "application/json", Authorization: script.handlerAuthorization },
}).catch((error) => console.error(error));
};
```
You can change this function to use a WS or any form of communication you prefer to use to send this to your event
handler.
This is also the place where you make use of the [Gateway Cache](#gateway-cache) we mentioned earlier (`guildIds`,
`loadingGuildIds`, `editedMessages`).
## Gateway Queue
One thing we can add on here, which you will find already done in the template if you are using it. However, it is still
good to read this to learn and understand the logic behind it. When you need a downtime for whatever reason, you can
create a queue like system to avoid any missed events. Let's create a simple queue. If it errors, assuming something
like the bot event listener process is down for whatever reason, the `.catch` will run adding this event to the queue to
try again in one second by calling the `handleQueue` function.
like the bot event listener process is down for whatever reason, the `.catch` in `fetch` will run adding this event to
the queue to try again in one second by calling the `handleQueue` function.
```ts
.catch(() => {
// IF FAILED TRY TO QUEUE MAYBE LISTENER IS DOWN
if (data.t === "INTERACTION_CREATE") handleInteractionQueueing(gateway, data, shardId);
else queue.events.push({ shardId, data });
if (message.t === "INTERACTION_CREATE") handleInteractionQueueing(message, shard.id);
else queue.events.push({ shardId: shard.id, message });
setTimeout(handleQueue, 1000);
});
@@ -249,13 +597,15 @@ Now TypeScript will probably throw some errors at your face, so let's fix those
hold the queue of events for our gateway.
```ts
import { DiscordGatewayPayload } from "discordeno";
const queue: GatewayQueue = {
processing: false,
events: [],
};
export interface QueuedEvent {
data: GatewayPayload;
message: DiscordGatewayPayload;
shardId: number;
}
@@ -268,7 +618,7 @@ async function handleQueue() {
// PLACEHOLDER FUNCTION THAT WILL HANDLE PROCESSING THE QUEUE
}
async function handleInteractionQueueing(gateway: GatewayManager, data: GatewayPayload, shardId: number) {
async function handleInteractionQueueing(message: DiscordGatewayPayload, shardId: number) {
// PLACEHOLDER FUNCTION
}
```
@@ -277,15 +627,15 @@ Alrighty, since TypeScript stopped being annoying, let's continue. Next, we shou
queue is already processing or has events queued up. This will help us preserve the order of events in the queue.
```ts
handleDiscordPayload: async function (_, data, shardId) {
handleMessage: async function (shard, message) {
// IF QUEUE IS RUNNING JUST ADD TO QUEUE
if (queue.processing) {
if (data.t === "INTERACTION_CREATE") return handleInteractionQueueing(gateway, data, shardId);
if (message.t === "INTERACTION_CREATE") return handleInteractionQueueing(message, shard.id);
return queue.events.push({ shardId, data });
return queue.events.push({ shardId: shard.id, message });
}
await fetch(`${EVENT_HANDLER_URL}:${EVENT_HANDLER_PORT}`, {
await fetch(EVENT_HANDLER_URL, {
```
Typescript must be at it again so let's shut it up again. Keep in mind that we are handling interaction events
@@ -294,16 +644,19 @@ automatically respond to the ones that can not be deferred. For the interactions
defer them and add this event to the queue.
```ts
async function handleInteractionQueueing(gateway: GatewayManager, data: GatewayPayload, shardId: number) {
if (data.t !== "INTERACTION_CREATE") return;
import { DiscordInteraction, InteractionResponseTypes, InteractionTypes, routes } from "discordeno";
import { BOT_SERVER_INVITE_CODE } from "../../configs.ts";
const interaction = data.d as SnakeCasedPropertiesDeep<Interaction>;
async function handleInteractionQueueing(message: DiscordGatewayPayload, shardId: number) {
if (message.t !== "INTERACTION_CREATE") return;
const interaction = message.d as DiscordInteraction;
// IF THIS INTERACTION IS NOT DEFERABLE
if ([InteractionTypes.ModalSubmit, InteractionTypes.ApplicationCommandAutocomplete].includes(interaction.type)) {
return await rest.runMethod(
rest,
"POST",
endpoints.INTERACTION_ID_TOKEN(BigInt(interaction.id), interaction.token),
routes.INTERACTION_ID_TOKEN(BigInt(interaction.id), interaction.token),
{
type: InteractionResponseTypes.ChannelMessageWithSource,
data: {
@@ -322,7 +675,7 @@ async function handleInteractionQueueing(gateway: GatewayManager, data: GatewayP
});
// ADD EVENT TO QUEUE
queue.events.push({ shardId, data });
queue.events.push({ shardId, message });
}
```
@@ -353,7 +706,7 @@ async function handleQueue() {
return;
}
await fetch(`${EVENT_HANDLER_URL}:${EVENT_HANDLER_PORT}`, {
await fetch(EVENT_HANDLER_URL, {
headers: {
Authorization: EVENT_HANDLER_SECRET_KEY,
"Content-Type": "application/json",
@@ -361,7 +714,7 @@ async function handleQueue() {
method: "POST",
body: JSON.stringify({
shardId: event.shardId,
data: event.data,
message: event.message,
}),
})
.then((res) => {
@@ -376,65 +729,151 @@ async function handleQueue() {
}
```
Full code is below:
## Requesting Identify
We need to request identify in order to trigger initial handshake with the gateway, we'll use `manager.requestIdentify`
to do this.
```ts
import { ManagerMessage } from "./mod.ts";
manager.requestIdentify = async function (shardId: number): Promise<void> {
return await new Promise((resolve) => {
identifyPromises.set(shardId, resolve);
const identifyRequest: ManagerMessage = {
type: "REQUEST_IDENTIFY",
shardId,
};
parentPort?.postMessage(identifyRequest);
});
};
```
That's all, you've now setup your gateway and worker. Here's the full code of `src/gateway/worker.ts`:
```ts
import {
createRestManager,
createShardManager,
DiscordGatewayPayload,
DiscordInteraction,
InteractionResponseTypes,
InteractionTypes,
routes,
Shard,
ShardSocketRequest,
ShardState,
} from "discordeno";
import { parentPort, workerData } from "worker_threads";
import { ManagerMessage } from "./mod";
import {
BOT_SERVER_INVITE_CODE,
DISCORD_TOKEN,
EVENT_HANDLER_SECRET_KEY,
EVENT_HANDLER_URL,
REST_AUTHORIZATION,
REST_URL,
} from "../../configs.ts";
if (!parentPort) {
throw new Error("Parent port is null");
}
// THE DATA WE GET FROM GATEWAY FILE
const script: WorkerCreateData = workerData;
const identifyPromises = new Map<number, () => void>();
const rest = createRestManager({
token: DISCORD_TOKEN,
secretKey: REST_AUTHORIZATION,
customUrl: REST_URL,
});
const manager = createShardManager({
gatewayConfig: {
intents: script.intents,
token: script.token,
},
shardIds: [],
totalShards: script.totalShards,
handleMessage: async (shard, message) => {
const url = script.handlerUrls[shard.id % script.handlerUrls.length];
if (!url) return console.error("ERROR: NO URL FOUND TO SEND MESSAGE");
await fetch(url, {
method: "POST",
body: JSON.stringify({ message, shardId: shard.id }),
headers: { "Content-Type": "application/json", Authorization: script.handlerAuthorization },
}).catch(() => {
// IF FAILED TRY TO QUEUE MAYBE LISTENER IS DOWN
if (message.t === "INTERACTION_CREATE") handleInteractionQueueing(message, shard.id);
else queue.events.push({ shardId: shard.id, message });
setTimeout(handleQueue, 1000);
});
},
requestIdentify: async function (shardId: number): Promise<void> {
return await new Promise((resolve) => {
identifyPromises.set(shardId, resolve);
const identifyRequest: ManagerMessage = {
type: "REQUEST_IDENTIFY",
shardId,
};
parentPort?.postMessage(identifyRequest);
});
},
});
function buildShardInfo(shard: Shard): WorkerShardInfo {
return {
workerId: script.workerId,
shardId: shard.id,
rtt: shard.heart.rtt || -1,
state: shard.state,
};
}
parentPort.on("message", async (data: WorkerMessage) => {
switch (data.type) {
// Gateway sends IDENTIFY_SHARD in gateway.tellWorkerToIdentify
case "IDENTIFY_SHARD": {
await manager.identify(data.shardId);
break;
}
// Gateway sends ALLOW_IDENTIFY when worker requests to identify
case "ALLOW_IDENTIFY": {
identifyPromises.get(data.shardId)?.();
identifyPromises.delete(data.shardId);
break;
}
// Gateway sends SHARD_PAYLOAD for every events it receives from Discord
case "SHARD_PAYLOAD": {
manager.shards.get(data.shardId)?.send(data.data);
break;
}
// Send shard info if gateway sends GET_SHARD_INFO
case "GET_SHARD_INFO": {
const infos = manager.shards.map(buildShardInfo);
parentPort?.postMessage({ type: "NONCE_REPLY", nonce: data.nonce, data: infos });
}
}
});
const queue: GatewayQueue = {
processing: false,
events: [],
};
export interface QueuedEvent {
data: GatewayPayload;
shardId: number;
}
export interface GatewayQueue {
processing: boolean;
events: QueuedEvent[];
}
const gateway = createGatewayManager({
// ... OTHER PROPERTIES HERE SHORTENED FOR GUIDE
handleDiscordPayload: async function (_, data, shardId) {
// IF QUEUE IS RUNNING JUST ADD TO QUEUE
if (queue.processing) {
if (data.t === "INTERACTION_CREATE") return handleInteractionQueueing(gateway, data, shardId);
return queue.events.push({ shardId, data });
}
await fetch(`${EVENT_HANDLER_URL}:${EVENT_HANDLER_PORT}`, {
headers: {
Authorization: gateway.secretKey,
"Content-Type": "application/json",
},
method: "POST",
body: JSON.stringify({
shardId,
data,
}),
})
// THIS IS FOR DENO MEMORY LEAK
.then((res) => res.text())
.catch(() => {
// IF FAILED TRY TO QUEUE MAYBE LISTENER IS DOWN
if (data.t === "INTERACTION_CREATE") handleInteractionQueueing(gateway, data, shardId);
else queue.events.push({ shardId, data });
setTimeout(handleQueue, 1000);
});
},
});
async function handleQueue() {
// TRY THE FIRST ITEM IN THE QUEUE
const event = queue.events.shift();
// QUEUE IS EMPTY
if (!event) {
@@ -443,7 +882,7 @@ async function handleQueue() {
return;
}
await fetch(`${EVENT_HANDLER_URL}:${EVENT_HANDLER_PORT}`, {
await fetch(EVENT_HANDLER_URL, {
headers: {
Authorization: EVENT_HANDLER_SECRET_KEY,
"Content-Type": "application/json",
@@ -451,7 +890,7 @@ async function handleQueue() {
method: "POST",
body: JSON.stringify({
shardId: event.shardId,
data: event.data,
message: event.message,
}),
})
.then((res) => {
@@ -461,32 +900,26 @@ async function handleQueue() {
.catch(() => {
// EVENT HANDLER STILL NOT ACCEPTING REQUEST. SO ADD BACK TO QUEUE
queue.events.unshift(event);
// RETRY IN ONE SECOND
setTimeout(handleQueue, 1000);
});
}
async function handleInteractionQueueing(gateway: GatewayManager, data: GatewayPayload, shardId: number) {
if (data.t !== "INTERACTION_CREATE") return;
async function handleInteractionQueueing(message: DiscordGatewayPayload, shardId: number) {
if (message.t !== "INTERACTION_CREATE") return;
const interaction = data.d as SnakeCasedPropertiesDeep<Interaction>;
const interaction = message.d as DiscordInteraction;
// IF THIS INTERACTION IS NOT DEFERABLE
if ([InteractionTypes.ModalSubmit, InteractionTypes.ApplicationCommandAutocomplete].includes(interaction.type)) {
return await rest.runMethod(
rest,
"POST",
endpoints.INTERACTION_ID_TOKEN(BigInt(interaction.id), interaction.token),
{
type: InteractionResponseTypes.ChannelMessageWithSource,
data: {
content:
`The bot is having a temporary issue, please try again or contact us at https://discord.gg/${BOT_SERVER_INVITE_CODE}`,
},
return await rest.runMethod(rest, "POST", routes.INTERACTION_ID_TOKEN(BigInt(interaction.id), interaction.token), {
type: InteractionResponseTypes.ChannelMessageWithSource,
data: {
content:
`The bot is having a temporary issue, please try again or contact us at https://discord.gg/${BOT_SERVER_INVITE_CODE}`,
},
);
});
}
await rest.runMethod(rest, "POST", endpoints.INTERACTION_ID_TOKEN(BigInt(interaction.id), interaction.token), {
await rest.runMethod(rest, "POST", routes.INTERACTION_ID_TOKEN(BigInt(interaction.id), interaction.token), {
// MESSAGE COMPONENTS NEED SPECIAL DEFER
type: InteractionTypes.MessageComponent === interaction.type
? InteractionResponseTypes.DeferredUpdateMessage
@@ -494,41 +927,64 @@ async function handleInteractionQueueing(gateway: GatewayManager, data: GatewayP
});
// ADD EVENT TO QUEUE
queue.events.push({ shardId, data });
queue.events.push({ shardId, message });
}
export type WorkerMessage = WorkerIdentifyShard | WorkerAllowIdentify | WorkerShardPayload | WorkerGetShardInfo;
export type WorkerIdentifyShard = {
type: "IDENTIFY_SHARD";
shardId: number;
};
export type WorkerAllowIdentify = {
type: "ALLOW_IDENTIFY";
shardId: number;
};
export type WorkerShardPayload = {
type: "SHARD_PAYLOAD";
shardId: number;
data: ShardSocketRequest;
};
export type WorkerGetShardInfo = {
type: "GET_SHARD_INFO";
nonce: string;
};
export type WorkerCreateData = {
intents: number;
token: string;
handlerUrls: string[];
handlerAuthorization: string;
path: string;
totalShards: number;
workerId: number;
};
export type WorkerShardInfo = {
workerId: number;
shardId: number;
rtt: number;
state: ShardState;
};
export interface QueuedEvent {
message: DiscordGatewayPayload;
shardId: number;
}
export interface GatewayQueue {
processing: boolean;
events: QueuedEvent[];
}
```
If you have any questions please contact us on discord. Note, you can take this concept and expand on it as much as you
Note, you can take this concept and expand on it as much as you
like. You can swap out the fetch() with websockets or any other system you like to communicate between your processes. I
highly recommend you take some time to add checks in place to prevent adding to queue when the queue reaches a certain
size. You don't want this to become a memory leak of infinite size and crash your gateway. So take the time to do this
right in your setup.
## Spawning Shards
Once you are ready and the gateway has been created as you desired, we can begin spawning the shards.
```ts
gateway.spawnShards(gateway);
```
## Workers
Now, we should take a minute here to talk about workers. Workers are just Clusters in Node.js
When you have a big bot and you are processing millions of events, you need to speed up that processing. Keeping it in 1
thread is not very nice since JavaScript is single threaded. This means it can only process 1 event at a time. With
workers, you can make it process several events at the same time. We mentioned the `shardsPerCluster` earlier. This
option was added to allow you to choose how many shards should be managed by each worker.
When shards are spawn they are triggered by a method on gateway.
```ts
gateway.tellWorkerToIdentify = async function (gateway, workerId, shardId, bucketId) {
await gateway.identify(gateway, shardId, gateway.maxShards);
};
```
You can choose to replace the handler with any desired functionality you like. For example, should should you want to
create a new worker for each new workerId that appears and have that worker trigger the identify functionaly. How you
choose to handler workers is left in your care.
If you have any questions, please contact us on our [discord server](https://discord.gg/ddeno).

View File

@@ -54,7 +54,7 @@ const rest = createRestManager({
```
- `createRestManager` is imported from your deps file which should have exported everything from discordeno.
- `DISCORD_TOKEN` is the bots token itself.
- `DISCORD_TOKEN` is the bot's token itself.
- `REST_AUTHORIZATION` is a special password you want to use to authenticate that requests being sent to your port are
indeed from you.
- `customUrl` the url where this rest process will be running. This can be localhost which we are using in this guide if

View File

@@ -4,7 +4,7 @@ sidebar_position: 1
# Step By Step Guide
THIS IS A WORK IN PROGRESS GUIDE USING THE NEW v13 OF DISCORDENO.
THIS IS A WORK IN PROGRESS GUIDE USING THE NEW v16 OF DISCORDENO.
## Understanding The Goals of This Guide