Skip to content

Commit

Permalink
DX 593 - Auto executed pipeline (#1039)
Browse files Browse the repository at this point in the history
* feat: add auto executed pipelines

* test: improve test times by shrinking test subjects

* feat: add proxy over autopipeline function

* add enableAutoPipelining parameter to redis

* initalize auto pipeline with static method

* add docstrings for autoPipeline methods

* add pipelineCounter field to autoPipeline proxy

* add test for consecutive awaits with auto pipeline

* simplfy auto pipeline tests

* fix test descriptions

* rm pipelineCounter field from auto pipeline proxy

---------

Co-authored-by: CahidArda <[email protected]>
  • Loading branch information
ogzhanolguncu and CahidArda authored May 10, 2024
1 parent 682d8bc commit c9dc7e8
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 12 deletions.
228 changes: 228 additions & 0 deletions pkg/auto-pipeline.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import { Redis } from "../platforms/nodejs"
import { keygen, newHttpClient } from "./test-utils";

import { afterEach, describe, expect, test } from "bun:test";
import { ScriptLoadCommand } from "./commands/script_load";


const client = newHttpClient();

const { newKey, cleanup } = keygen();
afterEach(cleanup);

describe("Auto pipeline", () => {
test("should execute all commands inside a Promise.all in a single pipeline", async () => {
const persistentKey = newKey();
const persistentKey2 = newKey();
const scriptHash = await new ScriptLoadCommand(["return 1"]).exec(client);

const redis = Redis.autoPipeline({
latencyLogging: false
})
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0)

// all the following commands are in a single pipeline call
const result = await Promise.all([
redis.append(newKey(), "hello"),
redis.bitcount(newKey(), 0, 1),
redis.bitop("and", newKey(), newKey()),
redis.bitpos(newKey(), 1, 0),
redis.dbsize(),
redis.decr(newKey()),
redis.decrby(newKey(), 1),
redis.del(newKey()),
redis.echo("hello"),
redis.eval("return ARGV[1]", [], ["Hello"]),
redis.evalsha(scriptHash, [], ["Hello"]),
redis.exists(newKey()),
redis.expire(newKey(), 5),
redis.expireat(newKey(), Math.floor(new Date().getTime() / 1000) + 60),
redis.flushall(),
redis.flushdb(),
redis.get(newKey()),
redis.getbit(newKey(), 0),
redis.getdel(newKey()),
redis.getset(newKey(), "hello"),
redis.hdel(newKey(), "field"),
redis.hexists(newKey(), "field"),
redis.hget(newKey(), "field"),
redis.hgetall(newKey()),
redis.hincrby(newKey(), "field", 1),
redis.hincrbyfloat(newKey(), "field", 1.5),
redis.hkeys(newKey()),
redis.hlen(newKey()),
redis.hmget(newKey(), newKey()),
redis.hmset(newKey(), { field: "field", value: "value" }),
redis.hscan(newKey(), 0),
redis.hset(newKey(), { field: "value" }),
redis.hsetnx(newKey(), "field", "value"),
redis.hstrlen(newKey(), "field"),
redis.hvals(newKey()),
redis.incr(newKey()),
redis.incrby(newKey(), 1),
redis.incrbyfloat(newKey(), 1.5),
redis.keys("*"),
redis.lindex(newKey(), 0),
redis.linsert(newKey(), "before", "pivot", "value"),
redis.llen(newKey()),
redis.lmove(newKey(), newKey(), "left", "right"),
redis.lpop(newKey()),
redis.lpos(newKey(), "value"),
redis.lpush(persistentKey, "element"),
redis.lpushx(newKey(), "element1", "element2"),
redis.lrange(newKey(), 0, 1),
redis.lrem(newKey(), 1, "value"),
redis.lset(persistentKey, 0, "value"),
redis.ltrim(newKey(), 0, 1),
redis.hrandfield(newKey()),
redis.hrandfield(newKey(), 2),
redis.hrandfield(newKey(), 3, true),
redis.mget<[string, string]>(newKey(), newKey()),
redis.mset({ key1: "value", key2: "value" }),
redis.msetnx({ key3: "value", key4: "value" }),
redis.persist(newKey()),
redis.pexpire(newKey(), 1000),
redis.pexpireat(newKey(), new Date().getTime() + 1000),
redis.ping(),
redis.psetex(newKey(), 1, "value"),
redis.pttl(newKey()),
redis.publish("test", "hello"),
redis.randomkey(),
redis.rename(persistentKey, persistentKey2),
redis.renamenx(persistentKey2, newKey()),
redis.rpop(newKey()),
redis.rpush(newKey(), "element1", "element2"),
redis.rpushx(newKey(), "element1", "element2"),
redis.sadd(newKey(), "memeber1", "member2"),
redis.scan(0),
redis.scard(newKey()),
redis.sdiff(newKey()),
redis.sdiffstore(newKey(), newKey()),
redis.set(newKey(), "value"),
redis.setbit(newKey(), 1, 1),
redis.setex(newKey(), 1, "value"),
redis.setnx(newKey(), "value"),
redis.setrange(newKey(), 1, "value"),
redis.sinter(newKey(), newKey()),
redis.sinterstore(newKey(), newKey()),
redis.sismember(newKey(), "member"),
redis.smembers(newKey()),
redis.smove(newKey(), newKey(), "member"),
redis.spop(newKey()),
redis.srandmember(newKey()),
redis.srem(newKey(), "member"),
redis.sscan(newKey(), 0),
redis.strlen(newKey()),
redis.sunion(newKey()),
redis.sunionstore(newKey(), newKey()),
redis.time(),
redis.touch(newKey()),
redis.ttl(newKey()),
redis.type(newKey()),
redis.unlink(newKey()),
redis.zadd(newKey(), { score: 0, member: "member" }),
redis.zcard(newKey()),
redis.scriptExists(scriptHash),
redis.scriptFlush({ async: true }),
redis.scriptLoad("return 1"),
redis.zcount(newKey(), 0, 1),
redis.zincrby(newKey(), 1, "member"),
redis.zinterstore(newKey(), 1, [newKey()]),
redis.zlexcount(newKey(), "-", "+"),
redis.zpopmax(newKey()),
redis.zpopmin(newKey()),
redis.zrange(newKey(), 0, 1),
redis.zrank(newKey(), "member"),
redis.zrem(newKey(), "member"),
redis.zremrangebylex(newKey(), "-", "+"),
redis.zremrangebyrank(newKey(), 0, 1),
redis.zremrangebyscore(newKey(), 0, 1),
redis.zrevrank(newKey(), "member"),
redis.zscan(newKey(), 0),
redis.zscore(newKey(), "member"),
redis.zunionstore(newKey(), 1, [newKey()]),
redis.zunion(1, [newKey()]),
redis.json.set(newKey(), "$", { hello: "world" })
])
expect(result).toBeTruthy();
expect(result.length).toBe(120); // returns
// @ts-expect-error pipelineCounter is not in type but accessible120 results
expect(redis.pipelineCounter).toBe(1);
});

test("should group async requests with sync requests", async () => {

const redis = Redis.autoPipeline({
latencyLogging: false
})
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0);

// following five commands are added to the pipeline
redis.flushdb();
redis.incr("baz");
redis.incr("baz");
redis.set("foo", "bar");
redis.incr("baz");

// two get calls are added to the pipeline and pipeline
// is executed since we called await
const [fooValue, bazValue] = await Promise.all([
redis.get("foo"),
redis.get("baz")
]);

expect(fooValue).toBe("bar");
expect(bazValue).toBe(3);
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(1);
})

test("should execute a pipeline for each consecutive awaited command", async () => {

const redis = Redis.autoPipeline({
latencyLogging: false
});
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0);

redis.flushdb();

const res1 = await redis.incr("baz");
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(1);

const res2 = await redis.incr("baz");
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(2);

const res3 = await redis.set("foo", "bar");
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(3);

expect([res1, res2, res3]).toEqual([1, 2, "OK"]);

});

test("should execute a single pipeline for several commands inside Promise.all", async () => {

const redis = Redis.autoPipeline({
latencyLogging: false
});
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0);

const resArray = await Promise.all([
redis.flushdb(),
redis.incr("baz"),
redis.incr("baz"),
redis.set("foo", "bar"),
redis.get("foo")
]);
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(1);
expect(resArray).toEqual(["OK", 1, 2, "OK", "bar"]);

})
});
83 changes: 83 additions & 0 deletions pkg/auto-pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { Command } from "./commands/command";
import { CommandArgs } from "./types";
import { Pipeline } from "./pipeline";
import { Redis } from "./redis";

// will omit redis only commands since we call Pipeline in the background in auto pipeline
type redisOnly = Exclude<keyof Redis, keyof Pipeline>

export function createAutoPipelineProxy(_redis: Redis) {

const redis = _redis as Redis & {
autoPipelineExecutor: AutoPipelineExecutor;
}

if (!redis.autoPipelineExecutor) {
redis.autoPipelineExecutor = new AutoPipelineExecutor(redis);
}

return new Proxy(redis, {
get: (target, prop: "pipelineCounter" | keyof Pipeline ) => {

// return pipelineCounter of autoPipelineExecutor
if (prop == "pipelineCounter") {
return target.autoPipelineExecutor.pipelineCounter;
}

// If the method is a function on the pipeline, wrap it with the executor logic
if (typeof target.autoPipelineExecutor.pipeline[prop] === "function") {
return (...args: CommandArgs<typeof Command>) => {
return target.autoPipelineExecutor.withAutoPipeline((pipeline) => {
(pipeline[prop] as Function)(...args);
});
};
}
return target.autoPipelineExecutor.pipeline[prop];
},
}) as Omit<Redis, redisOnly>;
}

export class AutoPipelineExecutor {
private pipelinePromises = new WeakMap<Pipeline, Promise<Array<unknown>>>();
private activePipeline: Pipeline | null = null;
private indexInCurrentPipeline = 0;
private redis: Redis;
pipeline: Pipeline; // only to make sure that proxy can work
pipelineCounter: number = 0; // to keep track of how many times a pipeline was executed

constructor(redis: Redis) {
this.redis = redis;
this.pipeline = redis.pipeline();
}

async withAutoPipeline<T>(executeWithPipeline: (pipeline: Pipeline) => unknown): Promise<T> {
const pipeline = this.activePipeline || this.redis.pipeline();

if (!this.activePipeline) {
this.activePipeline = pipeline;
this.indexInCurrentPipeline = 0;
}

const index = this.indexInCurrentPipeline++;
executeWithPipeline(pipeline);

const pipelineDone = this.deferExecution().then(() => {
if (!this.pipelinePromises.has(pipeline)) {
const pipelinePromise = pipeline.exec();
this.pipelineCounter += 1;

this.pipelinePromises.set(pipeline, pipelinePromise);
this.activePipeline = null;
}
return this.pipelinePromises.get(pipeline)!;
});

const results = await pipelineDone;
return results[index] as T;
}

private async deferExecution() {
await Promise.resolve();
return await Promise.resolve();
}
}
20 changes: 10 additions & 10 deletions pkg/commands/xtrim.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ afterAll(cleanup);

describe("XLEN", () => {
test(
"should approximately trim stream to 300 items",
"should approximately trim stream to 30 items",
async () => {
const key = newKey();
const promises = [];
for (let i = 1; i <= 10000; i++) {
for (let i = 1; i <= 1000; i++) {
promises.push(new XAddCommand([key, "*", { [randomID()]: randomID() }]).exec(client));
}
await Promise.all(promises);
await new XTrimCommand([key, { strategy: "MAXLEN", threshold: 300, exactness: "~" }]).exec(
await new XTrimCommand([key, { strategy: "MAXLEN", threshold: 30, exactness: "~" }]).exec(
client,
);
const len = await new XLenCommand([key]).exec(client);
expect(len).toBeGreaterThanOrEqual(290);
expect(len).toBeLessThanOrEqual(310);
expect(len).toBeGreaterThanOrEqual(29);
expect(len).toBeLessThanOrEqual(31);
},
{ timeout: 1000 * 60 },
);
Expand All @@ -45,20 +45,20 @@ describe("XLEN", () => {
});

test(
"should trim with MINID and a limit and only remove 10 items that satisfies MINID",
"should trim with MINID and a limit and only remove 2 items that satisfies MINID",
async () => {
const key = newKey();
const baseTimestamp = Date.now();
for (let i = 0; i < 100; i++) {
for (let i = 0; i < 20; i++) {
const id = `${baseTimestamp}-${i}`;
await new XAddCommand([key, id, { data: `value${i}` }]).exec(client);
}
const midRangeId = `${baseTimestamp}-50`;
await new XTrimCommand([key, { strategy: "MINID", threshold: midRangeId, limit: 10 }]).exec(
const midRangeId = `${baseTimestamp}-10`;
await new XTrimCommand([key, { strategy: "MINID", threshold: midRangeId, limit: 2 }]).exec(
client,
);
const len = await new XLenCommand([key]).exec(client);
expect(len).toBeLessThanOrEqual(100);
expect(len).toBeLessThanOrEqual(20);
},
{ timeout: 20000 },
);
Expand Down
3 changes: 1 addition & 2 deletions pkg/pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ describe("use all the things", () => {
.get(newKey())
.getbit(newKey(), 0)
.getdel(newKey())
.getrange(newKey(), 0, 1)
.getset(newKey(), "hello")
.hdel(newKey(), "field")
.hexists(newKey(), "field")
Expand Down Expand Up @@ -244,6 +243,6 @@ describe("use all the things", () => {
.json.set(newKey(), "$", { hello: "world" });

const res = await p.exec();
expect(res.length).toEqual(121);
expect(res.length).toEqual(120);
});
});
Loading

0 comments on commit c9dc7e8

Please sign in to comment.