Skip to content
This repository has been archived by the owner on Apr 17, 2023. It is now read-only.

feat(Rest): custom bucket support #106

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"@cordis/common": "workspace:^1.2.0",
"@cordis/error": "workspace:^1.2.0",
"@cordis/queue": "workspace:^1.2.0",
"@cordis/rest": "workspace:^1.2.0",
"@cordis/rest": "workspace:^2.0.0",
"common-tags": "^1.8.2",
"discord-api-types": "^0.24.0",
"tslib": "^2.3.1",
Expand Down
2 changes: 1 addition & 1 deletion libs/rest/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "Cordis' REST utilities for the Discord API",
"main": "./dist/index.js",
"types": "./types/index.d.ts",
"version": "1.2.0",
"version": "2.0.0",
"scripts": {
"lint": "eslint src --ext .ts",
"build": "tsc",
Expand Down
3 changes: 1 addition & 2 deletions libs/rest/src/Error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ export const CordisRestError = makeCordisError(
{
retryLimitExceeded: (request: string, attempts: number) => `Tried to "${request}" for ${attempts} times but all of them failed`,
mutexLock: (route: string) => `A mutex for the "${route}" bucket locked up but was told to not wait`,
rateLimited: (request: string) => `A ratelimit was hit/prevented while "${request}"`,
internal: (request: string) => `Discord raised an internal error on "${request}"`
rateLimited: (request: string) => `A ratelimit was hit/prevented while "${request}"`
}
);

Expand Down
65 changes: 65 additions & 0 deletions libs/rest/src/bucket/BaseBucket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { DiscordFetchOptions } from './Fetch';
import type { Rest } from '../struct';
import type { Response } from 'node-fetch';

export interface BucketConstructor {
makeRoute(method: string, url: string): string;
new (rest: Rest, route: string): BaseBucket;
}

/**
* Base bucket class - used to deal with all of the HTTP semantics
*/
export abstract class BaseBucket {
/**
* Time after a Bucket is destroyed if unused
*/
public static readonly BUCKET_TTL = 1e4;

/**
* Creates a simple API route representation (e.g. /users/:id), used as an identifier for each bucket.
*
* Credit to https://github.com/abalabahaha/eris
*/
public static makeRoute(method: string, url: string) {
let route = url
.replace(/\/([a-z-]+)\/(?:[0-9]{17,19})/g, (match, p) => (['channels', 'guilds', 'webhook'].includes(p) ? match : `/${p}/:id`))
.replace(/\/invites\/[\w\d-]{2,}/g, '/invites/:code')
.replace(/\/reactions\/[^/]+/g, '/reactions/:id')
.replace(/^\/webhooks\/(\d+)\/[A-Za-z0-9-_]{64,}/, '/webhooks/$1/:token')
.replace(/\?.*$/, '');

// Message deletes have their own rate limit
if (method === 'delete' && route.endsWith('/messages/:id')) {
route = method + route;
}

// In this case, /channels/[idHere]/messages is correct,
// however /channels/[idHere] is not. we need "/channels/:id"
if (/^\/channels\/[0-9]{17,19}$/.test(route)) {
route = route.replace(/[0-9]{17,19}/, ':id');
}

return route;
}

public ['constructor']!: typeof BaseBucket;

public constructor(
public readonly rest: Rest,
public readonly route: string
) {}

/**
* Shortcut for the manager mutex
*/
public get mutex() {
return this.rest.mutex;
}

/**
* Makes a request to Discord
* @param req Request options
*/
public abstract make<D, Q>(req: DiscordFetchOptions<D, Q>): Promise<Response>;
}
94 changes: 94 additions & 0 deletions libs/rest/src/bucket/Bucket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { discordFetch, DiscordFetchOptions } from './Fetch';
import { CordisRestError, HTTPError } from '../Error';
import { BaseBucket } from './BaseBucket';
import type { Response } from 'node-fetch';
import { Rest } from '..';

/**
* Data held to represent ratelimit state for a Bucket
*/
export interface RatelimitData {
global: boolean;
limit: number;
timeout: number;
remaining: number;
}

/**
* Simple, default sequential bucket
*/
export class Bucket extends BaseBucket {
protected readonly _destroyTimeout: NodeJS.Timeout;

public constructor(rest: Rest, route: string) {
super(rest, route);
this._destroyTimeout = setTimeout(() => this.rest.buckets.delete(this.route), this.constructor.BUCKET_TTL).unref();
}

public async make<D, Q>(req: DiscordFetchOptions<D, Q>): Promise<Response> {
this._destroyTimeout.refresh();

this.rest.emit('request', req);

const mutexTimeout = await this.mutex
.claim(this.route, req.retryAfterRatelimit)
// Would rather throw a ratelimit error
.catch(() => Promise.reject(new CordisRestError('rateLimited', `${req.method.toUpperCase()} ${req.path}`)));

if (mutexTimeout > 0 && !req.isRetryAfterRatelimit) {
this.rest.emit('ratelimit', this.route, req.path, true, mutexTimeout);
}

let timeout: NodeJS.Timeout;
if (req.implicitAbortBehavior) {
timeout = setTimeout(() => {
req.controller.abort();
this.rest.emit('abort', req);
}, this.rest.abortAfter);
}

const res = await discordFetch(req).finally(() => clearTimeout(timeout));

const global = res.headers.get('x-ratelimit-global');
const limit = res.headers.get('x-ratelimit-limit');
const remaining = res.headers.get('x-ratelimit-remaining');
const resetAfter = res.headers.get('x-ratelimit-reset-after');

const state: Partial<RatelimitData> = {};

if (global) {
state.global = global === 'true';
}

if (limit) {
state.limit = Number(limit);
}

if (remaining) {
state.remaining = Number(remaining);
}

if (resetAfter) {
state.timeout = Number(resetAfter) * 1000;
}

this.rest.emit('response', req, res.clone(), state);

await this.mutex.set(this.route, state);

if (res.status === 429) {
const retry = res.headers.get('retry-after');
/* istanbul ignore next */
const retryAfter = Number(retry ?? 1) * 1000;

this.rest.emit('ratelimit', this.route, req.path, false, retryAfter);

await this.mutex.set(this.route, { timeout: retryAfter });
return Promise.reject(new CordisRestError('rateLimited', `${req.method.toUpperCase()} ${req.path}`));
} else if (!res.ok) {
return Promise.reject(new HTTPError(res.clone(), await res.text()));
}

return res;
}
}
15 changes: 10 additions & 5 deletions libs/rest/src/Fetch.ts → libs/rest/src/bucket/Fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import FormData from 'form-data';
import { URLSearchParams } from 'url';
import AbortController from 'abort-controller';
import { RouteBases } from 'discord-api-types/v9';
import { Readable } from 'stream';

/**
* Represents a file that can be sent to Discord
Expand Down Expand Up @@ -37,15 +38,16 @@ export interface DiscordFetchOptions<D = RequestBodyData, Q = StringRecord> {
isRetryAfterRatelimit: boolean;
query?: Q | string;
files?: File[];
data?: D;
data?: D | Readable;
domain?: string;
}

/**
* Makes the actual HTTP request
* @param options Options for the request
*/
export const discordFetch = async <D, Q>(options: DiscordFetchOptions<D, Q>) => {
let { path, method, headers, controller, query, files, data } = options;
let { path, method, headers, controller, query, files, data, domain = RouteBases.api } = options;

let queryString: string | null = null;
if (query) {
Expand All @@ -59,9 +61,9 @@ export const discordFetch = async <D, Q>(options: DiscordFetchOptions<D, Q>) =>
).toString();
}

const url = `${RouteBases.api}${path}${queryString ? `?${queryString}` : ''}`;
const url = `${domain}${path}${queryString ? `?${queryString}` : ''}`;

let body: string | FormData;
let body;
if (files?.length) {
body = new FormData();
for (const file of files) {
Expand All @@ -73,6 +75,9 @@ export const discordFetch = async <D, Q>(options: DiscordFetchOptions<D, Q>) =>
}

headers = Object.assign(headers, body.getHeaders());
} else if (data instanceof Readable) {
// In this case the user is expected to set their own Content-Type - otherwise Discord will complain
body = data;
} else if (data != null) {
body = JSON.stringify(data);
headers.set('Content-Type', 'application/json');
Expand All @@ -81,7 +86,7 @@ export const discordFetch = async <D, Q>(options: DiscordFetchOptions<D, Q>) =>
return fetch(url, {
method,
headers,
body: body!,
body: body,
signal: controller.signal,
// 250KB buffer for the sake of supporting 2 clones of reasonably big responses
highWaterMark: 25e4
Expand Down
35 changes: 35 additions & 0 deletions libs/rest/src/bucket/ProxyBucket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { discordFetch, DiscordFetchOptions } from './Fetch';
import { CordisRestError, HTTPError } from '../Error';
import { BaseBucket } from './BaseBucket';
import type { Response } from 'node-fetch';

/**
* Unconventional Bucket implementation that will hijack all requests (i.e. there is no seperate bucket depending on the route)
*
* This is meant for proxying requests, but will not handle any ratelimiting and will entirely ignore mutexes
*/
export class ProxyBucket extends BaseBucket {
public static override makeRoute() {
return 'proxy';
}

public async make<D, Q>(req: DiscordFetchOptions<D, Q>): Promise<Response> {
let timeout: NodeJS.Timeout;
if (req.implicitAbortBehavior) {
timeout = setTimeout(() => {
req.controller.abort();
this.rest.emit('abort', req);
}, this.rest.abortAfter);
}

const res = await discordFetch(req).finally(() => clearTimeout(timeout));

if (res.status === 429) {
return Promise.reject(new CordisRestError('rateLimited', `${req.method.toUpperCase()} ${req.path}`));
} else if (!res.ok) {
return Promise.reject(new HTTPError(res.clone(), await res.text()));
}

return res;
}
}
4 changes: 4 additions & 0 deletions libs/rest/src/bucket/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from './BaseBucket';
export * from './Bucket';
export * from './Fetch';
export * from './ProxyBucket';
2 changes: 1 addition & 1 deletion libs/rest/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export * from './bucket';
export * from './mutex';
export * from './struct';
export * from './Constants';
export * from './Error';
export * from './Fetch';
4 changes: 2 additions & 2 deletions libs/rest/src/mutex/MemoryMutex.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// ? Anything ignored from coverage in this file are just weird edge cases - nothing to really cover.

import { Mutex } from './Mutex';
import type { RatelimitData } from '../struct';
import type { RatelimitData } from '../bucket';

export interface MemoryRatelimitData extends RatelimitData {
expiresAt: Date;
Expand Down Expand Up @@ -43,7 +43,7 @@ export class MemoryMutex extends Mutex {
}

/* istanbul ignore next */
return 1e2;
return 0;
}

ratelimit.remaining--;
Expand Down
2 changes: 1 addition & 1 deletion libs/rest/src/mutex/Mutex.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { halt } from '@cordis/common';
import { CordisRestError } from '../Error';
import type { RatelimitData } from '../struct';
import type { RatelimitData } from '../bucket';

/**
* "Mutex" used to ensure requests don't go through when a ratelimit is about to happen
Expand Down
Loading