Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate limiter #344

Merged
merged 23 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,29 @@ services:

postgres:
image: timescale/timescaledb-ha:pg14
ports:
- "5433:5432"
restart: always
volumes:
- postgres:/home/postgres/pgdata/data
environment:
- POSTGRES_PASSWORD=postgres

pgbouncer:
image: edoburu/pgbouncer
container_name: pgbouncer
environment:
- DB_HOST=postgres
- DB_PORT=5432
- DB_USER=postgres
- DB_PASSWORD=postgres
- AUTH_TYPE=scram-sha-256
- POOL_MODE=transaction
- ADMIN_USERS=postgres,admin
- MAX_CLIENT_CONN=1000
ports:
- "5433:5432"
depends_on:
- postgres

web: &backend
volumes:
- ./run/:/app
Expand Down
18 changes: 10 additions & 8 deletions run/config/redis.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
const { getRedisHost, getRedisPort, getRedisUsername, getRedisPassword, getRedisTls, getRedisTlsSentinel } = require('../lib/env');

module.exports = {
development: {
"host": process.env.REDIS_HOST,
"port": process.env.REDIS_PORT,
'host': getRedisHost(),
'port': getRedisPort(),
},
production: {
"host": process.env.REDIS_HOST,
"port": process.env.REDIS_PORT,
"username": process.env.REDIS_USERNAME,
"password": process.env.REDIS_PASSWORD,
"tls": process.env.ENABLE_REDIS_TLS,
"enableTLSForSentinelMode": process.env.ENABLE_REDIS_TLS_SENTINEL
'host': getRedisHost(),
'port': getRedisPort(),
'username': getRedisUsername(),
'password': getRedisPassword(),
'tls': getRedisTls(),
'enableTLSForSentinelMode': getRedisTlsSentinel()
}
}
3 changes: 2 additions & 1 deletion run/jobs/batchBlockSync.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ module.exports = async job => {
userId: data.userId,
workspace: data.workspace,
blockNumber: i,
source: data.source || 'batchSync'
source: data.source || 'batchSync',
rateLimited: true
}
});
}
Expand Down
46 changes: 42 additions & 4 deletions run/jobs/blockSync.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ const { ProviderConnector } = require('../lib/rpc');
const { Workspace, Explorer, StripeSubscription, StripePlan, RpcHealthCheck, Block } = require('../models');
const db = require('../lib/firebase');
const logger = require('../lib/logger');
const { sanitize, processRawRpcObject } = require('../lib/utils');
const { processRawRpcObject } = require('../lib/utils');
const { enqueue, bulkEnqueue } = require('../lib/queue');
const RateLimiter = require('../lib/rateLimiter');

module.exports = async job => {
const data = job.data;
Expand Down Expand Up @@ -63,10 +65,31 @@ module.exports = async job => {
else if (data.source != 'recovery' && workspace.integrityCheck && workspace.integrityCheck.isRecovering)
await db.updateWorkspaceIntegrityCheck(workspace.id, { status: 'healthy' });

const providerConnector = new ProviderConnector(workspace.rpcServer);
let limiter;
if (data.rateLimited && workspace.rateLimitInterval && workspace.rateLimitMaxInInterval)
limiter = new RateLimiter(workspace.id, workspace.rateLimitInterval, workspace.rateLimitMaxInInterval);
const providerConnector = new ProviderConnector(workspace.rpcServer, limiter);
let block;

try {
const block = await providerConnector.fetchRawBlockWithTransactions(data.blockNumber);
try {
block = await providerConnector.fetchRawBlockWithTransactions(data.blockNumber);
} catch(error) {
if (error.message == 'Rate limited') {
const priority = job.opts.priority || data.source == 'cli-light' ? 1 : 10;
await enqueue('blockSync', `blockSync-${workspace.id}-${data.blockNumber}`, {
userId: workspace.user.firebaseUserId,
workspace: workspace.name,
blockNumber: data.blockNumber,
source: data.source,
rateLimited: data.rateLimited
}, priority, null, workspace.rateLimitInterval);
return `Re-enqueuing: ${error.message}`
}
else
throw error;
}

if (!block)
throw new Error("Couldn't fetch block from provider");

Expand All @@ -82,10 +105,25 @@ module.exports = async job => {
if (!syncedBlock)
throw new Error("Couldn't store block");

const transactions = syncedBlock.transactions;
const jobs = [];
for (let i = 0; i < transactions.length; i++) {
const transaction = transactions[i];
jobs.push({
name: `receiptSync-${workspace.id}-${transaction.hash}`,
data: {
transactionId: transaction.id,
source: data.source,
rateLimited: data.rateLimited
}
});
}
await bulkEnqueue('receiptSync', jobs, job.opts.priority);

return 'Block synced';
} catch(error) {
logger.error(error.message, { location: 'jobs.blockSync', error, data });
await db.incrementFailedAttempts(workspace.id);
// await db.incrementFailedAttempts(workspace.id);
throw error;
}
};
35 changes: 28 additions & 7 deletions run/jobs/receiptSync.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
const { ProviderConnector } = require('../lib/rpc');
const { Workspace, Explorer, StripeSubscription, Transaction, TransactionReceipt, RpcHealthCheck } = require('../models');
const db = require('../lib/firebase');
const { enqueue } = require('../lib/queue');
const RateLimiter = require('../lib/rateLimiter');
const logger = require('../lib/logger');

module.exports = async job => {
Expand All @@ -14,7 +16,7 @@ module.exports = async job => {
{
model: Workspace,
as: 'workspace',
attributes: ['id', 'rpcServer'],
attributes: ['id', 'rpcServer', 'rateLimitInterval', 'rateLimitMaxInInterval'],
include: [
{
model: Explorer,
Expand Down Expand Up @@ -46,27 +48,46 @@ module.exports = async job => {
if (!transaction.workspace)
return 'Missing workspace';

if (!transaction.workspace.explorer)
const workspace = transaction.workspace;

if (!workspace.explorer)
return 'Inactive explorer';

if (transaction.workspace.rpcHealthCheck && transaction.workspace.rpcHealthCheckEnabled && !transaction.workspace.rpcHealthCheck.isReachable)
if (workspace.rpcHealthCheck && workspace.rpcHealthCheckEnabled && !workspace.rpcHealthCheck.isReachable)
return 'RPC is unreachable';

if (!transaction.workspace.explorer.stripeSubscription)
if (!workspace.explorer.stripeSubscription)
return 'No active subscription';

const providerConnector = new ProviderConnector(transaction.workspace.rpcServer);
let limiter;
if (data.rateLimited && workspace.rateLimitInterval && workspace.rateLimitMaxInInterval)
limiter = new RateLimiter(workspace.id, workspace.rateLimitInterval, workspace.rateLimitMaxInInterval);

const providerConnector = new ProviderConnector(workspace.rpcServer, limiter);

try {
const receipt = await providerConnector.fetchTransactionReceipt(transaction.hash);
let receipt;
try {
receipt = await providerConnector.fetchTransactionReceipt(transaction.hash);
} catch(error) {
if (error.message == 'Rate limited') {
const priority = job.opts.priority || data.source == 'cli-light' ? 1 : 10;
await enqueue('receiptSync', `receiptSync-${workspace.id}-${transaction.hash}`, {
transactionId: transaction.id,
source: data.source,
rateLimited: data.rateLimited
}, priority, null, workspace.rateLimitInterval);
return `Re-enqueuing: ${error.message}`
}
}

if (!receipt)
throw new Error('Failed to fetch receipt');

return db.storeTransactionReceipt(data.transactionId, receipt);
} catch(error) {
logger.error(error.message, { location: 'jobs.receiptSync', error, data });
await db.incrementFailedAttempts(transaction.workspace.id);
// await db.incrementFailedAttempts(transaction.workspace.id);
throw error;
}
};
8 changes: 7 additions & 1 deletion run/lib/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,11 @@ module.exports = {
getSoketiPort: () => process.env.SOKETI_PORT,
getSoketiScheme: () => process.env.SOKETI_SCHEME,
getSoketiUseTLS: () => process.env.SOKETI_USE_TLS,
getDiscordFeedbackChannelWebhook: () => process.env.DISCORD_FEEDBACK_CHANNEL_WEBHOOK
getDiscordFeedbackChannelWebhook: () => process.env.DISCORD_FEEDBACK_CHANNEL_WEBHOOK,
getRedisHost: () => process.env.REDIS_HOST,
getRedisPort: () => process.env.REDIS_PORT,
getRedisUsername: () => process.env.REDIS_USERNAME,
getRedisPassword: () => process.env.REDIS_PASSWORD,
getRedisTls: () => process.env.ENABLE_REDIS_TLS,
getRedisTlsSentinel: () => process.env.ENABLE_REDIS_TLS_SENTINEL
};
2 changes: 1 addition & 1 deletion run/lib/logger.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { createLogger, format, transports } = require('winston');

const logger = createLogger({
level: process.env.LOG_LEVEL || 'error',
level: process.env.LOG_LEVEL || 'info',
exitOnError: false,
format: format.json(),
transports: []
Expand Down
2 changes: 1 addition & 1 deletion run/lib/queue.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const queues = require("../queues");
const { sanitize } = require("./utils");

const uniqueQueues = ["blockSync", "batchBlockSync"];
const uniqueQueues = [];

const MAX_BATCH_SIZE = 2000;

Expand Down
28 changes: 28 additions & 0 deletions run/lib/rateLimiter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
const { RedisRateLimiter } = require('rolling-rate-limiter');
const { getNodeEnv } = require('../lib/env');
const Redis = require('ioredis');
const config = require('../config/redis')[getNodeEnv()];

// One shared Redis connection for every limiter instance in this process.
const redisClient = new Redis(config);

/**
 * Rolling-window rate limiter backed by Redis, keyed by an arbitrary id
 * (in practice a workspace id).
 */
class RateLimiter {

    /**
     * @param {string|number} id - key identifying the rate-limited entity
     * @param {number} interval - window length (ms) — passed to rolling-rate-limiter
     * @param {number} maxInInterval - max actions allowed within one window
     */
    constructor(id, interval, maxInInterval) {
        this.id = id;
        this.limiter = new RedisRateLimiter({
            client: redisClient,
            namespace: 'rate-limiter',
            interval,
            maxInInterval
        });
    }

    // Records one action against this id's window.
    limit() {
        return this.limiter.limit(this.id);
    }

    // Returns the limiter info object (includes `blocked`) without
    // consuming quota — name notwithstanding, this is not a boolean.
    wouldLimit() {
        return this.limiter.wouldLimitWithInfo(this.id);
    }
}

module.exports = RateLimiter;
16 changes: 15 additions & 1 deletion run/lib/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,28 @@ const getProvider = function(url) {
};

class ProviderConnector {
constructor(server) {
constructor(server, limiter) {
if (!server) throw '[ProviderConnector] Missing server parameter';
this.provider = getProvider(server);
this.limiter = limiter;
}

async checkRateLimit() {
if (this.limiter) {
const { blocked: shouldLimit } = await this.limiter.wouldLimit();
if (shouldLimit)
throw new Error('Rate limited');
await this.limiter.limit();
}
}

fetchLatestBlock() {
return withTimeout(this.provider.getBlock());
}

async fetchRawBlockWithTransactions(blockNumber) {
await this.checkRateLimit();

const res = await withTimeout(this.provider.send('eth_getBlockByNumber', [`0x${blockNumber.toString(16)}`, true]))
return sanitize(res);
}
Expand All @@ -114,6 +126,8 @@ class ProviderConnector {
}

async fetchTransactionReceipt(transactionHash) {
await this.checkRateLimit();

try {
return await withTimeout(this.provider.getTransactionReceipt(transactionHash));
} catch(error) {
Expand Down
35 changes: 35 additions & 0 deletions run/migrations/20240527114054-add-rate-limiter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict';

module.exports = {
async up (queryInterface, Sequelize) {
const transaction = await queryInterface.sequelize.transaction();
try {
await queryInterface.addColumn('workspaces', 'rateLimitInterval', {
type: Sequelize.DataTypes.INTEGER,
allowNull: true
});
await queryInterface.addColumn('workspaces', 'rateLimitMaxInInterval', {
type: Sequelize.DataTypes.INTEGER,
allowNull: true
});
await transaction.commit();
} catch(error) {
console.log(error)
await transaction.rollback();
throw error;
}
},

async down (queryInterface, Sequelize) {
const transaction = await queryInterface.sequelize.transaction();
try {
await queryInterface.removeColumn('workspaces', 'rateLimitInterval');
await queryInterface.removeColumn('workspaces', 'rateLimitMaxInInterval');
await transaction.commit();
} catch(error) {
console.log(error)
await transaction.rollback();
throw error;
}
}
};
16 changes: 3 additions & 13 deletions run/models/block.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const { trigger } = require('../lib/pusher');
const { enqueue, bulkEnqueue } = require('../lib/queue');
const { getNodeEnv } = require('../lib/env');
const moment = require('moment');
const STALLED_BLOCK_REMOVAL_DELAY = getNodeEnv() == 'production' ? 1 * 60 * 1000 : 15 * 60 * 1000;
const STALLED_BLOCK_REMOVAL_DELAY = getNodeEnv() == 'production' ? 5 * 60 * 1000 : 15 * 60 * 1000;

module.exports = (sequelize, DataTypes) => {
class Block extends Model {
Expand Down Expand Up @@ -79,20 +79,10 @@ module.exports = (sequelize, DataTypes) => {
if (workspace.public) {
await enqueue('removeStalledBlock', `removeStalledBlock-${block.id}`, { blockId: block.id }, null, null, STALLED_BLOCK_REMOVAL_DELAY);
const afterCreateFn = async () => {
const transactions = block.transactions;
const jobs = [];
for (let i = 0; i < transactions.length; i++) {
const transaction = transactions[i];
jobs.push({
name: `receiptSync-${workspace.id}-${transaction.hash}`,
data: { transactionId: transaction.id }
});
}
await bulkEnqueue('receiptSync', jobs);
if (workspace.tracing == 'other') {
const jobs = [];
for (let i = 0; i < transactions.length; i++) {
const transaction = transactions[i];
for (let i = 0; i < block.transactions.length; i++) {
const transaction = block.transactions[i];
jobs.push({
name: `processTransactionTrace-${workspace.id}-${transaction.hash}`,
data: { transactionId: transaction.id }
Expand Down
4 changes: 3 additions & 1 deletion run/models/workspace.js
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,9 @@ module.exports = (sequelize, DataTypes) => {
Math.max(this.getDataValue('integrityCheckStartBlockNumber'), 0) :
null;
}
}
},
rateLimitInterval: DataTypes.INTEGER,
rateLimitMaxInInterval: DataTypes.INTEGER
}, {
hooks: {
afterCreate(workspace, options) {
Expand Down
Loading
Loading