First commit

This commit is contained in:
2025-12-25 11:16:59 +01:00
commit 0c5ca09a63
720 changed files with 329234 additions and 0 deletions

View File

@@ -0,0 +1,74 @@
const RateLimiterRes = require("./RateLimiterRes");
/**
* Bursty rate limiter exposes only msBeforeNext time and doesn't expose points from bursty limiter by default
* @type {BurstyRateLimiter}
*/
module.exports = class BurstyRateLimiter {
constructor(rateLimiter, burstLimiter) {
this._rateLimiter = rateLimiter;
this._burstLimiter = burstLimiter
}
/**
* Merge rate limiter response objects. Responses can be null
*
* @param {RateLimiterRes} [rlRes] Rate limiter response
* @param {RateLimiterRes} [blRes] Bursty limiter response
*/
_combineRes(rlRes, blRes) {
return new RateLimiterRes(
rlRes.remainingPoints,
Math.min(rlRes.msBeforeNext, blRes.msBeforeNext),
rlRes.consumedPoints,
rlRes.isFirstInDuration
)
}
/**
* @param key
* @param pointsToConsume
* @param options
* @returns {Promise<any>}
*/
consume(key, pointsToConsume = 1, options = {}) {
return this._rateLimiter.consume(key, pointsToConsume, options)
.catch((rlRej) => {
if (rlRej instanceof RateLimiterRes) {
return this._burstLimiter.consume(key, pointsToConsume, options)
.then((blRes) => {
return Promise.resolve(this._combineRes(rlRej, blRes))
})
.catch((blRej) => {
if (blRej instanceof RateLimiterRes) {
return Promise.reject(this._combineRes(rlRej, blRej))
} else {
return Promise.reject(blRej)
}
}
)
} else {
return Promise.reject(rlRej)
}
})
}
/**
* It doesn't expose available points from burstLimiter
*
* @param key
* @returns {Promise<RateLimiterRes>}
*/
get(key) {
return Promise.all([
this._rateLimiter.get(key),
this._burstLimiter.get(key),
]).then(([rlRes, blRes]) => {
return this._combineRes(rlRes, blRes);
});
}
get points() {
return this._rateLimiter.points;
}
};

View File

@@ -0,0 +1,347 @@
const {
LIMITER_TYPES,
ERR_UNKNOWN_LIMITER_TYPE_MESSAGE,
} = require('./constants');
const crypto = require('crypto');
const {
RateLimiterMemory,
RateLimiterCluster,
RateLimiterMemcache,
RateLimiterMongo,
RateLimiterMySQL,
RateLimiterPostgres,
RateLimiterRedis,
} = require('../index');
function getDelayMs(count, delays, maxWait) {
let msDelay = maxWait;
const delayIndex = count - 1;
if (delayIndex >= 0 && delayIndex < delays.length) {
msDelay = delays[delayIndex];
}
return msDelay;
}
const ExpressBruteFlexible = function (limiterType, options) {
ExpressBruteFlexible.instanceCount++;
this.name = `brute${ExpressBruteFlexible.instanceCount}`;
this.options = Object.assign({}, ExpressBruteFlexible.defaults, options);
if (this.options.minWait < 1) {
this.options.minWait = 1;
}
const validLimiterTypes = Object.keys(ExpressBruteFlexible.LIMITER_TYPES).map(k => ExpressBruteFlexible.LIMITER_TYPES[k]);
if (!validLimiterTypes.includes(limiterType)) {
throw new Error(ERR_UNKNOWN_LIMITER_TYPE_MESSAGE);
}
this.limiterType = limiterType;
this.delays = [this.options.minWait];
while (this.delays[this.delays.length - 1] < this.options.maxWait) {
const nextNum = this.delays[this.delays.length - 1] + (this.delays.length > 1 ? this.delays[this.delays.length - 2] : 0);
this.delays.push(nextNum);
}
this.delays[this.delays.length - 1] = this.options.maxWait;
// set default lifetime
if (typeof this.options.lifetime === 'undefined') {
this.options.lifetime = Math.ceil((this.options.maxWait / 1000) * (this.delays.length + this.options.freeRetries));
}
this.prevent = this.getMiddleware({
prefix: this.options.prefix,
});
};
ExpressBruteFlexible.prototype.getMiddleware = function (options) {
const opts = Object.assign({}, options);
const commonKeyPrefix = opts.prefix || '';
const freeLimiterOptions = {
storeClient: this.options.storeClient,
storeType: this.options.storeType,
keyPrefix: `${commonKeyPrefix}free`,
dbName: this.options.dbName,
tableName: this.options.tableName,
points: this.options.freeRetries > 0 ? this.options.freeRetries - 1 : 0,
duration: this.options.lifetime,
};
const blockLimiterOptions = {
storeClient: this.options.storeClient,
storeType: this.options.storeType,
keyPrefix: `${commonKeyPrefix}block`,
dbName: this.options.dbName,
tableName: this.options.tableName,
points: 1,
duration: Math.min(this.options.lifetime, Math.ceil((this.options.maxWait / 1000))),
};
const counterLimiterOptions = {
storeClient: this.options.storeClient,
storeType: this.options.storeType,
keyPrefix: `${commonKeyPrefix}counter`,
dbName: this.options.dbName,
tableName: this.options.tableName,
points: 1,
duration: this.options.lifetime,
};
switch (this.limiterType) {
case 'memory':
this.freeLimiter = new RateLimiterMemory(freeLimiterOptions);
this.blockLimiter = new RateLimiterMemory(blockLimiterOptions);
this.counterLimiter = new RateLimiterMemory(counterLimiterOptions);
break;
case 'cluster':
this.freeLimiter = new RateLimiterCluster(freeLimiterOptions);
this.blockLimiter = new RateLimiterCluster(blockLimiterOptions);
this.counterLimiter = new RateLimiterCluster(counterLimiterOptions);
break;
case 'memcache':
this.freeLimiter = new RateLimiterMemcache(freeLimiterOptions);
this.blockLimiter = new RateLimiterMemcache(blockLimiterOptions);
this.counterLimiter = new RateLimiterMemcache(counterLimiterOptions);
break;
case 'mongo':
this.freeLimiter = new RateLimiterMongo(freeLimiterOptions);
this.blockLimiter = new RateLimiterMongo(blockLimiterOptions);
this.counterLimiter = new RateLimiterMongo(counterLimiterOptions);
break;
case 'mysql':
this.freeLimiter = new RateLimiterMySQL(freeLimiterOptions);
this.blockLimiter = new RateLimiterMySQL(blockLimiterOptions);
this.counterLimiter = new RateLimiterMySQL(counterLimiterOptions);
break;
case 'postgres':
this.freeLimiter = new RateLimiterPostgres(freeLimiterOptions);
this.blockLimiter = new RateLimiterPostgres(blockLimiterOptions);
this.counterLimiter = new RateLimiterPostgres(counterLimiterOptions);
break;
case 'redis':
this.freeLimiter = new RateLimiterRedis(freeLimiterOptions);
this.blockLimiter = new RateLimiterRedis(blockLimiterOptions);
this.counterLimiter = new RateLimiterRedis(counterLimiterOptions);
break;
default:
throw new Error(ERR_UNKNOWN_LIMITER_TYPE_MESSAGE);
}
let keyFunc = opts.key;
if (typeof keyFunc !== 'function') {
keyFunc = function (req, res, next) {
next(opts.key);
};
}
const getFailCallback = (() => (typeof opts.failCallback === 'undefined' ? this.options.failCallback : opts.failCallback));
return (req, res, next) => {
const cannotIncrementErrorObjectBase = {
req,
res,
next,
message: 'Cannot increment request count',
};
keyFunc(req, res, (key) => {
if (!opts.ignoreIP) {
key = ExpressBruteFlexible._getKey([req.ip, this.name, key]);
} else {
key = ExpressBruteFlexible._getKey([this.name, key]);
}
// attach a simpler "reset" function to req.brute.reset
if (this.options.attachResetToRequest) {
let reset = ((callback) => {
Promise.all([
this.freeLimiter.delete(key),
this.blockLimiter.delete(key),
this.counterLimiter.delete(key),
]).then(() => {
if (typeof callback === 'function') {
process.nextTick(() => {
callback();
});
}
}).catch((err) => {
if (typeof callback === 'function') {
process.nextTick(() => {
callback(err);
});
}
});
});
if (req.brute && req.brute.reset) {
// wrap existing reset if one exists
const oldReset = req.brute.reset;
const newReset = reset;
reset = function (callback) {
oldReset(() => {
newReset(callback);
});
};
}
req.brute = {
reset,
};
}
this.freeLimiter.consume(key)
.then(() => {
if (typeof next === 'function') {
next();
}
})
.catch(() => {
Promise.all([
this.blockLimiter.get(key),
this.counterLimiter.get(key),
])
.then((allRes) => {
const [blockRes, counterRes] = allRes;
if (blockRes === null) {
const msDelay = getDelayMs(
counterRes ? counterRes.consumedPoints + 1 : 1,
this.delays,
// eslint-disable-next-line
this.options.maxWait
);
this.blockLimiter.penalty(key, 1, { customDuration: Math.ceil(msDelay / 1000) })
.then((blockPenaltyRes) => {
if (blockPenaltyRes.consumedPoints === 1) {
this.counterLimiter.penalty(key)
.then(() => {
if (typeof next === 'function') {
next();
}
})
.catch((err) => {
this.options.handleStoreError(Object.assign({}, cannotIncrementErrorObjectBase, { parent: err }));
});
} else {
const nextValidDate = new Date(Date.now() + blockPenaltyRes.msBeforeNext);
const failCallback = getFailCallback();
if (typeof failCallback === 'function') {
failCallback(req, res, next, nextValidDate);
}
}
})
.catch((err) => {
this.options.handleStoreError(Object.assign({}, cannotIncrementErrorObjectBase, { parent: err }));
});
} else {
const nextValidDate = new Date(Date.now() + blockRes.msBeforeNext);
const failCallback = getFailCallback();
if (typeof failCallback === 'function') {
failCallback(req, res, next, nextValidDate);
}
}
})
.catch((err) => {
this.options.handleStoreError(Object.assign({}, cannotIncrementErrorObjectBase, { parent: err }));
});
});
});
};
};
ExpressBruteFlexible.prototype.reset = function (ip, key, callback) {
let keyArgs = [];
if (ip) {
keyArgs.push(ip)
}
keyArgs.push(this.name);
keyArgs.push(key);
const ebKey = ExpressBruteFlexible._getKey(keyArgs);
Promise.all([
this.freeLimiter.delete(ebKey),
this.blockLimiter.delete(ebKey),
this.counterLimiter.delete(ebKey),
]).then(() => {
if (typeof callback === 'function') {
process.nextTick(() => {
callback();
});
}
}).catch((err) => {
this.options.handleStoreError({
message: 'Cannot reset request count',
parent: err,
key,
ip,
});
});
};
ExpressBruteFlexible._getKey = function (arr) {
let key = '';
arr.forEach((part) => {
if (part) {
key += crypto.createHash('sha256').update(part).digest('base64');
}
});
return crypto.createHash('sha256').update(key).digest('base64');
};
const setRetryAfter = function (res, nextValidRequestDate) {
const secondUntilNextRequest = Math.ceil((nextValidRequestDate.getTime() - Date.now()) / 1000);
res.header('Retry-After', secondUntilNextRequest);
};
ExpressBruteFlexible.FailTooManyRequests = function (req, res, next, nextValidRequestDate) {
setRetryAfter(res, nextValidRequestDate);
res.status(429);
res.send({
error: {
text: 'Too many requests in this time frame.',
nextValidRequestDate,
},
});
};
ExpressBruteFlexible.FailForbidden = function (req, res, next, nextValidRequestDate) {
setRetryAfter(res, nextValidRequestDate);
res.status(403);
res.send({
error: {
text: 'Too many requests in this time frame.',
nextValidRequestDate,
},
});
};
ExpressBruteFlexible.FailMark = function (req, res, next, nextValidRequestDate) {
res.status(429);
setRetryAfter(res, nextValidRequestDate);
res.nextValidRequestDate = nextValidRequestDate;
next();
};
ExpressBruteFlexible.defaults = {
freeRetries: 2,
attachResetToRequest: true,
minWait: 500,
maxWait: 1000 * 60 * 15,
failCallback: ExpressBruteFlexible.FailTooManyRequests,
handleStoreError(err) {
// eslint-disable-next-line
throw {
message: err.message,
parent: err.parent,
};
},
};
ExpressBruteFlexible.LIMITER_TYPES = LIMITER_TYPES;
ExpressBruteFlexible.instanceCount = 0;
module.exports = ExpressBruteFlexible;

View File

@@ -0,0 +1,195 @@
const RateLimiterRes = require('./RateLimiterRes');
module.exports = class RLWrapperBlackAndWhite {
constructor(opts = {}) {
this.limiter = opts.limiter;
this.blackList = opts.blackList;
this.whiteList = opts.whiteList;
this.isBlackListed = opts.isBlackListed;
this.isWhiteListed = opts.isWhiteListed;
this.runActionAnyway = opts.runActionAnyway;
}
get limiter() {
return this._limiter;
}
set limiter(value) {
if (typeof value === 'undefined') {
throw new Error('limiter is not set');
}
this._limiter = value;
}
get runActionAnyway() {
return this._runActionAnyway;
}
set runActionAnyway(value) {
this._runActionAnyway = typeof value === 'undefined' ? false : value;
}
get blackList() {
return this._blackList;
}
set blackList(value) {
this._blackList = Array.isArray(value) ? value : [];
}
get isBlackListed() {
return this._isBlackListed;
}
set isBlackListed(func) {
if (typeof func === 'undefined') {
func = () => false;
}
if (typeof func !== 'function') {
throw new Error('isBlackListed must be function');
}
this._isBlackListed = func;
}
get whiteList() {
return this._whiteList;
}
set whiteList(value) {
this._whiteList = Array.isArray(value) ? value : [];
}
get isWhiteListed() {
return this._isWhiteListed;
}
set isWhiteListed(func) {
if (typeof func === 'undefined') {
func = () => false;
}
if (typeof func !== 'function') {
throw new Error('isWhiteListed must be function');
}
this._isWhiteListed = func;
}
isBlackListedSomewhere(key) {
return this.blackList.indexOf(key) >= 0 || this.isBlackListed(key);
}
isWhiteListedSomewhere(key) {
return this.whiteList.indexOf(key) >= 0 || this.isWhiteListed(key);
}
getBlackRes() {
return new RateLimiterRes(0, Number.MAX_SAFE_INTEGER, 0, false);
}
getWhiteRes() {
return new RateLimiterRes(Number.MAX_SAFE_INTEGER, 0, 0, false);
}
rejectBlack() {
return Promise.reject(this.getBlackRes());
}
resolveBlack() {
return Promise.resolve(this.getBlackRes());
}
resolveWhite() {
return Promise.resolve(this.getWhiteRes());
}
consume(key, pointsToConsume = 1) {
let res;
if (this.isWhiteListedSomewhere(key)) {
res = this.resolveWhite();
} else if (this.isBlackListedSomewhere(key)) {
res = this.rejectBlack();
}
if (typeof res === 'undefined') {
return this.limiter.consume(key, pointsToConsume);
}
if (this.runActionAnyway) {
this.limiter.consume(key, pointsToConsume).catch(() => {});
}
return res;
}
block(key, secDuration) {
let res;
if (this.isWhiteListedSomewhere(key)) {
res = this.resolveWhite();
} else if (this.isBlackListedSomewhere(key)) {
res = this.resolveBlack();
}
if (typeof res === 'undefined') {
return this.limiter.block(key, secDuration);
}
if (this.runActionAnyway) {
this.limiter.block(key, secDuration).catch(() => {});
}
return res;
}
penalty(key, points) {
let res;
if (this.isWhiteListedSomewhere(key)) {
res = this.resolveWhite();
} else if (this.isBlackListedSomewhere(key)) {
res = this.resolveBlack();
}
if (typeof res === 'undefined') {
return this.limiter.penalty(key, points);
}
if (this.runActionAnyway) {
this.limiter.penalty(key, points).catch(() => {});
}
return res;
}
reward(key, points) {
let res;
if (this.isWhiteListedSomewhere(key)) {
res = this.resolveWhite();
} else if (this.isBlackListedSomewhere(key)) {
res = this.resolveBlack();
}
if (typeof res === 'undefined') {
return this.limiter.reward(key, points);
}
if (this.runActionAnyway) {
this.limiter.reward(key, points).catch(() => {});
}
return res;
}
get(key) {
let res;
if (this.isWhiteListedSomewhere(key)) {
res = this.resolveWhite();
} else if (this.isBlackListedSomewhere(key)) {
res = this.resolveBlack();
}
if (typeof res === 'undefined' || this.runActionAnyway) {
return this.limiter.get(key);
}
return res;
}
delete(key) {
return this.limiter.delete(key);
}
};

View File

@@ -0,0 +1,125 @@
module.exports = class RateLimiterAbstract {
/**
*
* @param opts Object Defaults {
* points: 4, // Number of points
* duration: 1, // Per seconds
* blockDuration: 0, // Block if consumed more than points in current duration for blockDuration seconds
* execEvenly: false, // Execute allowed actions evenly over duration
* execEvenlyMinDelayMs: duration * 1000 / points, // ms, works with execEvenly=true option
* keyPrefix: 'rlflx',
* }
*/
constructor(opts = {}) {
this.points = opts.points;
this.duration = opts.duration;
this.blockDuration = opts.blockDuration;
this.execEvenly = opts.execEvenly;
this.execEvenlyMinDelayMs = opts.execEvenlyMinDelayMs;
this.keyPrefix = opts.keyPrefix;
}
get points() {
return this._points;
}
set points(value) {
this._points = value >= 0 ? value : 4;
}
get duration() {
return this._duration;
}
set duration(value) {
this._duration = typeof value === 'undefined' ? 1 : value;
}
get msDuration() {
return this.duration * 1000;
}
get blockDuration() {
return this._blockDuration;
}
set blockDuration(value) {
this._blockDuration = typeof value === 'undefined' ? 0 : value;
}
get msBlockDuration() {
return this.blockDuration * 1000;
}
get execEvenly() {
return this._execEvenly;
}
set execEvenly(value) {
this._execEvenly = typeof value === 'undefined' ? false : Boolean(value);
}
get execEvenlyMinDelayMs() {
return this._execEvenlyMinDelayMs;
}
set execEvenlyMinDelayMs(value) {
this._execEvenlyMinDelayMs = typeof value === 'undefined' ? Math.ceil(this.msDuration / this.points) : value;
}
get keyPrefix() {
return this._keyPrefix;
}
set keyPrefix(value) {
if (typeof value === 'undefined') {
value = 'rlflx';
}
if (typeof value !== 'string') {
throw new Error('keyPrefix must be string');
}
this._keyPrefix = value;
}
_getKeySecDuration(options = {}) {
return options && options.customDuration >= 0
? options.customDuration
: this.duration;
}
getKey(key) {
return this.keyPrefix.length > 0 ? `${this.keyPrefix}:${key}` : key;
}
parseKey(rlKey) {
return rlKey.substring(this.keyPrefix.length);
}
consume() {
throw new Error("You have to implement the method 'consume'!");
}
penalty() {
throw new Error("You have to implement the method 'penalty'!");
}
reward() {
throw new Error("You have to implement the method 'reward'!");
}
get() {
throw new Error("You have to implement the method 'get'!");
}
set() {
throw new Error("You have to implement the method 'set'!");
}
block() {
throw new Error("You have to implement the method 'block'!");
}
delete() {
throw new Error("You have to implement the method 'delete'!");
}
};

View File

@@ -0,0 +1,367 @@
/**
* Implements rate limiting in cluster using built-in IPC
*
* Two classes are described here: master and worker
* Master have to be create in the master process without any options.
* Any number of rate limiters can be created in workers, but each rate limiter must be with unique keyPrefix
*
* Workflow:
* 1. master rate limiter created in master process
* 2. worker rate limiter sends 'init' message with necessary options during creating
* 3. master receives options and adds new rate limiter by keyPrefix if it isn't created yet
* 4. master sends 'init' back to worker's rate limiter
* 5. worker can process requests immediately,
* but they will be postponed by 'workerWaitInit' until master sends 'init' to worker
* 6. every request to worker rate limiter creates a promise
* 7. if master doesn't response for 'timeout', promise is rejected
* 8. master sends 'resolve' or 'reject' command to worker
* 9. worker resolves or rejects promise depending on message from master
*
*/
const cluster = require('cluster');
const crypto = require('crypto');
const RateLimiterAbstract = require('./RateLimiterAbstract');
const RateLimiterMemory = require('./RateLimiterMemory');
const RateLimiterRes = require('./RateLimiterRes');
const channel = 'rate_limiter_flexible';
let masterInstance = null;
const masterSendToWorker = function (worker, msg, type, res) {
let data;
if (res === null || res === true || res === false) {
data = res;
} else {
data = {
remainingPoints: res.remainingPoints,
msBeforeNext: res.msBeforeNext,
consumedPoints: res.consumedPoints,
isFirstInDuration: res.isFirstInDuration,
};
}
worker.send({
channel,
keyPrefix: msg.keyPrefix, // which rate limiter exactly
promiseId: msg.promiseId,
type,
data,
});
};
const workerWaitInit = function (payload) {
setTimeout(() => {
if (this._initiated) {
process.send(payload);
// Promise will be removed by timeout if too long
} else if (typeof this._promises[payload.promiseId] !== 'undefined') {
workerWaitInit.call(this, payload);
}
}, 30);
};
const workerSendToMaster = function (func, promiseId, key, arg, opts) {
const payload = {
channel,
keyPrefix: this.keyPrefix,
func,
promiseId,
data: {
key,
arg,
opts,
},
};
if (!this._initiated) {
// Wait init before sending messages to master
workerWaitInit.call(this, payload);
} else {
process.send(payload);
}
};
const masterProcessMsg = function (worker, msg) {
if (!msg || msg.channel !== channel || typeof this._rateLimiters[msg.keyPrefix] === 'undefined') {
return false;
}
let promise;
switch (msg.func) {
case 'consume':
promise = this._rateLimiters[msg.keyPrefix].consume(msg.data.key, msg.data.arg, msg.data.opts);
break;
case 'penalty':
promise = this._rateLimiters[msg.keyPrefix].penalty(msg.data.key, msg.data.arg, msg.data.opts);
break;
case 'reward':
promise = this._rateLimiters[msg.keyPrefix].reward(msg.data.key, msg.data.arg, msg.data.opts);
break;
case 'block':
promise = this._rateLimiters[msg.keyPrefix].block(msg.data.key, msg.data.arg, msg.data.opts);
break;
case 'get':
promise = this._rateLimiters[msg.keyPrefix].get(msg.data.key, msg.data.opts);
break;
case 'delete':
promise = this._rateLimiters[msg.keyPrefix].delete(msg.data.key, msg.data.opts);
break;
default:
return false;
}
if (promise) {
promise
.then((res) => {
masterSendToWorker(worker, msg, 'resolve', res);
})
.catch((rejRes) => {
masterSendToWorker(worker, msg, 'reject', rejRes);
});
}
};
const workerProcessMsg = function (msg) {
if (!msg || msg.channel !== channel || msg.keyPrefix !== this.keyPrefix) {
return false;
}
if (this._promises[msg.promiseId]) {
clearTimeout(this._promises[msg.promiseId].timeoutId);
let res;
if (msg.data === null || msg.data === true || msg.data === false) {
res = msg.data;
} else {
res = new RateLimiterRes(
msg.data.remainingPoints,
msg.data.msBeforeNext,
msg.data.consumedPoints,
msg.data.isFirstInDuration // eslint-disable-line comma-dangle
);
}
switch (msg.type) {
case 'resolve':
this._promises[msg.promiseId].resolve(res);
break;
case 'reject':
this._promises[msg.promiseId].reject(res);
break;
default:
throw new Error(`RateLimiterCluster: no such message type '${msg.type}'`);
}
delete this._promises[msg.promiseId];
}
};
/**
* Prepare options to send to master
* Master will create rate limiter depending on options
*
* @returns {{points: *, duration: *, blockDuration: *, execEvenly: *, execEvenlyMinDelayMs: *, keyPrefix: *}}
*/
const getOpts = function () {
return {
points: this.points,
duration: this.duration,
blockDuration: this.blockDuration,
execEvenly: this.execEvenly,
execEvenlyMinDelayMs: this.execEvenlyMinDelayMs,
keyPrefix: this.keyPrefix,
};
};
const savePromise = function (resolve, reject) {
const hrtime = process.hrtime();
let promiseId = hrtime[0].toString() + hrtime[1].toString();
if (typeof this._promises[promiseId] !== 'undefined') {
promiseId += crypto.randomBytes(12).toString('base64');
}
this._promises[promiseId] = {
resolve,
reject,
timeoutId: setTimeout(() => {
delete this._promises[promiseId];
reject(new Error('RateLimiterCluster timeout: no answer from master in time'));
}, this.timeoutMs),
};
return promiseId;
};
class RateLimiterClusterMaster {
constructor() {
if (masterInstance) {
return masterInstance;
}
this._rateLimiters = {};
cluster.setMaxListeners(0);
cluster.on('message', (worker, msg) => {
if (msg && msg.channel === channel && msg.type === 'init') {
// If init request, check or create rate limiter by key prefix and send 'init' back to worker
if (typeof this._rateLimiters[msg.opts.keyPrefix] === 'undefined') {
this._rateLimiters[msg.opts.keyPrefix] = new RateLimiterMemory(msg.opts);
}
worker.send({
channel,
type: 'init',
keyPrefix: msg.opts.keyPrefix,
});
} else {
masterProcessMsg.call(this, worker, msg);
}
});
masterInstance = this;
}
}
class RateLimiterClusterMasterPM2 {
constructor(pm2) {
if (masterInstance) {
return masterInstance;
}
this._rateLimiters = {};
pm2.launchBus((err, pm2Bus) => {
pm2Bus.on('process:msg', (packet) => {
const msg = packet.raw;
if (msg && msg.channel === channel && msg.type === 'init') {
// If init request, check or create rate limiter by key prefix and send 'init' back to worker
if (typeof this._rateLimiters[msg.opts.keyPrefix] === 'undefined') {
this._rateLimiters[msg.opts.keyPrefix] = new RateLimiterMemory(msg.opts);
}
pm2.sendDataToProcessId(packet.process.pm_id, {
data: {},
topic: channel,
channel,
type: 'init',
keyPrefix: msg.opts.keyPrefix,
}, (sendErr, res) => {
if (sendErr) {
console.log(sendErr, res);
}
});
} else {
const worker = {
send: (msgData) => {
const pm2Message = msgData;
pm2Message.topic = channel;
if (typeof pm2Message.data === 'undefined') {
pm2Message.data = {};
}
pm2.sendDataToProcessId(packet.process.pm_id, pm2Message, (sendErr, res) => {
if (sendErr) {
console.log(sendErr, res);
}
});
},
};
masterProcessMsg.call(this, worker, msg);
}
});
});
masterInstance = this;
}
}
class RateLimiterClusterWorker extends RateLimiterAbstract {
get timeoutMs() {
return this._timeoutMs;
}
set timeoutMs(value) {
this._timeoutMs = typeof value === 'undefined' ? 5000 : Math.abs(parseInt(value));
}
constructor(opts = {}) {
super(opts);
process.setMaxListeners(0);
this.timeoutMs = opts.timeoutMs;
this._initiated = false;
process.on('message', (msg) => {
if (msg && msg.channel === channel && msg.type === 'init' && msg.keyPrefix === this.keyPrefix) {
this._initiated = true;
} else {
workerProcessMsg.call(this, msg);
}
});
// Create limiter on master with specific options
process.send({
channel,
type: 'init',
opts: getOpts.call(this),
});
this._promises = {};
}
consume(key, pointsToConsume = 1, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);
workerSendToMaster.call(this, 'consume', promiseId, key, pointsToConsume, options);
});
}
penalty(key, points = 1, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);
workerSendToMaster.call(this, 'penalty', promiseId, key, points, options);
});
}
reward(key, points = 1, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);
workerSendToMaster.call(this, 'reward', promiseId, key, points, options);
});
}
block(key, secDuration, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);
workerSendToMaster.call(this, 'block', promiseId, key, secDuration, options);
});
}
get(key, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);
workerSendToMaster.call(this, 'get', promiseId, key, options);
});
}
delete(key, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);
workerSendToMaster.call(this, 'delete', promiseId, key, options);
});
}
}
module.exports = {
RateLimiterClusterMaster,
RateLimiterClusterMasterPM2,
RateLimiterCluster: RateLimiterClusterWorker,
};

View File

@@ -0,0 +1,150 @@
const RateLimiterStoreAbstract = require('./RateLimiterStoreAbstract');
const RateLimiterRes = require('./RateLimiterRes');
class RateLimiterMemcache extends RateLimiterStoreAbstract {
/**
*
* @param {Object} opts
* Defaults {
* ... see other in RateLimiterStoreAbstract
*
* storeClient: memcacheClient
* }
*/
constructor(opts) {
super(opts);
this.client = opts.storeClient;
}
_getRateLimiterRes(rlKey, changedPoints, result) {
const res = new RateLimiterRes();
res.consumedPoints = parseInt(result.consumedPoints);
res.isFirstInDuration = result.consumedPoints === changedPoints;
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);
res.msBeforeNext = result.msBeforeNext;
return res;
}
_upsert(rlKey, points, msDuration, forceExpire = false, options = {}) {
return new Promise((resolve, reject) => {
const nowMs = Date.now();
const secDuration = Math.floor(msDuration / 1000);
if (forceExpire) {
this.client.set(rlKey, points, secDuration, (err) => {
if (!err) {
this.client.set(
`${rlKey}_expire`,
secDuration > 0 ? nowMs + (secDuration * 1000) : -1,
secDuration,
() => {
const res = {
consumedPoints: points,
msBeforeNext: secDuration > 0 ? secDuration * 1000 : -1,
};
resolve(res);
}
);
} else {
reject(err);
}
});
} else {
this.client.incr(rlKey, points, (err, consumedPoints) => {
if (err || consumedPoints === false) {
this.client.add(rlKey, points, secDuration, (errAddKey, createdNew) => {
if (errAddKey || !createdNew) {
// Try to upsert again in case of race condition
if (typeof options.attemptNumber === 'undefined' || options.attemptNumber < 3) {
const nextOptions = Object.assign({}, options);
nextOptions.attemptNumber = nextOptions.attemptNumber ? (nextOptions.attemptNumber + 1) : 1;
this._upsert(rlKey, points, msDuration, forceExpire, nextOptions)
.then(resUpsert => resolve(resUpsert))
.catch(errUpsert => reject(errUpsert));
} else {
reject(new Error('Can not add key'));
}
} else {
this.client.add(
`${rlKey}_expire`,
secDuration > 0 ? nowMs + (secDuration * 1000) : -1,
secDuration,
() => {
const res = {
consumedPoints: points,
msBeforeNext: secDuration > 0 ? secDuration * 1000 : -1,
};
resolve(res);
}
);
}
});
} else {
this.client.get(`${rlKey}_expire`, (errGetExpire, resGetExpireMs) => {
if (errGetExpire) {
reject(errGetExpire);
} else {
const expireMs = resGetExpireMs === false ? 0 : resGetExpireMs;
const res = {
consumedPoints,
msBeforeNext: expireMs >= 0 ? Math.max(expireMs - nowMs, 0) : -1,
};
resolve(res);
}
});
}
});
}
});
}
_get(rlKey) {
return new Promise((resolve, reject) => {
const nowMs = Date.now();
this.client.get(rlKey, (err, consumedPoints) => {
if (!consumedPoints) {
resolve(null);
} else {
this.client.get(`${rlKey}_expire`, (errGetExpire, resGetExpireMs) => {
if (errGetExpire) {
reject(errGetExpire);
} else {
const expireMs = resGetExpireMs === false ? 0 : resGetExpireMs;
const res = {
consumedPoints,
msBeforeNext: expireMs >= 0 ? Math.max(expireMs - nowMs, 0) : -1,
};
resolve(res);
}
});
}
});
});
}
_delete(rlKey) {
return new Promise((resolve, reject) => {
this.client.del(rlKey, (err, res) => {
if (err) {
reject(err);
} else if (res === false) {
resolve(res);
} else {
this.client.del(`${rlKey}_expire`, (errDelExpire) => {
if (errDelExpire) {
reject(errDelExpire);
} else {
resolve(res);
}
});
}
});
});
}
}
module.exports = RateLimiterMemcache;

View File

@@ -0,0 +1,106 @@
const RateLimiterAbstract = require('./RateLimiterAbstract');
const MemoryStorage = require('./component/MemoryStorage/MemoryStorage');
const RateLimiterRes = require('./RateLimiterRes');
class RateLimiterMemory extends RateLimiterAbstract {
constructor(opts = {}) {
super(opts);
this._memoryStorage = new MemoryStorage();
}
/**
*
* @param key
* @param pointsToConsume
* @param {Object} options
* @returns {Promise<RateLimiterRes>}
*/
consume(key, pointsToConsume = 1, options = {}) {
return new Promise((resolve, reject) => {
const rlKey = this.getKey(key);
const secDuration = this._getKeySecDuration(options);
let res = this._memoryStorage.incrby(rlKey, pointsToConsume, secDuration);
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);
if (res.consumedPoints > this.points) {
// Block only first time when consumed more than points
if (this.blockDuration > 0 && res.consumedPoints <= (this.points + pointsToConsume)) {
// Block key
res = this._memoryStorage.set(rlKey, res.consumedPoints, this.blockDuration);
}
reject(res);
} else if (this.execEvenly && res.msBeforeNext > 0 && !res.isFirstInDuration) {
// Execute evenly
let delay = Math.ceil(res.msBeforeNext / (res.remainingPoints + 2));
if (delay < this.execEvenlyMinDelayMs) {
delay = res.consumedPoints * this.execEvenlyMinDelayMs;
}
setTimeout(resolve, delay, res);
} else {
resolve(res);
}
});
}
penalty(key, points = 1, options = {}) {
const rlKey = this.getKey(key);
return new Promise((resolve) => {
const secDuration = this._getKeySecDuration(options);
const res = this._memoryStorage.incrby(rlKey, points, secDuration);
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);
resolve(res);
});
}
reward(key, points = 1, options = {}) {
const rlKey = this.getKey(key);
return new Promise((resolve) => {
const secDuration = this._getKeySecDuration(options);
const res = this._memoryStorage.incrby(rlKey, -points, secDuration);
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);
resolve(res);
});
}
/**
* Block any key for secDuration seconds
*
* @param key
* @param secDuration
*/
block(key, secDuration) {
const msDuration = secDuration * 1000;
const initPoints = this.points + 1;
this._memoryStorage.set(this.getKey(key), initPoints, secDuration);
return Promise.resolve(
new RateLimiterRes(0, msDuration === 0 ? -1 : msDuration, initPoints)
);
}
set(key, points, secDuration) {
const msDuration = (secDuration >= 0 ? secDuration : this.duration) * 1000;
this._memoryStorage.set(this.getKey(key), points, secDuration);
return Promise.resolve(
new RateLimiterRes(0, msDuration === 0 ? -1 : msDuration, points)
);
}
get(key) {
const res = this._memoryStorage.get(this.getKey(key));
if (res !== null) {
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);
}
return Promise.resolve(res);
}
delete(key) {
return Promise.resolve(this._memoryStorage.delete(this.getKey(key)));
}
}
module.exports = RateLimiterMemory;

View File

@@ -0,0 +1,273 @@
const RateLimiterStoreAbstract = require('./RateLimiterStoreAbstract');
const RateLimiterRes = require('./RateLimiterRes');
/**
* Get MongoDB driver version as upsert options differ
* @params {Object} Client instance
* @returns {Object} Version Object containing major, feature & minor versions.
*/
function getDriverVersion(client) {
try {
const _client = client.client ? client.client : client;
const { version } = _client.topology.s.options.metadata.driver;
const _v = version.split('.').map(v => parseInt(v));
return {
major: _v[0],
feature: _v[1],
patch: _v[2],
};
} catch (err) {
return { major: 0, feature: 0, patch: 0 };
}
}
class RateLimiterMongo extends RateLimiterStoreAbstract {
/**
*
* @param {Object} opts
* Defaults {
* indexKeyPrefix: {attr1: 1, attr2: 1}
* ... see other in RateLimiterStoreAbstract
*
* mongo: MongoClient
* }
*/
constructor(opts) {
super(opts);
this.dbName = opts.dbName;
this.tableName = opts.tableName;
this.indexKeyPrefix = opts.indexKeyPrefix;
if (opts.mongo) {
this.client = opts.mongo;
} else {
this.client = opts.storeClient;
}
if (typeof this.client.then === 'function') {
// If Promise
this.client
.then((conn) => {
this.client = conn;
this._initCollection();
this._driverVersion = getDriverVersion(this.client);
});
} else {
this._initCollection();
this._driverVersion = getDriverVersion(this.client);
}
}
get dbName() {
return this._dbName;
}
set dbName(value) {
this._dbName = typeof value === 'undefined' ? RateLimiterMongo.getDbName() : value;
}
static getDbName() {
return 'node-rate-limiter-flexible';
}
get tableName() {
return this._tableName;
}
set tableName(value) {
this._tableName = typeof value === 'undefined' ? this.keyPrefix : value;
}
get client() {
return this._client;
}
set client(value) {
if (typeof value === 'undefined') {
throw new Error('mongo is not set');
}
this._client = value;
}
get indexKeyPrefix() {
return this._indexKeyPrefix;
}
set indexKeyPrefix(obj) {
this._indexKeyPrefix = obj || {};
}
_initCollection() {
const db = typeof this.client.db === 'function'
? this.client.db(this.dbName)
: this.client;
const collection = db.collection(this.tableName);
collection.createIndex({ expire: -1 }, { expireAfterSeconds: 0 });
collection.createIndex(Object.assign({}, this.indexKeyPrefix, { key: 1 }), { unique: true });
this._collection = collection;
}
_getRateLimiterRes(rlKey, changedPoints, result) {
const res = new RateLimiterRes();
let doc;
if (typeof result.value === 'undefined') {
doc = result;
} else {
doc = result.value;
}
res.isFirstInDuration = doc.points === changedPoints;
res.consumedPoints = doc.points;
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);
res.msBeforeNext = doc.expire !== null
? Math.max(new Date(doc.expire).getTime() - Date.now(), 0)
: -1;
return res;
}
_upsert(key, points, msDuration, forceExpire = false, options = {}) {
if (!this._collection) {
return Promise.reject(Error('Mongo connection is not established'));
}
const docAttrs = options.attrs || {};
let where;
let upsertData;
if (forceExpire) {
where = { key };
where = Object.assign(where, docAttrs);
upsertData = {
$set: {
key,
points,
expire: msDuration > 0 ? new Date(Date.now() + msDuration) : null,
},
};
upsertData.$set = Object.assign(upsertData.$set, docAttrs);
} else {
where = {
$or: [
{ expire: { $gt: new Date() } },
{ expire: { $eq: null } },
],
key,
};
where = Object.assign(where, docAttrs);
upsertData = {
$setOnInsert: {
key,
expire: msDuration > 0 ? new Date(Date.now() + msDuration) : null,
},
$inc: { points },
};
upsertData.$setOnInsert = Object.assign(upsertData.$setOnInsert, docAttrs);
}
// Options for collection updates differ between driver versions
const upsertOptions = {
upsert: true,
};
if ((this._driverVersion.major >= 4) ||
(this._driverVersion.major === 3 &&
(this._driverVersion.feature >=7) ||
(this._driverVersion.feature >= 6 &&
this._driverVersion.patch >= 7 )))
{
upsertOptions.returnDocument = 'after';
} else {
upsertOptions.returnOriginal = false;
}
/*
* 1. Find actual limit and increment points
* 2. If limit expired, but Mongo doesn't clean doc by TTL yet, try to replace limit doc completely
* 3. If 2 or more Mongo threads try to insert the new limit doc, only the first succeed
* 4. Try to upsert from step 1. Actual limit is created now, points are incremented without problems
*/
return new Promise((resolve, reject) => {
this._collection.findOneAndUpdate(
where,
upsertData,
upsertOptions
).then((res) => {
resolve(res);
}).catch((errUpsert) => {
if (errUpsert && errUpsert.code === 11000) { // E11000 duplicate key error collection
const replaceWhere = Object.assign({ // try to replace OLD limit doc
$or: [
{ expire: { $lte: new Date() } },
{ expire: { $eq: null } },
],
key,
}, docAttrs);
const replaceTo = {
$set: Object.assign({
key,
points,
expire: msDuration > 0 ? new Date(Date.now() + msDuration) : null,
}, docAttrs)
};
this._collection.findOneAndUpdate(
replaceWhere,
replaceTo,
upsertOptions
).then((res) => {
resolve(res);
}).catch((errReplace) => {
if (errReplace && errReplace.code === 11000) { // E11000 duplicate key error collection
this._upsert(key, points, msDuration, forceExpire)
.then(res => resolve(res))
.catch(err => reject(err));
} else {
reject(errReplace);
}
});
} else {
reject(errUpsert);
}
});
});
}
_get(rlKey, options = {}) {
if (!this._collection) {
return Promise.reject(Error('Mongo connection is not established'));
}
const docAttrs = options.attrs || {};
const where = Object.assign({
key: rlKey,
$or: [
{ expire: { $gt: new Date() } },
{ expire: { $eq: null } },
],
}, docAttrs);
return this._collection.findOne(where);
}
_delete(rlKey, options = {}) {
if (!this._collection) {
return Promise.reject(Error('Mongo connection is not established'));
}
const docAttrs = options.attrs || {};
const where = Object.assign({ key: rlKey }, docAttrs);
return this._collection.deleteOne(where)
.then(res => res.deletedCount > 0);
}
}
module.exports = RateLimiterMongo;

View File

@@ -0,0 +1,379 @@
const RateLimiterStoreAbstract = require('./RateLimiterStoreAbstract');
const RateLimiterRes = require('./RateLimiterRes');
class RateLimiterMySQL extends RateLimiterStoreAbstract {
/**
* @callback callback
* @param {Object} err
*
* @param {Object} opts
* @param {callback} cb
* Defaults {
* ... see other in RateLimiterStoreAbstract
*
* storeClient: anySqlClient,
* storeType: 'knex', // required only for Knex instance
* dbName: 'string',
* tableName: 'string',
* }
*/
constructor(opts, cb = null) {
super(opts);
this.client = opts.storeClient;
this.clientType = opts.storeType;
this.dbName = opts.dbName;
this.tableName = opts.tableName;
this.clearExpiredByTimeout = opts.clearExpiredByTimeout;
this.tableCreated = opts.tableCreated;
if (!this.tableCreated) {
this._createDbAndTable()
.then(() => {
this.tableCreated = true;
if (this.clearExpiredByTimeout) {
this._clearExpiredHourAgo();
}
if (typeof cb === 'function') {
cb();
}
})
.catch((err) => {
if (typeof cb === 'function') {
cb(err);
} else {
throw err;
}
});
} else {
if (this.clearExpiredByTimeout) {
this._clearExpiredHourAgo();
}
if (typeof cb === 'function') {
cb();
}
}
}
clearExpired(expire) {
return new Promise((resolve) => {
this._getConnection()
.then((conn) => {
conn.query(`DELETE FROM ??.?? WHERE expire < ?`, [this.dbName, this.tableName, expire], () => {
this._releaseConnection(conn);
resolve();
});
})
.catch(() => {
resolve();
});
});
}
_clearExpiredHourAgo() {
if (this._clearExpiredTimeoutId) {
clearTimeout(this._clearExpiredTimeoutId);
}
this._clearExpiredTimeoutId = setTimeout(() => {
this.clearExpired(Date.now() - 3600000) // Never rejected
.then(() => {
this._clearExpiredHourAgo();
});
}, 300000);
this._clearExpiredTimeoutId.unref();
}
/**
*
* @return Promise<any>
* @private
*/
_getConnection() {
switch (this.clientType) {
case 'pool':
return new Promise((resolve, reject) => {
this.client.getConnection((errConn, conn) => {
if (errConn) {
return reject(errConn);
}
resolve(conn);
});
});
case 'sequelize':
return this.client.connectionManager.getConnection();
case 'knex':
return this.client.client.acquireConnection();
default:
return Promise.resolve(this.client);
}
}
_releaseConnection(conn) {
switch (this.clientType) {
case 'pool':
return conn.release();
case 'sequelize':
return this.client.connectionManager.releaseConnection(conn);
case 'knex':
return this.client.client.releaseConnection(conn);
default:
return true;
}
}
/**
*
* @returns {Promise<any>}
* @private
*/
_createDbAndTable() {
return new Promise((resolve, reject) => {
this._getConnection()
.then((conn) => {
conn.query(`CREATE DATABASE IF NOT EXISTS \`${this.dbName}\`;`, (errDb) => {
if (errDb) {
this._releaseConnection(conn);
return reject(errDb);
}
conn.query(this._getCreateTableStmt(), (err) => {
if (err) {
this._releaseConnection(conn);
return reject(err);
}
this._releaseConnection(conn);
resolve();
});
});
})
.catch((err) => {
reject(err);
});
});
}
_getCreateTableStmt() {
return `CREATE TABLE IF NOT EXISTS \`${this.dbName}\`.\`${this.tableName}\` (` +
'`key` VARCHAR(255) CHARACTER SET utf8 NOT NULL,' +
'`points` INT(9) NOT NULL default 0,' +
'`expire` BIGINT UNSIGNED,' +
'PRIMARY KEY (`key`)' +
') ENGINE = INNODB;';
}
get clientType() {
return this._clientType;
}
set clientType(value) {
if (typeof value === 'undefined') {
if (this.client.constructor.name === 'Connection') {
value = 'connection';
} else if (this.client.constructor.name === 'Pool') {
value = 'pool';
} else if (this.client.constructor.name === 'Sequelize') {
value = 'sequelize';
} else {
throw new Error('storeType is not defined');
}
}
this._clientType = value.toLowerCase();
}
get dbName() {
return this._dbName;
}
set dbName(value) {
this._dbName = typeof value === 'undefined' ? 'rtlmtrflx' : value;
}
get tableName() {
return this._tableName;
}
set tableName(value) {
this._tableName = typeof value === 'undefined' ? this.keyPrefix : value;
}
get tableCreated() {
return this._tableCreated
}
set tableCreated(value) {
this._tableCreated = typeof value === 'undefined' ? false : !!value;
}
get clearExpiredByTimeout() {
return this._clearExpiredByTimeout;
}
set clearExpiredByTimeout(value) {
this._clearExpiredByTimeout = typeof value === 'undefined' ? true : Boolean(value);
}
_getRateLimiterRes(rlKey, changedPoints, result) {
const res = new RateLimiterRes();
const [row] = result;
res.isFirstInDuration = changedPoints === row.points;
res.consumedPoints = res.isFirstInDuration ? changedPoints : row.points;
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);
res.msBeforeNext = row.expire
? Math.max(row.expire - Date.now(), 0)
: -1;
return res;
}
_upsertTransaction(conn, key, points, msDuration, forceExpire) {
return new Promise((resolve, reject) => {
conn.query('BEGIN', (errBegin) => {
if (errBegin) {
conn.rollback();
return reject(errBegin);
}
const dateNow = Date.now();
const newExpire = msDuration > 0 ? dateNow + msDuration : null;
let q;
let values;
if (forceExpire) {
q = `INSERT INTO ??.?? VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
points = ?,
expire = ?;`;
values = [
this.dbName, this.tableName, key, points, newExpire,
points,
newExpire,
];
} else {
q = `INSERT INTO ??.?? VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
points = IF(expire <= ?, ?, points + (?)),
expire = IF(expire <= ?, ?, expire);`;
values = [
this.dbName, this.tableName, key, points, newExpire,
dateNow, points, points,
dateNow, newExpire,
];
}
conn.query(q, values, (errUpsert) => {
if (errUpsert) {
conn.rollback();
return reject(errUpsert);
}
conn.query('SELECT points, expire FROM ??.?? WHERE `key` = ?;', [this.dbName, this.tableName, key], (errSelect, res) => {
if (errSelect) {
conn.rollback();
return reject(errSelect);
}
conn.query('COMMIT', (err) => {
if (err) {
conn.rollback();
return reject(err);
}
resolve(res);
});
});
});
});
});
}
_upsert(key, points, msDuration, forceExpire = false) {
if (!this.tableCreated) {
return Promise.reject(Error('Table is not created yet'));
}
return new Promise((resolve, reject) => {
this._getConnection()
.then((conn) => {
this._upsertTransaction(conn, key, points, msDuration, forceExpire)
.then((res) => {
resolve(res);
this._releaseConnection(conn);
})
.catch((err) => {
reject(err);
this._releaseConnection(conn);
});
})
.catch((err) => {
reject(err);
});
});
}
_get(rlKey) {
if (!this.tableCreated) {
return Promise.reject(Error('Table is not created yet'));
}
return new Promise((resolve, reject) => {
this._getConnection()
.then((conn) => {
conn.query(
'SELECT points, expire FROM ??.?? WHERE `key` = ? AND (`expire` > ? OR `expire` IS NULL)',
[this.dbName, this.tableName, rlKey, Date.now()],
(err, res) => {
if (err) {
reject(err);
} else if (res.length === 0) {
resolve(null);
} else {
resolve(res);
}
this._releaseConnection(conn);
} // eslint-disable-line
);
})
.catch((err) => {
reject(err);
});
});
}
_delete(rlKey) {
if (!this.tableCreated) {
return Promise.reject(Error('Table is not created yet'));
}
return new Promise((resolve, reject) => {
this._getConnection()
.then((conn) => {
conn.query(
'DELETE FROM ??.?? WHERE `key` = ?',
[this.dbName, this.tableName, rlKey],
(err, res) => {
if (err) {
reject(err);
} else {
resolve(res.affectedRows > 0);
}
this._releaseConnection(conn);
} // eslint-disable-line
);
})
.catch((err) => {
reject(err);
});
});
}
}
module.exports = RateLimiterMySQL;

View File

@@ -0,0 +1,312 @@
const RateLimiterStoreAbstract = require('./RateLimiterStoreAbstract');
const RateLimiterRes = require('./RateLimiterRes');
class RateLimiterPostgres extends RateLimiterStoreAbstract {
/**
* @callback callback
* @param {Object} err
*
* @param {Object} opts
* @param {callback} cb
* Defaults {
* ... see other in RateLimiterStoreAbstract
*
* storeClient: postgresClient,
* storeType: 'knex', // required only for Knex instance
* tableName: 'string',
* }
*/
constructor(opts, cb = null) {
super(opts);
this.client = opts.storeClient;
this.clientType = opts.storeType;
this.tableName = opts.tableName;
this.clearExpiredByTimeout = opts.clearExpiredByTimeout;
this.tableCreated = opts.tableCreated;
if (!this.tableCreated) {
this._createTable()
.then(() => {
this.tableCreated = true;
if (this.clearExpiredByTimeout) {
this._clearExpiredHourAgo();
}
if (typeof cb === 'function') {
cb();
}
})
.catch((err) => {
if (typeof cb === 'function') {
cb(err);
} else {
throw err;
}
});
} else {
if (typeof cb === 'function') {
cb();
}
}
}
clearExpired(expire) {
return new Promise((resolve) => {
const q = {
name: 'rlflx-clear-expired',
text: `DELETE FROM ${this.tableName} WHERE expire < $1`,
values: [expire],
};
this._query(q)
.then(() => {
resolve();
})
.catch(() => {
// Deleting expired query is not critical
resolve();
});
});
}
/**
* Delete all rows expired 1 hour ago once per 5 minutes
*
* @private
*/
_clearExpiredHourAgo() {
if (this._clearExpiredTimeoutId) {
clearTimeout(this._clearExpiredTimeoutId);
}
this._clearExpiredTimeoutId = setTimeout(() => {
this.clearExpired(Date.now() - 3600000) // Never rejected
.then(() => {
this._clearExpiredHourAgo();
});
}, 300000);
this._clearExpiredTimeoutId.unref();
}
/**
*
* @return Promise<any>
* @private
*/
_getConnection() {
switch (this.clientType) {
case 'pool':
return Promise.resolve(this.client);
case 'sequelize':
return this.client.connectionManager.getConnection();
case 'knex':
return this.client.client.acquireConnection();
case 'typeorm':
return Promise.resolve(this.client.driver.master);
default:
return Promise.resolve(this.client);
}
}
_releaseConnection(conn) {
switch (this.clientType) {
case 'pool':
return true;
case 'sequelize':
return this.client.connectionManager.releaseConnection(conn);
case 'knex':
return this.client.client.releaseConnection(conn);
case 'typeorm':
return true;
default:
return true;
}
}
/**
*
* @returns {Promise<any>}
* @private
*/
_createTable() {
return new Promise((resolve, reject) => {
this._query({
text: this._getCreateTableStmt(),
})
.then(() => {
resolve();
})
.catch((err) => {
if (err.code === '23505') {
// Error: duplicate key value violates unique constraint "pg_type_typname_nsp_index"
// Postgres doesn't handle concurrent table creation
// It is supposed, that table is created by another worker
resolve();
} else {
reject(err);
}
});
});
}
_getCreateTableStmt() {
return `CREATE TABLE IF NOT EXISTS ${this.tableName} (
key varchar(255) PRIMARY KEY,
points integer NOT NULL DEFAULT 0,
expire bigint
);`;
}
get clientType() {
return this._clientType;
}
set clientType(value) {
const constructorName = this.client.constructor.name;
if (typeof value === 'undefined') {
if (constructorName === 'Client') {
value = 'client';
} else if (
constructorName === 'Pool' ||
constructorName === 'BoundPool'
) {
value = 'pool';
} else if (constructorName === 'Sequelize') {
value = 'sequelize';
} else {
throw new Error('storeType is not defined');
}
}
this._clientType = value.toLowerCase();
}
get tableName() {
return this._tableName;
}
set tableName(value) {
this._tableName = typeof value === 'undefined' ? this.keyPrefix : value;
}
get tableCreated() {
return this._tableCreated
}
set tableCreated(value) {
this._tableCreated = typeof value === 'undefined' ? false : !!value;
}
get clearExpiredByTimeout() {
return this._clearExpiredByTimeout;
}
set clearExpiredByTimeout(value) {
this._clearExpiredByTimeout = typeof value === 'undefined' ? true : Boolean(value);
}
_getRateLimiterRes(rlKey, changedPoints, result) {
const res = new RateLimiterRes();
const row = result.rows[0];
res.isFirstInDuration = changedPoints === row.points;
res.consumedPoints = res.isFirstInDuration ? changedPoints : row.points;
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);
res.msBeforeNext = row.expire
? Math.max(row.expire - Date.now(), 0)
: -1;
return res;
}
_query(q) {
const prefix = this.tableName.toLowerCase();
const queryObj = { name: `${prefix}:${q.name}`, text: q.text, values: q.values };
return new Promise((resolve, reject) => {
this._getConnection()
.then((conn) => {
conn.query(queryObj)
.then((res) => {
resolve(res);
this._releaseConnection(conn);
})
.catch((err) => {
reject(err);
this._releaseConnection(conn);
});
})
.catch((err) => {
reject(err);
});
});
}
_upsert(key, points, msDuration, forceExpire = false) {
if (!this.tableCreated) {
return Promise.reject(Error('Table is not created yet'));
}
const newExpire = msDuration > 0 ? Date.now() + msDuration : null;
const expireQ = forceExpire
? ' $3 '
: ` CASE
WHEN ${this.tableName}.expire <= $4 THEN $3
ELSE ${this.tableName}.expire
END `;
return this._query({
name: forceExpire ? 'rlflx-upsert-force' : 'rlflx-upsert',
text: `
INSERT INTO ${this.tableName} VALUES ($1, $2, $3)
ON CONFLICT(key) DO UPDATE SET
points = CASE
WHEN (${this.tableName}.expire <= $4 OR 1=${forceExpire ? 1 : 0}) THEN $2
ELSE ${this.tableName}.points + ($2)
END,
expire = ${expireQ}
RETURNING points, expire;`,
values: [key, points, newExpire, Date.now()],
});
}
_get(rlKey) {
if (!this.tableCreated) {
return Promise.reject(Error('Table is not created yet'));
}
return new Promise((resolve, reject) => {
this._query({
name: 'rlflx-get',
text: `
SELECT points, expire FROM ${this.tableName} WHERE key = $1 AND (expire > $2 OR expire IS NULL);`,
values: [rlKey, Date.now()],
})
.then((res) => {
if (res.rowCount === 0) {
res = null;
}
resolve(res);
})
.catch((err) => {
reject(err);
});
});
}
_delete(rlKey) {
if (!this.tableCreated) {
return Promise.reject(Error('Table is not created yet'));
}
return this._query({
name: 'rlflx-delete',
text: `DELETE FROM ${this.tableName} WHERE key = $1`,
values: [rlKey],
})
.then(res => res.rowCount > 0);
}
}
module.exports = RateLimiterPostgres;

View File

@@ -0,0 +1,127 @@
const RateLimiterQueueError = require('./component/RateLimiterQueueError')
const MAX_QUEUE_SIZE = 4294967295;
const KEY_DEFAULT = 'limiter';
module.exports = class RateLimiterQueue {
constructor(limiterFlexible, opts = {
maxQueueSize: MAX_QUEUE_SIZE,
}) {
this._queueLimiters = {
KEY_DEFAULT: new RateLimiterQueueInternal(limiterFlexible, opts)
};
this._limiterFlexible = limiterFlexible;
this._maxQueueSize = opts.maxQueueSize
}
getTokensRemaining(key = KEY_DEFAULT) {
if (this._queueLimiters[key]) {
return this._queueLimiters[key].getTokensRemaining()
} else {
return Promise.resolve(this._limiterFlexible.points)
}
}
removeTokens(tokens, key = KEY_DEFAULT) {
if (!this._queueLimiters[key]) {
this._queueLimiters[key] = new RateLimiterQueueInternal(
this._limiterFlexible, {
key,
maxQueueSize: this._maxQueueSize,
})
}
return this._queueLimiters[key].removeTokens(tokens)
}
};
class RateLimiterQueueInternal {
constructor(limiterFlexible, opts = {
maxQueueSize: MAX_QUEUE_SIZE,
key: KEY_DEFAULT,
}) {
this._key = opts.key;
this._waitTimeout = null;
this._queue = [];
this._limiterFlexible = limiterFlexible;
this._maxQueueSize = opts.maxQueueSize
}
getTokensRemaining() {
return this._limiterFlexible.get(this._key)
.then((rlRes) => {
return rlRes !== null ? rlRes.remainingPoints : this._limiterFlexible.points;
})
}
removeTokens(tokens) {
const _this = this;
return new Promise((resolve, reject) => {
if (tokens > _this._limiterFlexible.points) {
reject(new RateLimiterQueueError(`Requested tokens ${tokens} exceeds maximum ${_this._limiterFlexible.points} tokens per interval`));
return
}
if (_this._queue.length > 0) {
_this._queueRequest.call(_this, resolve, reject, tokens);
} else {
_this._limiterFlexible.consume(_this._key, tokens)
.then((res) => {
resolve(res.remainingPoints);
})
.catch((rej) => {
if (rej instanceof Error) {
reject(rej);
} else {
_this._queueRequest.call(_this, resolve, reject, tokens);
if (_this._waitTimeout === null) {
_this._waitTimeout = setTimeout(_this._processFIFO.bind(_this), rej.msBeforeNext);
}
}
});
}
})
}
_queueRequest(resolve, reject, tokens) {
const _this = this;
if (_this._queue.length < _this._maxQueueSize) {
_this._queue.push({resolve, reject, tokens});
} else {
reject(new RateLimiterQueueError(`Number of requests reached it's maximum ${_this._maxQueueSize}`))
}
}
_processFIFO() {
const _this = this;
if (_this._waitTimeout !== null) {
clearTimeout(_this._waitTimeout);
_this._waitTimeout = null;
}
if (_this._queue.length === 0) {
return;
}
const item = _this._queue.shift();
_this._limiterFlexible.consume(_this._key, item.tokens)
.then((res) => {
item.resolve(res.remainingPoints);
_this._processFIFO.call(_this);
})
.catch((rej) => {
if (rej instanceof Error) {
item.reject(rej);
_this._processFIFO.call(_this);
} else {
_this._queue.unshift(item);
if (_this._waitTimeout === null) {
_this._waitTimeout = setTimeout(_this._processFIFO.bind(_this), rej.msBeforeNext);
}
}
});
}
}

View File

@@ -0,0 +1,173 @@
const RateLimiterStoreAbstract = require('./RateLimiterStoreAbstract');
const RateLimiterRes = require('./RateLimiterRes');
const incrTtlLuaScript = `redis.call('set', KEYS[1], 0, 'EX', ARGV[2], 'NX') \
local consumed = redis.call('incrby', KEYS[1], ARGV[1]) \
local ttl = redis.call('pttl', KEYS[1]) \
if ttl == -1 then \
redis.call('expire', KEYS[1], ARGV[2]) \
ttl = 1000 * ARGV[2] \
end \
return {consumed, ttl} \
`;
class RateLimiterRedis extends RateLimiterStoreAbstract {
/**
*
* @param {Object} opts
* Defaults {
* ... see other in RateLimiterStoreAbstract
*
* redis: RedisClient
* rejectIfRedisNotReady: boolean = false - reject / invoke insuranceLimiter immediately when redis connection is not "ready"
* }
*/
constructor(opts) {
super(opts);
if (opts.redis) {
this.client = opts.redis;
} else {
this.client = opts.storeClient;
}
this._rejectIfRedisNotReady = !!opts.rejectIfRedisNotReady;
if (typeof this.client.defineCommand === 'function') {
this.client.defineCommand("rlflxIncr", {
numberOfKeys: 1,
lua: incrTtlLuaScript,
});
}
}
/**
* Prevent actual redis call if redis connection is not ready
* Because of different connection state checks for ioredis and node-redis, only this clients would be actually checked.
* For any other clients all the requests would be passed directly to redis client
* @return {boolean}
* @private
*/
_isRedisReady() {
if (!this._rejectIfRedisNotReady) {
return true;
}
// ioredis client
if (this.client.status && this.client.status !== 'ready') {
return false;
}
// node-redis client
if (typeof this.client.isReady === 'function' && !this.client.isReady()) {
return false;
}
return true;
}
_getRateLimiterRes(rlKey, changedPoints, result) {
let [consumed, resTtlMs] = result;
// Support ioredis results format
if (Array.isArray(consumed)) {
[, consumed] = consumed;
[, resTtlMs] = resTtlMs;
}
const res = new RateLimiterRes();
res.consumedPoints = parseInt(consumed);
res.isFirstInDuration = res.consumedPoints === changedPoints;
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);
res.msBeforeNext = resTtlMs;
return res;
}
_upsert(rlKey, points, msDuration, forceExpire = false) {
return new Promise((resolve, reject) => {
if (!this._isRedisReady()) {
return reject(new Error('Redis connection is not ready'));
}
const secDuration = Math.floor(msDuration / 1000);
const multi = this.client.multi();
if (forceExpire) {
if (secDuration > 0) {
multi.set(rlKey, points, 'EX', secDuration);
} else {
multi.set(rlKey, points);
}
multi.pttl(rlKey)
.exec((err, res) => {
if (err) {
return reject(err);
}
return resolve(res);
});
} else {
if (secDuration > 0) {
const incrCallback = function(err, result) {
if (err) {
return reject(err);
}
return resolve(result);
};
if (typeof this.client.rlflxIncr === 'function') {
this.client.rlflxIncr(rlKey, points, secDuration, incrCallback);
} else {
this.client.eval(incrTtlLuaScript, 1, rlKey, points, secDuration, incrCallback);
}
} else {
multi.incrby(rlKey, points)
.pttl(rlKey)
.exec((err, res) => {
if (err) {
return reject(err);
}
return resolve(res);
});
}
}
});
}
_get(rlKey) {
return new Promise((resolve, reject) => {
if (!this._isRedisReady()) {
return reject(new Error('Redis connection is not ready'));
}
this.client
.multi()
.get(rlKey)
.pttl(rlKey)
.exec((err, res) => {
if (err) {
reject(err);
} else {
const [points] = res;
if (points === null) {
return resolve(null)
}
resolve(res);
}
});
});
}
_delete(rlKey) {
return new Promise((resolve, reject) => {
this.client.del(rlKey, (err, res) => {
if (err) {
reject(err);
} else {
resolve(res > 0);
}
});
});
}
}
module.exports = RateLimiterRedis;

View File

@@ -0,0 +1,64 @@
module.exports = class RateLimiterRes {
constructor(remainingPoints, msBeforeNext, consumedPoints, isFirstInDuration) {
this.remainingPoints = typeof remainingPoints === 'undefined' ? 0 : remainingPoints; // Remaining points in current duration
this.msBeforeNext = typeof msBeforeNext === 'undefined' ? 0 : msBeforeNext; // Milliseconds before next action
this.consumedPoints = typeof consumedPoints === 'undefined' ? 0 : consumedPoints; // Consumed points in current duration
this.isFirstInDuration = typeof isFirstInDuration === 'undefined' ? false : isFirstInDuration;
}
get msBeforeNext() {
return this._msBeforeNext;
}
set msBeforeNext(ms) {
this._msBeforeNext = ms;
return this;
}
get remainingPoints() {
return this._remainingPoints;
}
set remainingPoints(p) {
this._remainingPoints = p;
return this;
}
get consumedPoints() {
return this._consumedPoints;
}
set consumedPoints(p) {
this._consumedPoints = p;
return this;
}
get isFirstInDuration() {
return this._isFirstInDuration;
}
set isFirstInDuration(value) {
this._isFirstInDuration = Boolean(value);
}
_getDecoratedProperties() {
return {
remainingPoints: this.remainingPoints,
msBeforeNext: this.msBeforeNext,
consumedPoints: this.consumedPoints,
isFirstInDuration: this.isFirstInDuration,
};
}
[Symbol.for("nodejs.util.inspect.custom")]() {
return this._getDecoratedProperties();
}
toString() {
return JSON.stringify(this._getDecoratedProperties());
}
toJSON() {
return this._getDecoratedProperties();
}
};

View File

@@ -0,0 +1,442 @@
const RateLimiterAbstract = require('./RateLimiterAbstract');
const BlockedKeys = require('./component/BlockedKeys');
const RateLimiterRes = require('./RateLimiterRes');
module.exports = class RateLimiterStoreAbstract extends RateLimiterAbstract {
/**
*
* @param opts Object Defaults {
* ... see other in RateLimiterAbstract
*
* inMemoryBlockOnConsumed: 40, // Number of points when key is blocked
* inMemoryBlockDuration: 10, // Block duration in seconds
* insuranceLimiter: RateLimiterAbstract
* }
*/
constructor(opts = {}) {
super(opts);
this.inMemoryBlockOnConsumed = opts.inMemoryBlockOnConsumed || opts.inmemoryBlockOnConsumed;
this.inMemoryBlockDuration = opts.inMemoryBlockDuration || opts.inmemoryBlockDuration;
this.insuranceLimiter = opts.insuranceLimiter;
this._inMemoryBlockedKeys = new BlockedKeys();
}
get client() {
return this._client;
}
set client(value) {
if (typeof value === 'undefined') {
throw new Error('storeClient is not set');
}
this._client = value;
}
/**
* Have to be launched after consume
* It blocks key and execute evenly depending on result from store
*
* It uses _getRateLimiterRes function to prepare RateLimiterRes from store result
*
* @param resolve
* @param reject
* @param rlKey
* @param changedPoints
* @param storeResult
* @param {Object} options
* @private
*/
_afterConsume(resolve, reject, rlKey, changedPoints, storeResult, options = {}) {
const res = this._getRateLimiterRes(rlKey, changedPoints, storeResult);
if (this.inMemoryBlockOnConsumed > 0 && !(this.inMemoryBlockDuration > 0)
&& res.consumedPoints >= this.inMemoryBlockOnConsumed
) {
this._inMemoryBlockedKeys.addMs(rlKey, res.msBeforeNext);
if (res.consumedPoints > this.points) {
return reject(res);
} else {
return resolve(res)
}
} else if (res.consumedPoints > this.points) {
let blockPromise = Promise.resolve();
// Block only first time when consumed more than points
if (this.blockDuration > 0 && res.consumedPoints <= (this.points + changedPoints)) {
res.msBeforeNext = this.msBlockDuration;
blockPromise = this._block(rlKey, res.consumedPoints, this.msBlockDuration, options);
}
if (this.inMemoryBlockOnConsumed > 0 && res.consumedPoints >= this.inMemoryBlockOnConsumed) {
// Block key for this.inMemoryBlockDuration seconds
this._inMemoryBlockedKeys.add(rlKey, this.inMemoryBlockDuration);
res.msBeforeNext = this.msInMemoryBlockDuration;
}
blockPromise
.then(() => {
reject(res);
})
.catch((err) => {
reject(err);
});
} else if (this.execEvenly && res.msBeforeNext > 0 && !res.isFirstInDuration) {
let delay = Math.ceil(res.msBeforeNext / (res.remainingPoints + 2));
if (delay < this.execEvenlyMinDelayMs) {
delay = res.consumedPoints * this.execEvenlyMinDelayMs;
}
setTimeout(resolve, delay, res);
} else {
resolve(res);
}
}
_handleError(err, funcName, resolve, reject, key, data = false, options = {}) {
if (!(this.insuranceLimiter instanceof RateLimiterAbstract)) {
reject(err);
} else {
this.insuranceLimiter[funcName](key, data, options)
.then((res) => {
resolve(res);
})
.catch((res) => {
reject(res);
});
}
}
/**
* @deprecated Use camelCase version
* @returns {BlockedKeys}
* @private
*/
get _inmemoryBlockedKeys() {
return this._inMemoryBlockedKeys
}
/**
* @deprecated Use camelCase version
* @param rlKey
* @returns {number}
*/
getInmemoryBlockMsBeforeExpire(rlKey) {
return this.getInMemoryBlockMsBeforeExpire(rlKey)
}
/**
* @deprecated Use camelCase version
* @returns {number|number}
*/
get inmemoryBlockOnConsumed() {
return this.inMemoryBlockOnConsumed;
}
/**
* @deprecated Use camelCase version
* @param value
*/
set inmemoryBlockOnConsumed(value) {
this.inMemoryBlockOnConsumed = value;
}
/**
* @deprecated Use camelCase version
* @returns {number|number}
*/
get inmemoryBlockDuration() {
return this.inMemoryBlockDuration;
}
/**
* @deprecated Use camelCase version
* @param value
*/
set inmemoryBlockDuration(value) {
this.inMemoryBlockDuration = value
}
/**
* @deprecated Use camelCase version
* @returns {number}
*/
get msInmemoryBlockDuration() {
return this.inMemoryBlockDuration * 1000;
}
getInMemoryBlockMsBeforeExpire(rlKey) {
if (this.inMemoryBlockOnConsumed > 0) {
return this._inMemoryBlockedKeys.msBeforeExpire(rlKey);
}
return 0;
}
get inMemoryBlockOnConsumed() {
return this._inMemoryBlockOnConsumed;
}
set inMemoryBlockOnConsumed(value) {
this._inMemoryBlockOnConsumed = value ? parseInt(value) : 0;
if (this.inMemoryBlockOnConsumed > 0 && this.points > this.inMemoryBlockOnConsumed) {
throw new Error('inMemoryBlockOnConsumed option must be greater or equal "points" option');
}
}
get inMemoryBlockDuration() {
return this._inMemoryBlockDuration;
}
set inMemoryBlockDuration(value) {
this._inMemoryBlockDuration = value ? parseInt(value) : 0;
if (this.inMemoryBlockDuration > 0 && this.inMemoryBlockOnConsumed === 0) {
throw new Error('inMemoryBlockOnConsumed option must be set up');
}
}
get msInMemoryBlockDuration() {
return this._inMemoryBlockDuration * 1000;
}
get insuranceLimiter() {
return this._insuranceLimiter;
}
set insuranceLimiter(value) {
if (typeof value !== 'undefined' && !(value instanceof RateLimiterAbstract)) {
throw new Error('insuranceLimiter must be instance of RateLimiterAbstract');
}
this._insuranceLimiter = value;
if (this._insuranceLimiter) {
this._insuranceLimiter.blockDuration = this.blockDuration;
this._insuranceLimiter.execEvenly = this.execEvenly;
}
}
/**
* Block any key for secDuration seconds
*
* @param key
* @param secDuration
* @param {Object} options
*
* @return Promise<RateLimiterRes>
*/
block(key, secDuration, options = {}) {
const msDuration = secDuration * 1000;
return this._block(this.getKey(key), this.points + 1, msDuration, options);
}
/**
* Set points by key for any duration
*
* @param key
* @param points
* @param secDuration
* @param {Object} options
*
* @return Promise<RateLimiterRes>
*/
set(key, points, secDuration, options = {}) {
const msDuration = (secDuration >= 0 ? secDuration : this.duration) * 1000;
return this._block(this.getKey(key), points, msDuration, options);
}
/**
*
* @param key
* @param pointsToConsume
* @param {Object} options
* @returns Promise<RateLimiterRes>
*/
consume(key, pointsToConsume = 1, options = {}) {
return new Promise((resolve, reject) => {
const rlKey = this.getKey(key);
const inMemoryBlockMsBeforeExpire = this.getInMemoryBlockMsBeforeExpire(rlKey);
if (inMemoryBlockMsBeforeExpire > 0) {
return reject(new RateLimiterRes(0, inMemoryBlockMsBeforeExpire));
}
this._upsert(rlKey, pointsToConsume, this._getKeySecDuration(options) * 1000, false, options)
.then((res) => {
this._afterConsume(resolve, reject, rlKey, pointsToConsume, res);
})
.catch((err) => {
this._handleError(err, 'consume', resolve, reject, key, pointsToConsume, options);
});
});
}
/**
*
* @param key
* @param points
* @param {Object} options
* @returns Promise<RateLimiterRes>
*/
penalty(key, points = 1, options = {}) {
const rlKey = this.getKey(key);
return new Promise((resolve, reject) => {
this._upsert(rlKey, points, this._getKeySecDuration(options) * 1000, false, options)
.then((res) => {
resolve(this._getRateLimiterRes(rlKey, points, res));
})
.catch((err) => {
this._handleError(err, 'penalty', resolve, reject, key, points, options);
});
});
}
/**
*
* @param key
* @param points
* @param {Object} options
* @returns Promise<RateLimiterRes>
*/
reward(key, points = 1, options = {}) {
const rlKey = this.getKey(key);
return new Promise((resolve, reject) => {
this._upsert(rlKey, -points, this._getKeySecDuration(options) * 1000, false, options)
.then((res) => {
resolve(this._getRateLimiterRes(rlKey, -points, res));
})
.catch((err) => {
this._handleError(err, 'reward', resolve, reject, key, points, options);
});
});
}
/**
*
* @param key
* @param {Object} options
* @returns Promise<RateLimiterRes>|null
*/
get(key, options = {}) {
const rlKey = this.getKey(key);
return new Promise((resolve, reject) => {
this._get(rlKey, options)
.then((res) => {
if (res === null || typeof res === 'undefined') {
resolve(null);
} else {
resolve(this._getRateLimiterRes(rlKey, 0, res));
}
})
.catch((err) => {
this._handleError(err, 'get', resolve, reject, key, options);
});
});
}
/**
*
* @param key
* @param {Object} options
* @returns Promise<boolean>
*/
delete(key, options = {}) {
const rlKey = this.getKey(key);
return new Promise((resolve, reject) => {
this._delete(rlKey, options)
.then((res) => {
this._inMemoryBlockedKeys.delete(rlKey);
resolve(res);
})
.catch((err) => {
this._handleError(err, 'delete', resolve, reject, key, options);
});
});
}
/**
* Cleanup keys no-matter expired or not.
*/
deleteInMemoryBlockedAll() {
this._inMemoryBlockedKeys.delete();
}
/**
* Get RateLimiterRes object filled depending on storeResult, which specific for exact store
*
* @param rlKey
* @param changedPoints
* @param storeResult
* @private
*/
_getRateLimiterRes(rlKey, changedPoints, storeResult) { // eslint-disable-line no-unused-vars
throw new Error("You have to implement the method '_getRateLimiterRes'!");
}
/**
* Block key for this.msBlockDuration milliseconds
* Usually, it just prolongs lifetime of key
*
* @param rlKey
* @param initPoints
* @param msDuration
* @param {Object} options
*
* @return Promise<any>
*/
_block(rlKey, initPoints, msDuration, options = {}) {
return new Promise((resolve, reject) => {
this._upsert(rlKey, initPoints, msDuration, true, options)
.then(() => {
resolve(new RateLimiterRes(0, msDuration > 0 ? msDuration : -1, initPoints));
})
.catch((err) => {
this._handleError(err, 'block', resolve, reject, this.parseKey(rlKey), msDuration / 1000, options);
});
});
}
/**
* Have to be implemented in every limiter
* Resolve with raw result from Store OR null if rlKey is not set
* or Reject with error
*
* @param rlKey
* @param {Object} options
* @private
*
* @return Promise<any>
*/
_get(rlKey, options = {}) { // eslint-disable-line no-unused-vars
throw new Error("You have to implement the method '_get'!");
}
/**
* Have to be implemented
* Resolve with true OR false if rlKey doesn't exist
* or Reject with error
*
* @param rlKey
* @param {Object} options
* @private
*
* @return Promise<any>
*/
_delete(rlKey, options = {}) { // eslint-disable-line no-unused-vars
throw new Error("You have to implement the method '_delete'!");
}
/**
* Have to be implemented
* Resolve with object used for {@link _getRateLimiterRes} to generate {@link RateLimiterRes}
*
* @param {string} rlKey
* @param {number} points
* @param {number} msDuration
* @param {boolean} forceExpire
* @param {Object} options
* @abstract
*
* @return Promise<Object>
*/
_upsert(rlKey, points, msDuration, forceExpire = false, options = {}) {
throw new Error("You have to implement the method '_upsert'!");
}
};

View File

@@ -0,0 +1,51 @@
const RateLimiterAbstract = require('./RateLimiterAbstract');
module.exports = class RateLimiterUnion {
constructor(...limiters) {
if (limiters.length < 1) {
throw new Error('RateLimiterUnion: at least one limiter have to be passed');
}
limiters.forEach((limiter) => {
if (!(limiter instanceof RateLimiterAbstract)) {
throw new Error('RateLimiterUnion: all limiters have to be instance of RateLimiterAbstract');
}
});
this._limiters = limiters;
}
consume(key, points = 1) {
return new Promise((resolve, reject) => {
const promises = [];
this._limiters.forEach((limiter) => {
promises.push(limiter.consume(key, points).catch(rej => ({ rejected: true, rej })));
});
Promise.all(promises)
.then((res) => {
const resObj = {};
let rejected = false;
res.forEach((item) => {
if (item.rejected === true) {
rejected = true;
}
});
for (let i = 0; i < res.length; i++) {
if (rejected && res[i].rejected === true) {
resObj[this._limiters[i].keyPrefix] = res[i].rej;
} else if (!rejected) {
resObj[this._limiters[i].keyPrefix] = res[i];
}
}
if (rejected) {
reject(resObj);
} else {
resolve(resObj);
}
});
});
}
};

View File

@@ -0,0 +1,75 @@
module.exports = class BlockedKeys {
constructor() {
this._keys = {}; // {'key': 1526279430331}
this._addedKeysAmount = 0;
}
collectExpired() {
const now = Date.now();
Object.keys(this._keys).forEach((key) => {
if (this._keys[key] <= now) {
delete this._keys[key];
}
});
this._addedKeysAmount = Object.keys(this._keys).length;
}
/**
* Add new blocked key
*
* @param key String
* @param sec Number
*/
add(key, sec) {
this.addMs(key, sec * 1000);
}
/**
* Add new blocked key for ms
*
* @param key String
* @param ms Number
*/
addMs(key, ms) {
this._keys[key] = Date.now() + ms;
this._addedKeysAmount++;
if (this._addedKeysAmount > 999) {
this.collectExpired();
}
}
/**
* 0 means not blocked
*
* @param key
* @returns {number}
*/
msBeforeExpire(key) {
const expire = this._keys[key];
if (expire && expire >= Date.now()) {
this.collectExpired();
const now = Date.now();
return expire >= now ? expire - now : 0;
}
return 0;
}
/**
* If key is not given, delete all data in memory
*
* @param {string|undefined} key
*/
delete(key) {
if (key) {
delete this._keys[key];
} else {
Object.keys(this._keys).forEach((key) => {
delete this._keys[key];
});
}
}
};

View File

@@ -0,0 +1,3 @@
const BlockedKeys = require('./BlockedKeys');
module.exports = BlockedKeys;

View File

@@ -0,0 +1,83 @@
const Record = require('./Record');
const RateLimiterRes = require('../../RateLimiterRes');
module.exports = class MemoryStorage {
constructor() {
/**
* @type {Object.<string, Record>}
* @private
*/
this._storage = {};
}
incrby(key, value, durationSec) {
if (this._storage[key]) {
const msBeforeExpires = this._storage[key].expiresAt
? this._storage[key].expiresAt.getTime() - new Date().getTime()
: -1;
if (msBeforeExpires !== 0) {
// Change value
this._storage[key].value = this._storage[key].value + value;
return new RateLimiterRes(0, msBeforeExpires, this._storage[key].value, false);
}
return this.set(key, value, durationSec);
}
return this.set(key, value, durationSec);
}
set(key, value, durationSec) {
const durationMs = durationSec * 1000;
if (this._storage[key] && this._storage[key].timeoutId) {
clearTimeout(this._storage[key].timeoutId);
}
this._storage[key] = new Record(
value,
durationMs > 0 ? new Date(Date.now() + durationMs) : null
);
if (durationMs > 0) {
this._storage[key].timeoutId = setTimeout(() => {
delete this._storage[key];
}, durationMs);
if (this._storage[key].timeoutId.unref) {
this._storage[key].timeoutId.unref();
}
}
return new RateLimiterRes(0, durationMs === 0 ? -1 : durationMs, this._storage[key].value, true);
}
/**
*
* @param key
* @returns {*}
*/
get(key) {
if (this._storage[key]) {
const msBeforeExpires = this._storage[key].expiresAt
? this._storage[key].expiresAt.getTime() - new Date().getTime()
: -1;
return new RateLimiterRes(0, msBeforeExpires, this._storage[key].value, false);
}
return null;
}
/**
*
* @param key
* @returns {boolean}
*/
delete(key) {
if (this._storage[key]) {
if (this._storage[key].timeoutId) {
clearTimeout(this._storage[key].timeoutId);
}
delete this._storage[key];
return true;
}
return false;
}
};

View File

@@ -0,0 +1,40 @@
module.exports = class Record {
/**
*
* @param value int
* @param expiresAt Date|int
* @param timeoutId
*/
constructor(value, expiresAt, timeoutId = null) {
this.value = value;
this.expiresAt = expiresAt;
this.timeoutId = timeoutId;
}
get value() {
return this._value;
}
set value(value) {
this._value = parseInt(value);
}
get expiresAt() {
return this._expiresAt;
}
set expiresAt(value) {
if (!(value instanceof Date) && Number.isInteger(value)) {
value = new Date(value);
}
this._expiresAt = value;
}
get timeoutId() {
return this._timeoutId;
}
set timeoutId(value) {
this._timeoutId = value;
}
};

View File

@@ -0,0 +1,3 @@
const MemoryStorage = require('./MemoryStorage');
module.exports = MemoryStorage;

View File

@@ -0,0 +1,13 @@
module.exports = class RateLimiterQueueError extends Error {
constructor(message, extra) {
super();
if (Error.captureStackTrace) {
Error.captureStackTrace(this, this.constructor);
}
this.name = 'CustomError';
this.message = message;
if (extra) {
this.extra = extra;
}
}
};

View File

@@ -0,0 +1,9 @@
export class RateLimiterQueueError extends Error {
constructor(message?: string, extra?: string);
readonly name: string;
readonly message: string;
readonly extra: string;
}

View File

@@ -0,0 +1,16 @@
const LIMITER_TYPES = {
MEMORY: 'memory',
CLUSTER: 'cluster',
MEMCACHE: 'memcache',
MONGO: 'mongo',
REDIS: 'redis',
MYSQL: 'mysql',
POSTGRES: 'postgres',
};
const ERR_UNKNOWN_LIMITER_TYPE_MESSAGE = 'Unknown limiter type. Use one of LIMITER_TYPES constants.';
module.exports = {
LIMITER_TYPES,
ERR_UNKNOWN_LIMITER_TYPE_MESSAGE,
};

View File

@@ -0,0 +1,392 @@
export interface IRateLimiterRes {
msBeforeNext?: number;
remainingPoints?: number;
consumedPoints?: number;
isFirstInDuration?: boolean;
}
export class RateLimiterRes {
constructor(
remainingPoints?: number,
msBeforeNext?: number,
consumedPoints?: number,
isFirstInDuration?: boolean
);
readonly msBeforeNext: number;
readonly remainingPoints: number;
readonly consumedPoints: number;
readonly isFirstInDuration: boolean;
toString(): string;
toJSON(): {
remainingPoints: number;
msBeforeNext: number;
consumedPoints: number;
isFirstInDuration: boolean;
};
}
export class RateLimiterAbstract {
constructor(opts: IRateLimiterOptions);
/**
* Maximum number of points can be consumed over duration. Limiter compares this number with
* number of consumed points by key to decide if an operation should be rejected or resolved.
*/
points: number;
/**
* Number of seconds before consumed points are reset.
* Keys never expire, if duration is 0.
*/
duration: number;
/**
* duration in milliseconds
*/
get msDuration(): number;
/**
* If positive number and consumed more than points in current duration, block for blockDuration
* seconds.
*/
blockDuration: number;
/**
* blockDuration in milliseconds
*/
get msBlockDuration(): number;
/**
* Delay action to be executed evenly over duration First action in duration is executed without
* delay. All next allowed actions in current duration are delayed by formula
* msBeforeDurationEnd / (remainingPoints + 2) with minimum delay of duration * 1000 / points.
* It allows to cut off load peaks similar way to Leaky Bucket.
*
* Note: it isn't recommended to use it for long duration and few points, as it may delay action
* for too long with default execEvenlyMinDelayMs.
*/
execEvenly: boolean;
/**
* Sets minimum delay in milliseconds, when action is delayed with execEvenly
*/
execEvenlyMinDelayMs: number;
/**
* If you need to create several limiters for different purpose.
* Set to empty string '', if keys should be stored without prefix.
*/
keyPrefix: string;
/**
* Returns internal key prefixed with keyPrefix option as it is saved in store.
*/
getKey(key: string | number): string;
/**
* Returns internal key without the keyPrefix.
*/
parseKey(rlKey: string): string;
/**
* @param key is usually IP address or some unique client id
* @param pointsToConsume number of points consumed. default: 1
* @param options is object with additional settings:
* - customDuration expire in seconds for this operation only overwrites limiter's duration. It doesn't work, if key already created.
* @returns Returns Promise, which:
* - `resolved` with `RateLimiterRes` when point(s) is consumed, so action can be done
* - `rejected` only for store and database limiters if insuranceLimiter isn't setup: when some error happened, where reject reason `rejRes` is Error object
* - `rejected` only for RateLimiterCluster if insuranceLimiter isn't setup: when timeoutMs exceeded, where reject reason `rejRes` is Error object
* - `rejected` when there is no points to be consumed, where reject reason `rejRes` is `RateLimiterRes` object
* - `rejected` when key is blocked (if block strategy is set up), where reject reason `rejRes` is `RateLimiterRes` object
*/
consume(
key: string | number,
pointsToConsume?: number,
options?: { [key: string]: any }
): Promise<RateLimiterRes>;
/**
* Fine key by points number of points for one duration.
*
* Note: Depending on time penalty may go to next durations
*
* @returns Returns Promise, which:
* - `resolved` with RateLimiterRes
* - `rejected` only for database limiters if insuranceLimiter isn't setup: when some error happened, where reject reason `rejRes` is Error object
* - `rejected` only for RateLimiterCluster if insuranceLimiter isn't setup: when timeoutMs exceeded, where reject reason `rejRes` is Error object
*/
penalty(
key: string | number,
points?: number,
options?: { [key: string]: any }
): Promise<RateLimiterRes>;
/**
* Reward key by points number of points for one duration.
* Note: Depending on time reward may go to next durations
* @returns Promise, which:
* - `resolved` with RateLimiterRes
* - `rejected` only for database limiters if insuranceLimiter isn't setup: when some error happened, where reject reason `rejRes` is Error object
* - `rejected` only for RateLimiterCluster if insuranceLimiter isn't setup: when timeoutMs exceeded, where reject reason `rejRes` is Error object
*/
reward(
key: string | number,
points?: number,
options?: { [key: string]: any }
): Promise<RateLimiterRes>;
/**
* Get RateLimiterRes in current duration. It always returns RateLimiterRes.isFirstInDuration=false.
* @param key is usually IP address or some unique client id
* @param options
* @returns Promise, which:
* - `resolved` with RateLimiterRes if key is set
* - `resolved` with null if key is NOT set or expired
* - `rejected` only for database limiters if insuranceLimiter isn't setup: when some error happened, where reject reason `rejRes` is Error object
* - `rejected` only for RateLimiterCluster if insuranceLimiter isn't setup: when timeoutMs exceeded, where reject reason `rejRes` is Error object
*/
get(
key: string | number,
options?: { [key: string]: any }
): Promise<RateLimiterRes | null>;
/**
* Set points to key for secDuration seconds.
* Store it forever, if secDuration is 0.
* @param key
* @param points
* @param secDuration
* @param options
* @returns Promise, which:
* - `resolved` with RateLimiterRes
* - `rejected` only for database limiters if insuranceLimiter isn't setup: when some error happened, where reject reason `rejRes` is Error object
* - `rejected` only for RateLimiterCluster if insuranceLimiter isn't setup: when timeoutMs exceeded, where reject reason `rejRes` is Error object
*/
set(
key: string | number,
points: number,
secDuration: number,
options?: { [key: string]: any }
): Promise<RateLimiterRes>;
/**
* Block key by setting consumed points to points + 1 for secDuration seconds.
*
* It force updates expire, if there is already key.
*
* Blocked key never expires, if secDuration is 0.
* @returns Promise, which:
* - `resolved` with RateLimiterRes
* - `rejected` only for database limiters if insuranceLimiter isn't setup: when some error happened, where reject reason `rejRes` is Error object
* - `rejected` only for RateLimiterCluster if insuranceLimiter isn't setup: when timeoutMs exceeded, where reject reason `rejRes` is Error object
*/
block(
key: string | number,
secDuration: number,
options?: { [key: string]: any }
): Promise<RateLimiterRes>;
/**
* Delete all data related to key.
*
* For example, previously blocked key is not blocked after delete as there is no data anymore.
* @returns Promise, which:
* - `resolved` with boolean, true if data is removed by key, false if there is no such key.
* - `rejected` only for database limiters if insuranceLimiter isn't setup: when some error happened, where reject reason `rejRes` is Error object
* - `rejected` only for RateLimiterCluster if insuranceLimiter isn't setup: when timeoutMs exceeded, where reject reason `rejRes` is Error object
*/
delete(
key: string | number,
options?: { [key: string]: any }
): Promise<boolean>;
}
export class RateLimiterStoreAbstract extends RateLimiterAbstract {
constructor(opts: IRateLimiterStoreOptions);
/**
* Cleanup keys blocked in current process memory
*/
deleteInMemoryBlockedAll(): void;
}
interface IRateLimiterOptions {
keyPrefix?: string;
points?: number;
duration?: number;
execEvenly?: boolean;
execEvenlyMinDelayMs?: number;
blockDuration?: number;
}
interface IRateLimiterClusterOptions extends IRateLimiterOptions {
timeoutMs?: number;
}
interface IRateLimiterStoreOptions extends IRateLimiterOptions {
storeClient: any;
storeType?: string;
inMemoryBlockOnConsumed?: number;
inMemoryBlockDuration?: number;
/**
* @deprecated Use camelCased inMemoryBlockOnConsumed option
*/
inmemoryBlockOnConsumed?: number;
/**
* @deprecated Use camelCased inMemoryBlockOnConsumed option
*/
inmemoryBlockDuration?: number;
insuranceLimiter?: RateLimiterAbstract;
dbName?: string;
tableName?: string;
tableCreated?: boolean;
}
interface IRateLimiterStoreNoAutoExpiryOptions extends IRateLimiterStoreOptions {
clearExpiredByTimeout?: boolean;
}
interface IRateLimiterMongoOptions extends IRateLimiterStoreOptions {
indexKeyPrefix?: {
[key: string]: any;
};
}
interface IRateLimiterRedisOptions extends IRateLimiterStoreOptions {
rejectIfRedisNotReady?: boolean;
}
interface ICallbackReady {
(error?: Error): void;
}
interface IRLWrapperBlackAndWhiteOptions {
limiter: RateLimiterAbstract;
blackList?: string[] | number[];
whiteList?: string[] | number[];
isBlackListed?(key: any): boolean;
isWhiteListed?(key: any): boolean;
runActionAnyway?: boolean;
}
export class RateLimiterMemory extends RateLimiterAbstract {
constructor(opts: IRateLimiterOptions);
}
export class RateLimiterCluster extends RateLimiterAbstract {
constructor(opts: IRateLimiterClusterOptions);
}
export class RateLimiterClusterMaster {
constructor();
}
export class RateLimiterClusterMasterPM2 {
constructor(pm2: any);
}
export class RateLimiterRedis extends RateLimiterStoreAbstract {
constructor(opts: IRateLimiterRedisOptions);
}
export interface IRateLimiterMongoFunctionOptions {
attrs: { [key: string]: any };
}
export class RateLimiterMongo extends RateLimiterStoreAbstract {
constructor(opts: IRateLimiterMongoOptions);
indexKeyPrefix(): Object;
indexKeyPrefix(obj?: Object): void;
consume(
key: string | number,
pointsToConsume?: number,
options?: IRateLimiterMongoFunctionOptions
): Promise<RateLimiterRes>;
penalty(
key: string | number,
points?: number,
options?: IRateLimiterMongoFunctionOptions
): Promise<RateLimiterRes>;
reward(
key: string | number,
points?: number,
options?: IRateLimiterMongoFunctionOptions
): Promise<RateLimiterRes>;
block(
key: string | number,
secDuration: number,
options?: IRateLimiterMongoFunctionOptions
): Promise<RateLimiterRes>;
get(
key: string | number,
options?: IRateLimiterMongoFunctionOptions
): Promise<RateLimiterRes | null>;
set(
key: string | number,
points: number,
secDuration: number,
options?: IRateLimiterMongoFunctionOptions
): Promise<RateLimiterRes>;
delete(
key: string | number,
options?: IRateLimiterMongoFunctionOptions
): Promise<boolean>;
}
export class RateLimiterMySQL extends RateLimiterStoreAbstract {
constructor(opts: IRateLimiterStoreNoAutoExpiryOptions, cb?: ICallbackReady);
}
export class RateLimiterPostgres extends RateLimiterStoreAbstract {
constructor(opts: IRateLimiterStoreNoAutoExpiryOptions, cb?: ICallbackReady);
}
export class RateLimiterMemcache extends RateLimiterStoreAbstract {}
export class RateLimiterUnion {
constructor(...limiters: RateLimiterAbstract[]);
consume(key: string | number, points?: number): Promise<RateLimiterRes[]>;
}
export class RLWrapperBlackAndWhite extends RateLimiterAbstract {
constructor(opts: IRLWrapperBlackAndWhiteOptions);
}
interface IRateLimiterQueueOpts {
maxQueueSize?: number;
}
export class RateLimiterQueue {
constructor(
limiterFlexible: RateLimiterAbstract | BurstyRateLimiter,
opts?: IRateLimiterQueueOpts
);
getTokensRemaining(key?: string | number): Promise<number>;
removeTokens(tokens: number, key?: string | number): Promise<number>;
}
export class BurstyRateLimiter {
constructor(
rateLimiter: RateLimiterAbstract,
burstLimiter: RateLimiterAbstract
);
consume(
key: string | number,
pointsToConsume?: number,
options?: IRateLimiterMongoFunctionOptions
): Promise<RateLimiterRes>;
}