Skip to content

Commit

Permalink
DX-1019: Read Your Writes Support (#1175)
Browse files Browse the repository at this point in the history
* add: readYourWrites option interface

* send local sync token on requests

* fmt

* add promise.all tests

* add: lua script test

* format tests

* fmt

* change upstashSyncToken convention

* add public redis client test

* add: fastly and cloudflare clients ryw support

* fmt

* add default test

* add: comments

* add: http comment

* sync token docs

* remove comment

* fix readYourWrites arg comment

* add: ryw operation comments

* revert requester

* revert requester interface
  • Loading branch information
fahreddinozcan authored Jul 30, 2024
1 parent 26a3a66 commit 90ae6b1
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 19 deletions.
2 changes: 2 additions & 0 deletions pkg/commands/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ export class Command<TResult, TData> {
public async exec(client: Requester): Promise<TData> {
const { result, error } = await client.request<TResult>({
body: this.command,
upstashSyncToken: client.upstashSyncToken,
});

if (error) {
throw new UpstashError(error);
}
Expand Down
46 changes: 45 additions & 1 deletion pkg/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@ export type UpstashRequest = {
* Request body will be serialized to json
*/
body?: unknown;

upstashSyncToken?: string;
};
export type UpstashResponse<TResult> = { result?: TResult; error?: string };

export type Requester = {
export interface Requester {
/**
* When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client.
*/
readYourWrites?: boolean;

/**
* This token is used to ensure that the client is in sync with the server. On each request, we send this token in the header, and the server will return a new token.
*/
upstashSyncToken?: string;
request: <TResult = unknown>(req: UpstashRequest) => Promise<UpstashResponse<TResult>>;
};

Expand Down Expand Up @@ -95,11 +106,17 @@ export type HttpClientConfig = {
agent?: any;
signal?: AbortSignal;
keepAlive?: boolean;

/**
* When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client.
*/
readYourWrites?: boolean;
} & RequesterConfig;

export class HttpClient implements Requester {
public baseUrl: string;
public headers: Record<string, string>;

public readonly options: {
backend?: string;
agent: any;
Expand All @@ -108,6 +125,8 @@ export class HttpClient implements Requester {
cache?: CacheSetting;
keepAlive: boolean;
};
public readYourWrites: boolean;
public upstashSyncToken = "";

public readonly retry: {
attempts: number;
Expand All @@ -123,6 +142,8 @@ export class HttpClient implements Requester {
signal: config.signal,
keepAlive: config.keepAlive ?? true,
};
this.upstashSyncToken = "";
this.readYourWrites = config.readYourWrites ?? true;

this.baseUrl = config.baseUrl.replace(/\/$/, "");

Expand Down Expand Up @@ -185,6 +206,14 @@ export class HttpClient implements Requester {
backend: this.options.backend,
};

/**
* We've recieved a new `upstash-sync-token` in the previous response. We use it in the next request to observe the effects of previous requests.
*/
if (this.readYourWrites) {
const newHeader = this.upstashSyncToken;
this.headers["upstash-sync-token"] = newHeader;
}

let res: Response | null = null;
let error: Error | null = null;
for (let i = 0; i <= this.retry.attempts; i++) {
Expand Down Expand Up @@ -216,6 +245,20 @@ export class HttpClient implements Requester {
throw new UpstashError(`${body.error}, command was: ${JSON.stringify(req.body)}`);
}

if (this.readYourWrites) {
const headers = res.headers;
this.upstashSyncToken = headers.get("upstash-sync-token") ?? "";
}


/**
* We save the new `upstash-sync-token` in the response header to use it in the next request.
*/
if (this.readYourWrites) {
const headers = res.headers;
this.upstashSyncToken = headers.get("upstash-sync-token") ?? "";
}

if (this.options.responseEncoding === "base64") {
if (Array.isArray(body)) {
return body.map(({ result, error }) => ({
Expand All @@ -226,6 +269,7 @@ export class HttpClient implements Requester {
const result = decode(body.result) as any;
return { result, error: body.error };
}

return body as UpstashResponse<TResult>;
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ export class Pipeline<TCommands extends Command<any, any>[] = []> {
throw new Error("Pipeline is empty");
}
const path = this.multiExec ? ["multi-exec"] : ["pipeline"];

const res = (await this.client.request({
path,
body: Object.values(this.commands).map((c) => c.command),
Expand Down
115 changes: 115 additions & 0 deletions pkg/read-your-writes.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { keygen, newHttpClient } from "./test-utils";

import { afterAll, describe, expect, test } from "bun:test";

import { Redis as PublicRedis } from "../platforms/nodejs";
import { SetCommand } from "./commands/set";
import { Redis } from "./redis";

const client = newHttpClient();
const { cleanup } = keygen();
afterAll(cleanup);
describe("Read Your Writes Feature", () => {
test("successfully retrieves Upstash-Sync-Token in the response header and updates local state", async () => {
const initialSync = client.upstashSyncToken;
await new SetCommand(["key", "value"]).exec(client);
const updatedSync = client.upstashSyncToken;
await new SetCommand(["key", "value"]).exec(client);

expect(updatedSync).not.toEqual(initialSync);
});

test("succesfully updates sync state with pipeline", async () => {
const initialSync = client.upstashSyncToken;

const { pipeline } = new Redis(client);
const p = pipeline();

p.set("key1", "value1");
p.set("key2", "value2");
p.set("key3", "value3");

await p.exec();

const updatedSync = client.upstashSyncToken;

expect(initialSync).not.toEqual(updatedSync);
});

test("updates after each element of promise.all", async () => {
let currentSync = client.upstashSyncToken;

const promises = Array.from({ length: 3 }, (_, i) =>
new SetCommand([`key${i}`, `value${i}`]).exec(client).then(() => {
expect(client.upstashSyncToken).not.toEqual(currentSync);
currentSync = client.upstashSyncToken;
}),
);

await Promise.all(promises);
});

test("updates after successful lua script call", async () => {
const s = `redis.call('SET', 'mykey', 'myvalue')
return 1
`;

const initialSync = client.upstashSyncToken;

const redis = new Redis(client);
const script = redis.createScript(s);

await script.exec([], []);

const updatedSync = client.upstashSyncToken;

expect(updatedSync).not.toEqual(initialSync);
});

test("should not update the sync state in case of Redis client with manuel HTTP client and opt-out ryw", async () => {
const optOutClient = newHttpClient();
const redis = new Redis(optOutClient, { readYourWrites: false });

const initialSync = optOutClient.upstashSyncToken;

await redis.set("key", "value");

const updatedSync = optOutClient.upstashSyncToken;

expect(updatedSync).toEqual(initialSync);
});

test("should not update the sync state when public Redis interface is provided with opt-out", async () => {
const redis = new PublicRedis({
url: process.env.UPSTASH_REDIS_REST_URL,
token: process.env.UPSTASH_REDIS_REST_TOKEN,
readYourWrites: false,
});

// @ts-expect-error - We need the sync token for this test, which resides on the client
const initialSync = redis.client.upstashSyncToken;

await redis.set("key", "value");

// @ts-expect-error - We need the sync token for this test, which resides on the client
const updatedSync = redis.client.upstashSyncToken;

expect(updatedSync).toEqual(initialSync);
});

test("should update the sync state when public Redis interface is provided with default behaviour", async () => {
const redis = new PublicRedis({
url: process.env.UPSTASH_REDIS_REST_URL,
token: process.env.UPSTASH_REDIS_REST_TOKEN,
});

// @ts-expect-error - We need the sync token for this test, which resides on the client
const initialSync = redis.client.upstashSyncToken;

await redis.set("key", "value");

// @ts-expect-error - We need the sync token for this test, which resides on the client
const updatedSync = redis.client.upstashSyncToken;
expect(updatedSync).not.toEqual(initialSync);
});
});
4 changes: 4 additions & 0 deletions pkg/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ export class Redis {
this.client = client;
this.opts = opts;
this.enableTelemetry = opts?.enableTelemetry ?? true;

if (opts?.readYourWrites === false) {
this.client.readYourWrites = false;
}
this.enableAutoPipelining = opts?.enableAutoPipelining ?? true;
}

Expand Down
1 change: 1 addition & 0 deletions pkg/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ export type RedisOptions = {
latencyLogging?: boolean;
enableTelemetry?: boolean;
enableAutoPipelining?: boolean;
readYourWrites?: boolean;
};
14 changes: 8 additions & 6 deletions platforms/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ export type RedisConfigCloudflare = {
*/
signal?: AbortSignal;
keepAlive?: boolean;

/**
* When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client.
*/
readYourWrites?: boolean;
} & core.RedisOptions &
RequesterConfig &
Env;
Expand All @@ -51,15 +56,11 @@ export class Redis extends core.Redis {
*/
constructor(config: RedisConfigCloudflare, env?: Env) {
if (!config.url) {
throw new Error(
`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`
);
throw new Error(`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`)
}

if (!config.token) {
throw new Error(
`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`
);
throw new Error(`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`)
}

if (config.url.startsWith(" ") || config.url.endsWith(" ") || /\r|\n/.test(config.url)) {
Expand All @@ -76,6 +77,7 @@ export class Redis extends core.Redis {
responseEncoding: config.responseEncoding,
signal: config.signal,
keepAlive: config.keepAlive,
readYourWrites: config.readYourWrites,
});

super(client, {
Expand Down
14 changes: 8 additions & 6 deletions platforms/fastly.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ export type RedisConfigFastly = {
*/
backend: string;
keepAlive?: boolean;

/**
* When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client.
*/
readYourWrites?: boolean;
} & core.RedisOptions &
RequesterConfig;

Expand All @@ -48,15 +53,11 @@ export class Redis extends core.Redis {
*/
constructor(config: RedisConfigFastly) {
if (!config.url) {
throw new Error(
`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`
);
throw new Error(`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`)
}

if (!config.token) {
throw new Error(
`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`
);
throw new Error(`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`)
}

if (config.url.startsWith(" ") || config.url.endsWith(" ") || /\r|\n/.test(config.url)) {
Expand All @@ -73,6 +74,7 @@ export class Redis extends core.Redis {
options: { backend: config.backend },
responseEncoding: config.responseEncoding,
keepAlive: config.keepAlive,
readYourWrites: config.readYourWrites,
});

super(client, {
Expand Down
14 changes: 8 additions & 6 deletions platforms/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ export type RedisConfigNodejs = {
latencyLogging?: boolean;
agent?: unknown;
keepAlive?: boolean;

/**
* When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client.
*/
readYourWrites?: boolean;
} & core.RedisOptions &
RequesterConfig;

Expand Down Expand Up @@ -97,15 +102,11 @@ export class Redis extends core.Redis {
}

if (!configOrRequester.url) {
throw new Error(
`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`
);
throw new Error(`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`)
}

if (!configOrRequester.token) {
throw new Error(
`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`
);
throw new Error(`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`)
}

if (
Expand Down Expand Up @@ -133,6 +134,7 @@ export class Redis extends core.Redis {
cache: configOrRequester.cache ?? "no-store",
signal: configOrRequester.signal,
keepAlive: configOrRequester.keepAlive,
readYourWrites: configOrRequester.readYourWrites,
});

super(client, {
Expand Down

0 comments on commit 90ae6b1

Please sign in to comment.