From fc7ce9993e812b2a1dc2434edc9a2c1584224754 Mon Sep 17 00:00:00 2001 From: Craig Day Date: Sun, 13 Aug 2023 01:19:03 -0700 Subject: [PATCH] client-side caching with a new client impl and connection wrapper simplify cache impls; some javadocs and tests more javadocs; invalidation handler on client basic support for max age in default impls initial PR feedback; naming consistencies checkpoint on wrapped connection push tracking status down to actual conn impl simplify cache impls to just LRU backed by LinkedHashMap cleanup client wrapper; test simple operation docs update --- src/main/asciidoc/index.adoc | 28 ++- .../client/CachingRedisOptionsConverter.java | 89 +++++++ src/main/java/examples/RedisExamples.java | 26 +- .../io/vertx/redis/client/CachingRedis.java | 123 +++++++++ .../redis/client/CachingRedisOptions.java | 205 +++++++++++++++ .../redis/client/ClientSideCacheMode.java | 26 ++ .../java/io/vertx/redis/client/Redis.java | 2 - .../vertx/redis/client/RedisClientCache.java | 69 ++++++ .../redis/client/impl/CachingRedisClient.java | 234 ++++++++++++++++++ .../client/impl/CachingRedisConnection.java | 170 +++++++++++++ .../client/impl/RedisConnectionInternal.java | 6 + .../client/impl/RedisSentinelConnection.java | 4 + .../impl/RedisStandaloneConnection.java | 10 + .../vertx/redis/client/impl/RequestImpl.java | 9 + .../redis/client/impl/cache/CacheKey.java | 62 +++++ .../impl/cache/LRURedisClientCache.java | 101 ++++++++ .../redis/client/test/CachingRedisTest.java | 53 ++++ .../test/redis/CachingRedisClientTest.java | 125 ++++++++++ 18 files changed, 1338 insertions(+), 4 deletions(-) create mode 100644 src/main/generated/io/vertx/redis/client/CachingRedisOptionsConverter.java create mode 100644 src/main/java/io/vertx/redis/client/CachingRedis.java create mode 100644 src/main/java/io/vertx/redis/client/CachingRedisOptions.java create mode 100644 src/main/java/io/vertx/redis/client/ClientSideCacheMode.java create mode 100644 src/main/java/io/vertx/redis/client/RedisClientCache.java create mode 100644 src/main/java/io/vertx/redis/client/impl/CachingRedisClient.java create mode 100644 src/main/java/io/vertx/redis/client/impl/CachingRedisConnection.java create mode 100644 src/main/java/io/vertx/redis/client/impl/cache/CacheKey.java create mode 100644 src/main/java/io/vertx/redis/client/impl/cache/LRURedisClientCache.java create mode 100644 src/test/java/io/vertx/redis/client/test/CachingRedisTest.java create mode 100644 src/test/java/io/vertx/test/redis/CachingRedisClientTest.java diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index 8545dcdf..8edeafd6 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -24,6 +24,7 @@ Redis has a rich API and it can be organized in the following groups: * Strings - Commands to work with Strings. * Transactions - Commands to handle transaction lifecycle. * Streams - Commands to handle streaming. +* Client-side caching - Commands to control client-side caching. == Using Vert.x-Redis @@ -182,9 +183,34 @@ And from another place in the code publish messages to the queue: ---- NOTE: It is important to remember that the commands `SUBSCRIBE`, `UNSUBSCRIBE`, `PSUBSCRIBE` and `PUNSUBSCRIBE` are `void`. -This means that the result in case of success is `null` not a instance of response. +This means that the result in case of success is `null` not an instance of response. All messages are then routed through the handler on the client. +== Client-side Caching + +Redis supports client-side caching implementations using a strategy called _Tracking_. + +All modes of the client support caching except connections that are in pub/sub mode. + +To create a client with client-side caching, one would do: + +[source,$lang] +---- +{@link examples.RedisExamples#clientCaching1} +---- + +The default implementation will use a simple Least-Recently-Used (LFU) cache backed by a `LinkedHashMap`. +You can also provide your own implementation of the cache: + +[source,$lang] +---- +{@link examples.RedisExamples#clientCaching2} +---- + +NOTE: The cache is not a write-through cache. A value will not be stored in the client-side cache until the value is fetched from Redis for the first time. +To avoid write-then-read race conditions within the same batch, read commands that are part of a batch will not check the cache first. +Additionally, the current implementation does not support the `OPTIN` or `NOLOOP` options. + == Tracing commands The Redis client can trace command execution when Vert.x has tracing enabled. diff --git a/src/main/generated/io/vertx/redis/client/CachingRedisOptionsConverter.java b/src/main/generated/io/vertx/redis/client/CachingRedisOptionsConverter.java new file mode 100644 index 00000000..55c4d84f --- /dev/null +++ b/src/main/generated/io/vertx/redis/client/CachingRedisOptionsConverter.java @@ -0,0 +1,89 @@ +package io.vertx.redis.client; + +import io.vertx.core.json.JsonObject; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.impl.JsonUtil; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.Base64; + +/** + * Converter and mapper for {@link io.vertx.redis.client.CachingRedisOptions}. + * NOTE: This class has been automatically generated from the {@link io.vertx.redis.client.CachingRedisOptions} original class using Vert.x codegen. + */ +public class CachingRedisOptionsConverter { + + + private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER; + private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER; + + public static void fromJson(Iterable> json, CachingRedisOptions obj) { + for (java.util.Map.Entry member : json) { + switch (member.getKey()) { + case "maxAge": + if (member.getValue() instanceof Number) { + obj.setMaxAge(((Number)member.getValue()).longValue()); + } + break; + case "maxAgeUnit": + if (member.getValue() instanceof String) { + obj.setMaxAgeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); + } + break; + case "maxCacheSize": + if (member.getValue() instanceof Number) { + obj.setMaxCacheSize(((Number)member.getValue()).intValue()); + } + break; + case "mode": + if (member.getValue() instanceof String) { + obj.setMode(io.vertx.redis.client.ClientSideCacheMode.valueOf((String)member.getValue())); + } + break; + case "prefix": + if (member.getValue() instanceof String) { + obj.setPrefix((String)member.getValue()); + } + break; + case "prefixes": + if (member.getValue() instanceof JsonArray) { + java.util.ArrayList list = new java.util.ArrayList<>(); + ((Iterable)member.getValue()).forEach( item -> { + if (item instanceof String) + list.add((String)item); + }); + obj.setPrefixes(list); + } + break; + case "prefixs": + if (member.getValue() instanceof JsonArray) { + ((Iterable)member.getValue()).forEach( item -> { + if (item instanceof String) + obj.addPrefix((String)item); + }); + } + break; + } + } + } + + public static void toJson(CachingRedisOptions obj, JsonObject json) { + toJson(obj, json.getMap()); + } + + public static void toJson(CachingRedisOptions obj, java.util.Map json) { + json.put("maxAge", obj.getMaxAge()); + if (obj.getMaxAgeUnit() != null) { + json.put("maxAgeUnit", obj.getMaxAgeUnit().name()); + } + json.put("maxCacheSize", obj.getMaxCacheSize()); + if (obj.getMode() != null) { + json.put("mode", obj.getMode().name()); + } + if (obj.getPrefixes() != null) { + JsonArray array = new JsonArray(); + obj.getPrefixes().forEach(item -> array.add(item)); + json.put("prefixes", array); + } + } +} diff --git a/src/main/java/examples/RedisExamples.java b/src/main/java/examples/RedisExamples.java index b9e4e289..2a1e19b9 100644 --- a/src/main/java/examples/RedisExamples.java +++ b/src/main/java/examples/RedisExamples.java @@ -143,7 +143,7 @@ private Future createRedisClient() { // make sure to invalidate old connection if present if (redis != null) { - redis.close();; + redis.close(); } if (CONNECTING.compareAndSet(false, true)) { @@ -246,4 +246,28 @@ public void example13(Vertx vertx) { public void tracing1(RedisOptions options) { options.setTracingPolicy(TracingPolicy.ALWAYS); } + + public void clientCaching1(Vertx vertx) { + CachingRedis + .create(vertx) + .connect() + .onSuccess(conn -> { + // get the value for a key, returning from a local in-memory cache if + // it exists, or fetching from Redis if not. if the value is fetched from + // Redis, it will be stored in the local cache + conn.send(Request.cmd(Command.GET).arg("key")); + }); + } + + public void clientCaching2(Vertx vertx, RedisClientCache customCache) { + CachingRedis + .create(vertx, customCache) + .connect() + .onSuccess(conn -> { + // get the value for a key, returning from the custom cache if + // it exists, or fetching from Redis if not. if the value is fetched from + // Redis, it will be stored in the local cache + conn.send(Request.cmd(Command.GET).arg("key")); + }); + } } diff --git a/src/main/java/io/vertx/redis/client/CachingRedis.java b/src/main/java/io/vertx/redis/client/CachingRedis.java new file mode 100644 index 00000000..28e1f444 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/CachingRedis.java @@ -0,0 +1,123 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.redis.client.impl.CachingRedisClient; +import io.vertx.redis.client.impl.cache.CacheKey; + +import java.util.Collection; + +/** + * A {@link Redis} client wrapper that implements client-side caching. + * + * @see Client-side caching in Redis + */ +public interface CachingRedis extends Redis { + + /** + * Create a new caching client using default client and cache options. + * + * @param vertx the vertx instance + * @return the caching client + */ + static CachingRedis create(Vertx vertx) { + return create(vertx, Redis.createClient(vertx)); + } + + /** + * Create a new caching client wrapping an existing redis client with default caching options. + * + * @param vertx the vertx instance + * @param redis the redis client to wrap + * @return the caching client + */ + static CachingRedis create(Vertx vertx, Redis redis) { + return create(vertx, redis, RedisClientCache.lru(new CachingRedisOptions())); + } + + /** + * Create a new caching client using default client and cache options, backed by a given cache. + * + * @param vertx the vertx instance + * @param cache the backing cache + * @return the caching client + */ + static CachingRedis create(Vertx vertx, RedisClientCache cache) { + return create(vertx, Redis.createClient(vertx), cache); + } + + /** + * Create a new caching client wrapping an existing redis client and backed by a given cache. + * + * @param vertx the vertx instance + * @param redis the redis client to wrap + * @param cache the backing cache + * @return the caching client + */ + static CachingRedis create(Vertx vertx, Redis redis, RedisClientCache cache) { + return create(vertx, redis, cache, new CachingRedisOptions()); + } + + /** + * Create a new caching client wrapping an existing redis client and using the given cache options. + * + * @param vertx the vertx instance + * @param redis the redis client to wrap + * @param options the cache options + * @return the caching client + */ + static CachingRedis create(Vertx vertx, Redis redis, CachingRedisOptions options) { + return create(vertx, redis, RedisClientCache.lru(options), options); + } + + /** + * Create a new caching client wrapping an existing redis client, using the given cache and cache options. + * + * @param vertx the vertx instance + * @param redis the redis client to wrap + * @param cache the backing cache + * @param options the cache options + * @return the caching client + */ + static CachingRedis create(Vertx vertx, Redis redis, RedisClientCache cache, CachingRedisOptions options) { + return new CachingRedisClient(vertx, redis, cache, options); + } + + /** + * Flush the local cache. + * + *

+ * This operation only clears the local cache and has no interaction with the server. + * + * @return a future indicating the status of the operation + */ + Future flush(); + + /** + * Set a handler to be called when invalidation is performed. + * + *

+ * The client will clear the keys before this handler is invoked. It is not recommended to modify + * the cache as a part of this handler. The primary function is for instrumentation. + * + * @param handler a handler that accepts the keys which were invalidated + * @return fluent self + */ + CachingRedis invalidationHandler(Handler> handler); +} diff --git a/src/main/java/io/vertx/redis/client/CachingRedisOptions.java b/src/main/java/io/vertx/redis/client/CachingRedisOptions.java new file mode 100644 index 00000000..d0630ac9 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/CachingRedisOptions.java @@ -0,0 +1,205 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client; + +import io.vertx.codegen.annotations.DataObject; +import io.vertx.core.json.JsonObject; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@DataObject(generateConverter = true) +public class CachingRedisOptions { + + /** + * The default number of max entries the cache will store. + */ + public static final int DEFAULT_MAX_SIZE = 1024; + + /** + * The default max age, in {@link #DEFAULT_MAX_AGE_UNIT} units, of an item in the cache. + */ + public static final long DEFAULT_MAX_AGE = 3_600_000; // 1 hour in milliseconds + public static final TimeUnit DEFAULT_MAX_AGE_UNIT = TimeUnit.MILLISECONDS; + + private int maxCacheSize; + private long maxAge; + private TimeUnit maxAgeUnit; + private ClientSideCacheMode mode; + private List prefixes; + + /** + * Creates a default configuration. + */ + public CachingRedisOptions() { + this.maxCacheSize = DEFAULT_MAX_SIZE; + this.maxAge = DEFAULT_MAX_AGE; + this.maxAgeUnit = DEFAULT_MAX_AGE_UNIT; + this.mode = ClientSideCacheMode.PER_CLIENT; + this.prefixes = new ArrayList<>(); + } + + /** + * Copy constructor. + * + * @param other the options to clone + */ + public CachingRedisOptions(CachingRedisOptions other) { + this.maxCacheSize = other.maxCacheSize; + this.maxAge = other.maxAge; + this.maxAgeUnit = other.maxAgeUnit; + this.mode = other.mode; + this.prefixes = other.prefixes; + } + + /** + * Create from JSON constructor. + * + * @param json the source JSON + */ + public CachingRedisOptions(JsonObject json) { + this(); + CachingRedisOptionsConverter.fromJson(json, this); + } + + /** + * Get the max cache size. + * + * @return the max size + */ + public int getMaxCacheSize() { + return maxCacheSize; + } + + /** + * Set the max cache size. + * + * @param maxCacheSize the max size + * @return fluent self + */ + public CachingRedisOptions setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + return this; + } + + /** + * Get the max item age. + * + * @return the max age value + */ + public long getMaxAge() { + return maxAge; + } + + /** + * Get the max item age time unit. + * + * @return the time unit + */ + public TimeUnit getMaxAgeUnit() { + return maxAgeUnit; + } + + /** + * Set the max age value. + * + *

+ * By default, the max age is in milliseconds. If you want to change the unit, use {@link #setMaxAgeUnit(TimeUnit)}. + * + * @param maxAge the max age + * @return fluent self + */ + public CachingRedisOptions setMaxAge(long maxAge) { + this.maxAge = maxAge; + return this; + } + + /** + * Set the max age time unit. + * + * @param unit the time unit + * @return fluent self + */ + public CachingRedisOptions setMaxAgeUnit(TimeUnit unit) { + this.maxAgeUnit = unit; + return this; + } + + /** + * Get the tracking mode the clients configure the data connection with. + * + * @return the mode + */ + public ClientSideCacheMode getMode() { + return mode; + } + + /** + * Set the tracking mode the clients configure the data connection with. + * + * @param mode the mode + * @return fluent self + */ + public CachingRedisOptions setMode(ClientSideCacheMode mode) { + this.mode = mode; + return this; + } + + /** + * Get the key prefixes the server should send invalidation messages for. + * + * @return the prefixes + */ + public List getPrefixes() { + return prefixes; + } + + /** + * Set the key prefixes the server should send invalidation messages for, replacing any existing values. + * + * @param prefixes the prefix list + * @return fluent self + */ + public CachingRedisOptions setPrefixes(List prefixes) { + this.prefixes.clear(); + this.prefixes.addAll(prefixes); + return this; + } + + /** + * Set a single prefix the server should send invalidation messages for, replacing any existing values. + * + * @param prefix the prefix + * @return fluent self + */ + public CachingRedisOptions setPrefix(String prefix) { + prefixes.clear(); + prefixes.add(prefix); + return this; + } + + /** + * Add a prefix teh server should send invalidation messages for. + * + * @param prefix the prefix to add + * @return fluent self + */ + public CachingRedisOptions addPrefix(String prefix) { + prefixes.add(prefix); + return this; + } +} diff --git a/src/main/java/io/vertx/redis/client/ClientSideCacheMode.java b/src/main/java/io/vertx/redis/client/ClientSideCacheMode.java new file mode 100644 index 00000000..c4cc1229 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/ClientSideCacheMode.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client; + +/** + * The mode to caching clients will configure invalidations connections in. + * + * @see Tracking modes + */ +public enum ClientSideCacheMode { + PER_CLIENT, + BROADCAST +} diff --git a/src/main/java/io/vertx/redis/client/Redis.java b/src/main/java/io/vertx/redis/client/Redis.java index c5b99e11..c29a2786 100644 --- a/src/main/java/io/vertx/redis/client/Redis.java +++ b/src/main/java/io/vertx/redis/client/Redis.java @@ -25,8 +25,6 @@ import io.vertx.redis.client.impl.RedisSentinelClient; import java.util.List; -import java.util.function.Consumer; -import java.util.function.Supplier; /** * A simple Redis client. diff --git a/src/main/java/io/vertx/redis/client/RedisClientCache.java b/src/main/java/io/vertx/redis/client/RedisClientCache.java new file mode 100644 index 00000000..8f8c3cb3 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/RedisClientCache.java @@ -0,0 +1,69 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.redis.client.impl.cache.CacheKey; +import io.vertx.redis.client.impl.cache.LRURedisClientCache; + +/** + * A client-side cache store used by a {@link CachingRedis}. + */ +public interface RedisClientCache { + + /** + * Create a simple least-recently-used (LRU) cache store with a max size and max item age. + * + * @param options the cache options + * @return the new cache store + */ + static RedisClientCache lru(CachingRedisOptions options) { + return new LRURedisClientCache(options); + } + + /** + * Get an item if it exists. + * + * @param key the key to get + * @return the item, or null if it does not exist + */ + @Nullable Response get(CacheKey key); + + /** + * Put an item, replacing anything that may exist. + * + * @param key the key + * @param value the value + */ + void put(CacheKey key, Response value); + + /** + * Remove an item if it exists. + * + * @param key the key to remove + */ + void delete(CacheKey key); + + /** + * Invalidate or remove all items, emptying the cache. + */ + void flush(); + + /** + * Close the cache and cleanup any resources. + */ + void close(); +} diff --git a/src/main/java/io/vertx/redis/client/impl/CachingRedisClient.java b/src/main/java/io/vertx/redis/client/impl/CachingRedisClient.java new file mode 100644 index 00000000..dcdc9518 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/CachingRedisClient.java @@ -0,0 +1,234 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client.impl; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.redis.client.CachingRedis; +import io.vertx.redis.client.CachingRedisOptions; +import io.vertx.redis.client.RedisClientCache; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisConnection; +import io.vertx.redis.client.Request; +import io.vertx.redis.client.Response; +import io.vertx.redis.client.impl.cache.CacheKey; +import io.vertx.redis.client.impl.types.MultiType; +import io.vertx.redis.client.impl.types.PushType; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +public class CachingRedisClient implements CachingRedis { + + private static final Logger LOG = LoggerFactory.getLogger(CachingRedis.class); + private static final String INVALIDATIONS_CHANNEL = "__redis__:invalidate"; + private static final Request CLIENT_ID_COMMAND = Request.cmd(Command.CLIENT).arg("ID"); + private static final Request SUBSCRIBE_COMMAND = Request.cmd(Command.SUBSCRIBE).arg(INVALIDATIONS_CHANNEL); + private static final int MAX_RECONNECT_ATTEMPTS = 5; + + private final VertxInternal vertx; + private final Redis redis; + private final RedisClientCache cache; + private final CachingRedisOptions options; + private final AtomicBoolean closed = new AtomicBoolean(); + private Promise clientIdPromise; + private Handler> invalidationHandler; + + public CachingRedisClient(Vertx vertx, Redis redis, RedisClientCache cache, CachingRedisOptions options) { + if (redis instanceof CachingRedis) { + throw new IllegalArgumentException("CachingRedisClient can not be self referential"); + } + + this.vertx = (VertxInternal) vertx; + this.redis = redis; + this.cache = cache; + this.options = options; + } + + @Override + public Future connect() { + if (closed.get()) { + return vertx.getContext().failedFuture("Client is closed"); + } + + final Promise promise = vertx.promise(); + + getClientId().onComplete(id -> { + if (id.failed()) { + promise.fail(id.cause()); + return; + } + + final int clientId = id.result(); + + redis.connect().onComplete(connect -> { + if (connect.failed()) { + promise.fail(connect.cause()); + return; + } + + final RedisConnection connection = connect.result(); + + promise.complete(new CachingRedisConnection(vertx, cache, connection, clientId, options)); + }); + }); + + return promise.future(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + redis.close(); + cache.close(); + } + } + + @Override + public Future<@Nullable Response> send(Request request) { + if (closed.get()) { + return vertx.getContext().failedFuture("Client is closed"); + } + + return connect().compose(conn -> + conn.send(request) + .eventually(v -> + conn.close().onFailure(LOG::warn))); + } + + @Override + public Future> batch(List commands) { + return redis.batch(commands); + } + + @Override + public Future flush() { + cache.flush(); + return Future.succeededFuture(); + } + + @Override + public CachingRedis invalidationHandler(Handler> handler) { + this.invalidationHandler = handler; + return this; + } + + Future getClientId() { + synchronized (this) { + if (clientIdPromise != null) { + return clientIdPromise.future(); + } else { + clientIdPromise = vertx.promise(); + } + } + + createInvalidationsConnection(0).onComplete(clientIdPromise); + + return clientIdPromise.future(); + } + + private Future createInvalidationsConnection(int attempt) { + if (closed.get()) { + // we're shutting down, the endHandler will have sent us here so don't try to reconnect + return Future.failedFuture("Client is closed"); + } + if (attempt > MAX_RECONNECT_ATTEMPTS) { + return Future.failedFuture("max reconnect attempts exceeded"); + } + + return redis + .connect() + .compose(this::initInvalidationsConnection) + .recover(e -> createInvalidationsConnection(attempt + 1)); + } + + private Future initInvalidationsConnection(RedisConnection connection) { + connection.exceptionHandler(e -> { + LOG.warn(e); + // flush the cache to ensure no stale data is served while reconnecting + cache.flush(); + connection.close(); + reconnectInvalidationsConnection(); + }); + + connection.endHandler(v -> { + // flush the cache to ensure no stale data is served while reconnecting + cache.flush(); + reconnectInvalidationsConnection(); + }); + + return connection + .send(CLIENT_ID_COMMAND) + .map(Response::toInteger) + .compose(clientId -> connection + .send(SUBSCRIBE_COMMAND) + .onSuccess(r -> connection.handler(this::handleInvalidationMessage)) + .map(clientId)); + } + + private void reconnectInvalidationsConnection() { + if (closed.get()) { + return; + } + synchronized (this) { + clientIdPromise = vertx.promise(); + } + + createInvalidationsConnection(0).onComplete(clientIdPromise); + } + + private void handleInvalidationMessage(Response message) { + // only process messages like `invalidate: [key1,key2,key3]` or `invalidate: key` + if (message instanceof PushType) { + final PushType pushMessage = (PushType) message; + final String pushCommand = pushMessage.get(0).toString(); + + if ("INVALIDATE".equalsIgnoreCase(pushCommand)) { + final Response invalidationMessage = pushMessage.get(1); + final Set invalidatedKeys = new HashSet<>(); + + if (invalidationMessage instanceof MultiType) { + final Iterator manyKeys = invalidationMessage.iterator(); + + do { + final CacheKey key = new CacheKey(manyKeys.next().toString()); + cache.delete(key); + invalidatedKeys.add(key); + } while (manyKeys.hasNext()); + } else { + final CacheKey key = new CacheKey(invalidationMessage.toString()); + cache.delete(key); + invalidatedKeys.add(key); + } + + if (invalidationHandler != null) { + invalidationHandler.handle(invalidatedKeys); + } + } + } + } +} diff --git a/src/main/java/io/vertx/redis/client/impl/CachingRedisConnection.java b/src/main/java/io/vertx/redis/client/impl/CachingRedisConnection.java new file mode 100644 index 00000000..0ce98d78 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/CachingRedisConnection.java @@ -0,0 +1,170 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client.impl; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.impl.VertxInternal; +import io.vertx.redis.client.CachingRedisOptions; +import io.vertx.redis.client.RedisClientCache; +import io.vertx.redis.client.ClientSideCacheMode; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.RedisConnection; +import io.vertx.redis.client.Request; +import io.vertx.redis.client.Response; + +import java.util.List; + +public class CachingRedisConnection implements RedisConnection { + + private final VertxInternal vertx; + private final RedisClientCache cache; + private final RedisConnection delegate; + private final RedisStandaloneConnection actual; + private final Request trackingCommand; + + public CachingRedisConnection(VertxInternal vertx, RedisClientCache cache, RedisConnection delegate, int clientId, CachingRedisOptions options) { + this.vertx = vertx; + this.cache = cache; + this.delegate = delegate; + this.trackingCommand = trackingCommand(clientId, options); + + if (delegate instanceof PooledRedisConnection) { + this.actual = (RedisStandaloneConnection) ((PooledRedisConnection) delegate).actual(); + } else if (delegate instanceof RedisSentinelConnection) { + this.actual = (RedisStandaloneConnection) ((RedisSentinelConnection) delegate).actual(); + } else { + this.actual = (RedisStandaloneConnection) delegate; + } + } + + @Override + public RedisConnection exceptionHandler(Handler handler) { + return delegate.exceptionHandler(handler); + } + + @Override + public RedisConnection handler(@Nullable Handler handler) { + return delegate.handler(handler); + } + + @Override + public RedisConnection pause() { + return delegate.pause(); + } + + @Override + public RedisConnection resume() { + return delegate.resume(); + } + + @Override + public RedisConnection fetch(long amount) { + return delegate.fetch(amount); + } + + @Override + public RedisConnection endHandler(@Nullable Handler endHandler) { + return delegate.endHandler(endHandler); + } + + @Override + public Future<@Nullable Response> send(Request request) { + final RequestImpl req = (RequestImpl) request; + final Promise promise = vertx.promise(); + + // Result could possibly be cached, check cache first + if (req.isCacheable()) { + final Response cachedValue = cache.get(req.cacheKey()); + if (cachedValue != null) { + promise.complete(cachedValue); + return promise.future(); + } + } + + configureTracking() + .onFailure(promise::fail) + .onSuccess(v -> + delegate + .send(request) + .onFailure(promise::fail) + .onSuccess(resp -> { + if (req.isCacheable() && resp != null) { + cache.put(req.cacheKey(), resp); + } + promise.complete(resp); + })); + + return promise.future(); + } + + @Override + public Future> batch(List requests) { + // Currently we don't support batching because order can matter. If an earlier request in the + // batch changes the value that a later, cacheable, request reads, we can't guarantee we + // invalidate in time. Rather than risk returning unexpected or stale values, simple don't deal + // with caching during a batch. + return delegate.batch(requests); + } + + @Override + public Future close() { + return delegate.close(); + } + + @Override + public boolean pendingQueueFull() { + return delegate.pendingQueueFull(); + } + + private Future configureTracking() { + final Promise trackingPromise = vertx.promise(); + + if (actual.isTrackingConfigured()) { + trackingPromise.complete(); + } else { + actual.send(trackingCommand).onComplete(ar -> { + if (ar.succeeded()) { + actual.setTrackingConfigured(); + } + + trackingPromise.handle(ar.mapEmpty()); + }); + } + + return trackingPromise.future(); + } + + private static Request trackingCommand(int clientId, CachingRedisOptions options) { + final Request request = Request.cmd(Command.CLIENT) + .arg("TRACKING") + .arg("ON") + .arg("REDIRECT") + .arg(clientId); + + for (String prefix : options.getPrefixes()) { + request.arg("PREFIX").arg(prefix); + } + + if (ClientSideCacheMode.BROADCAST == options.getMode()) { + request.arg("BCAST"); + } + + return request; + } +} diff --git a/src/main/java/io/vertx/redis/client/impl/RedisConnectionInternal.java b/src/main/java/io/vertx/redis/client/impl/RedisConnectionInternal.java index cb88983e..f609437f 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisConnectionInternal.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisConnectionInternal.java @@ -38,6 +38,12 @@ public interface RedisConnectionInternal extends RedisConnection { */ boolean isTainted(); + /** + * Returns {@code true} if a {@link io.vertx.redis.client.CachingRedis} client is being used, + * and tracking has been configured for this connection. + */ + boolean isTrackingConfigured(); + VertxInternal vertx(); /** diff --git a/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java b/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java index c865236e..c0db145e 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java @@ -19,6 +19,10 @@ public RedisSentinelConnection(PooledRedisConnection connection, PooledRedisConn this.sentinel = sentinel; } + public RedisConnectionInternal actual() { + return connection.actual(); + } + @Override public RedisConnection exceptionHandler(Handler handler) { connection.exceptionHandler(handler); diff --git a/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java b/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java index 3cf6bf3c..9bb36887 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java @@ -55,6 +55,7 @@ public class RedisStandaloneConnection implements RedisConnectionInternal, Parse private Runnable onEvict; private boolean closed = false; private boolean tainted = false; + private boolean trackingConfigured = false; public RedisStandaloneConnection(VertxInternal vertx, ContextInternal context, PoolConnector.Listener connectionListener, NetSocket netSocket, PoolOptions options, int maxWaitingHandlers, RedisURI uri, ClientMetrics metrics, TracingPolicy tracingPolicy) { //System.out.println("#" + this.hashCode()); @@ -78,6 +79,10 @@ synchronized void setValid() { tainted = false; } + synchronized void setTrackingConfigured() { + trackingConfigured = true; + } + @Override public void forceClose() { //System.out.println("forceClose()#" + this.hashCode()); @@ -91,6 +96,11 @@ public boolean isValid() { return !closed && (expiresAt <= 0 || System.currentTimeMillis() < expiresAt); } + @Override + public boolean isTrackingConfigured() { + return trackingConfigured; + } + @Override public Future close() { //System.out.println("close()#" + this.hashCode()); diff --git a/src/main/java/io/vertx/redis/client/impl/RequestImpl.java b/src/main/java/io/vertx/redis/client/impl/RequestImpl.java index e80328a6..b2344573 100644 --- a/src/main/java/io/vertx/redis/client/impl/RequestImpl.java +++ b/src/main/java/io/vertx/redis/client/impl/RequestImpl.java @@ -19,6 +19,7 @@ import io.vertx.redis.client.Command; import io.vertx.redis.client.Request; +import io.vertx.redis.client.impl.cache.CacheKey; import java.nio.charset.StandardCharsets; import java.util.*; @@ -140,6 +141,14 @@ public Request arg(Buffer arg) { return this; } + boolean isCacheable() { + return keys().size() == 1 && cmd.isReadOnly(args); + } + + CacheKey cacheKey() { + return new CacheKey(new String(keys().get(0), StandardCharsets.UTF_8)); + } + Buffer encode() { return encode(Buffer.buffer()); } diff --git a/src/main/java/io/vertx/redis/client/impl/cache/CacheKey.java b/src/main/java/io/vertx/redis/client/impl/cache/CacheKey.java new file mode 100644 index 00000000..d7d4d9f0 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/cache/CacheKey.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client.impl.cache; + +import java.util.Objects; + +public class CacheKey { + + private final String key; + private final long createdAtMillis; + + public CacheKey(String key) { + this(key, System.currentTimeMillis()); + } + + public CacheKey(String key, long createdAtMillis) { + this.key = key; + this.createdAtMillis = createdAtMillis; + } + + public String getKey() { + return key; + } + + public long createdAt() { + return createdAtMillis; + } + + public long ageMillis() { + return System.currentTimeMillis() - createdAtMillis; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(key, cacheKey.key); + } + + @Override + public int hashCode() { + return Objects.hash(key); + } +} diff --git a/src/main/java/io/vertx/redis/client/impl/cache/LRURedisClientCache.java b/src/main/java/io/vertx/redis/client/impl/cache/LRURedisClientCache.java new file mode 100644 index 00000000..0ecef56b --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/cache/LRURedisClientCache.java @@ -0,0 +1,101 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client.impl.cache; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.redis.client.CachingRedisOptions; +import io.vertx.redis.client.RedisClientCache; +import io.vertx.redis.client.Response; + +import java.util.LinkedHashMap; +import java.util.Map; + +public class LRURedisClientCache implements RedisClientCache { + + private final Store store; + private final long maxAgeMillis; + + public LRURedisClientCache(CachingRedisOptions options) { + this.store = new Store(options.getMaxCacheSize()); + this.maxAgeMillis = options.getMaxAgeUnit().toMillis(options.getMaxAge()); + } + + @Override + public @Nullable Response get(CacheKey key) { + final CacheEntry entry = store.get(key); + + if (entry == null) { + return null; + } + + if (entry.getAgeMillis() > maxAgeMillis) { + store.remove(key); + return null; + } + + return entry.response; + } + + @Override + public void put(CacheKey key, Response value) { + store.put(key, new CacheEntry(value)); + } + + @Override + public void delete(CacheKey key) { + store.remove(key); + } + + @Override + public void flush() { + store.clear(); + } + + @Override + public void close() { + flush(); + } + + static final class Store extends LinkedHashMap { + + private final int maxSize; + + Store(int maxSize) { + super(maxSize, 0.75f, true); + this.maxSize = maxSize; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxSize; + } + } + + static final class CacheEntry { + + private final Response response; + private final long insertTime; + + CacheEntry(Response response) { + this.response = response; + this.insertTime = System.currentTimeMillis(); + } + + long getAgeMillis() { + return System.currentTimeMillis() - insertTime; + } + } +} diff --git a/src/test/java/io/vertx/redis/client/test/CachingRedisTest.java b/src/test/java/io/vertx/redis/client/test/CachingRedisTest.java new file mode 100644 index 00000000..7681ea8f --- /dev/null +++ b/src/test/java/io/vertx/redis/client/test/CachingRedisTest.java @@ -0,0 +1,53 @@ +package io.vertx.redis.client.test; + +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.redis.client.CachingRedis; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisOptions; +import io.vertx.redis.client.Request; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.testcontainers.containers.GenericContainer; + +@RunWith(VertxUnitRunner.class) +public class CachingRedisTest { + + @ClassRule + public static final GenericContainer redis = new GenericContainer<>("redis:7") + .withExposedPorts(6379); + + @Rule + public final RunTestOnContext rule = new RunTestOnContext(); + + @Test + public void simpleTest(TestContext should) { + final Async test = should.async(); + + CachingRedis.create(rule.vertx(), Redis.createClient(rule.vertx(), "redis://" + redis.getHost() + ":" + redis.getFirstMappedPort())) + .send(Request.cmd(Command.PING)).onComplete(send -> { + should.assertTrue(send.succeeded()); + should.assertNotNull(send.result()); + should.assertEquals("PONG", send.result().toString()); + test.complete(); + }); + } + + @Test + public void preservesContext(TestContext should) { + Redis client = Redis.createClient(rule.vertx(), new RedisOptions() + .setConnectionString("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort())); + CachingRedis cachingClient = CachingRedis.create(rule.vertx(), client); + + PreservesContext.sendWithoutConnect(cachingClient, should); + PreservesContext.batchWithoutConnect(cachingClient, should); + PreservesContext.connect(cachingClient, should); + PreservesContext.connectThenSend(cachingClient, should); + PreservesContext.connectThenBatch(cachingClient, should); + } +} diff --git a/src/test/java/io/vertx/test/redis/CachingRedisClientTest.java b/src/test/java/io/vertx/test/redis/CachingRedisClientTest.java new file mode 100644 index 00000000..c8ee7ab5 --- /dev/null +++ b/src/test/java/io/vertx/test/redis/CachingRedisClientTest.java @@ -0,0 +1,125 @@ +package io.vertx.test.redis; + +import io.vertx.core.Context; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.redis.client.CachingRedis; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisAPI; +import io.vertx.redis.client.RedisOptions; +import io.vertx.redis.client.Response; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.testcontainers.containers.GenericContainer; + +import java.util.UUID; + +@RunWith(VertxUnitRunner.class) +public class CachingRedisClientTest { + + @ClassRule + public static final GenericContainer container = new GenericContainer<>("redis:6.0.6") + .withExposedPorts(6379); + + @Rule + public final RunTestOnContext rule = new RunTestOnContext(); + + private Redis client; + private CachingRedis cachingClient; + private RedisAPI api; + + @Before + public void before(TestContext should) { + final Async before = should.async(); + + Context context = rule.vertx().getOrCreateContext(); + client = Redis.createClient(rule.vertx(), new RedisOptions().setConnectionString("redis://" + container.getHost() + ":" + container.getFirstMappedPort())); + cachingClient = CachingRedis.create(rule.vertx(), client); + cachingClient.connect().onComplete(onConnect -> { + should.assertTrue(onConnect.succeeded()); + should.assertEquals(context, rule.vertx().getOrCreateContext()); + api = RedisAPI.api(cachingClient); + before.complete(); + }); + } + + @After + public void after() { + cachingClient.close(); + } + + @Test + public void testSimpleSetAndGet(TestContext should) { + final Async test = should.async(); + final String key = UUID.randomUUID().toString(); + + api + .send(Command.SET, key, "hello, world") + .onComplete(setResponse -> { + should.assertTrue(setResponse.succeeded()); + + api + .get(key) + .onComplete(get1Response -> { + should.assertTrue(get1Response.succeeded()); + + final Response response = get1Response.result(); + + should.assertEquals(response.toString(), "hello, world"); + + // The cache impl stores the actual object, so we can assert it's from cache by + // asserting a subsequent get returns the same object + api + .get(key) + .onComplete(get2Response -> { + should.assertTrue(get2Response.succeeded()); + should.assertEquals(get2Response.result().toString(), "hello, world"); + should.assertTrue(get2Response.result() == response); + test.complete(); + }); + }); + }); + } + + @Test + public void testInvalidation(TestContext should) { + final Async test = should.async(); + final String key = UUID.randomUUID().toString(); + + cachingClient.invalidationHandler(keys -> { + should.assertEquals(keys.size(), 1); + test.complete(); + }); + + api.send(Command.SET, key, "hello, world").onComplete(set1Response -> { + should.assertTrue(set1Response.succeeded()); + + api.get(key).onComplete(get1Response -> { + should.assertTrue(get1Response.succeeded()); + + final Response response = get1Response.result(); + + should.assertEquals(response.toString(), "hello, world"); + + // The cache impl stores the actual object, so we can assert it's from cache by + // asserting a subsequent get returns the same object + api.get(key).onComplete(get2Response -> { + should.assertTrue(get2Response.succeeded()); + should.assertEquals(get2Response.result().toString(), "hello, world"); + should.assertTrue(get2Response.result() == response); + + api.send(Command.SET, key, "new value").onComplete(set2Response -> { + should.assertTrue(set2Response.succeeded()); + }); + }); + }); + }); + } +}