diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index 8545dcdf..0c592ba2 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -24,6 +24,7 @@ Redis has a rich API and it can be organized in the following groups: * Strings - Commands to work with Strings. * Transactions - Commands to handle transaction lifecycle. * Streams - Commands to handle streaming. +* Client-side caching - Commands to control client-side caching. == Using Vert.x-Redis @@ -182,9 +183,54 @@ And from another place in the code publish messages to the queue: ---- NOTE: It is important to remember that the commands `SUBSCRIBE`, `UNSUBSCRIBE`, `PSUBSCRIBE` and `PUNSUBSCRIBE` are `void`. -This means that the result in case of success is `null` not a instance of response. +This means that the result in case of success is `null` not an instance of response. All messages are then routed through the handler on the client. +== Client-side Caching + +Redis supports client-side caching implementations using a strategy called _Tracking_. + +All modes of the client support caching except connections that are in pub/sub mode. + +To create a client with client-side caching, one would do: + +[source,$lang] +---- +{@link examples.RedisExamples#clientCaching1} +---- + +A specific interface, `CachingRedis`, is exposed for a caching client that allows an invalidation handler to be attached or a flush command to be issued. +The invalidation handler will be invoked with all the keys being invalidated whenever a message is received on the invalidations connection. + +To attach an invalidations handler: + +[source,$lang] +---- +{@link examples.RedisExamples#clientCaching2} +---- + +To manually flush the client's cache store: + +[source,$lang] +---- +{@link examples.RedisExamples#clientCaching3} +---- + +The client comes with a default cache store out of the box, but you can write your own if you prefer. + +The implementations are expected to follow the `ServiceLoader` conventions and all stores that are available at runtime from the classpath will be exposed. +When more than 1 implementation is available the first one that can be instantiated and configured with success becomes the default. +If none is available, then the default is a simple Least-Recently-Used (LRU) cache backed by a `LinkedHashMap`. + +[source,$lang] +---- +{@link examples.RedisExamples#clientCaching4} +---- + +NOTE: The cache is not a write-through cache. A value will not be stored in the client-side cache until the value is fetched from Redis for the first time. +To avoid write-then-read race conditions within the same batch, read commands that are part of a batch will not check the cache first. +Additionally, the current implementation does not support the `OPTIN` or `NOLOOP` options. + == Tracing commands The Redis client can trace command execution when Vert.x has tracing enabled. diff --git a/src/main/generated/io/vertx/redis/client/CachingRedisOptionsConverter.java b/src/main/generated/io/vertx/redis/client/CachingRedisOptionsConverter.java new file mode 100644 index 00000000..9c3cdccf --- /dev/null +++ b/src/main/generated/io/vertx/redis/client/CachingRedisOptionsConverter.java @@ -0,0 +1,95 @@ +package io.vertx.redis.client; + +import io.vertx.core.json.JsonObject; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.impl.JsonUtil; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.Base64; + +/** + * Converter and mapper for {@link io.vertx.redis.client.CachingRedisOptions}. + * NOTE: This class has been automatically generated from the {@link io.vertx.redis.client.CachingRedisOptions} original class using Vert.x codegen. + */ +public class CachingRedisOptionsConverter { + + + private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER; + private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER; + + public static void fromJson(Iterable> json, CachingRedisOptions obj) { + for (java.util.Map.Entry member : json) { + switch (member.getKey()) { + case "enabled": + if (member.getValue() instanceof Boolean) { + obj.setEnabled((Boolean)member.getValue()); + } + break; + case "maxCacheSize": + if (member.getValue() instanceof Number) { + obj.setMaxCacheSize(((Number)member.getValue()).intValue()); + } + break; + case "maxAge": + if (member.getValue() instanceof Number) { + obj.setMaxAge(((Number)member.getValue()).longValue()); + } + break; + case "maxAgeUnit": + if (member.getValue() instanceof String) { + obj.setMaxAgeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); + } + break; + case "mode": + if (member.getValue() instanceof String) { + obj.setMode(io.vertx.redis.client.ClientSideCacheMode.valueOf((String)member.getValue())); + } + break; + case "prefixes": + if (member.getValue() instanceof JsonArray) { + java.util.ArrayList list = new java.util.ArrayList<>(); + ((Iterable)member.getValue()).forEach( item -> { + if (item instanceof String) + list.add((String)item); + }); + obj.setPrefixes(list); + } + break; + case "prefix": + if (member.getValue() instanceof String) { + obj.setPrefix((String)member.getValue()); + } + break; + case "prefixs": + if (member.getValue() instanceof JsonArray) { + ((Iterable)member.getValue()).forEach( item -> { + if (item instanceof String) + obj.addPrefix((String)item); + }); + } + break; + } + } + } + + public static void toJson(CachingRedisOptions obj, JsonObject json) { + toJson(obj, json.getMap()); + } + + public static void toJson(CachingRedisOptions obj, java.util.Map json) { + json.put("enabled", obj.getEnabled()); + json.put("maxCacheSize", obj.getMaxCacheSize()); + json.put("maxAge", obj.getMaxAge()); + if (obj.getMaxAgeUnit() != null) { + json.put("maxAgeUnit", obj.getMaxAgeUnit().name()); + } + if (obj.getMode() != null) { + json.put("mode", obj.getMode().name()); + } + if (obj.getPrefixes() != null) { + JsonArray array = new JsonArray(); + obj.getPrefixes().forEach(item -> array.add(item)); + json.put("prefixes", array); + } + } +} diff --git a/src/main/generated/io/vertx/redis/client/RedisOptionsConverter.java b/src/main/generated/io/vertx/redis/client/RedisOptionsConverter.java index 9ce11de8..0f541e74 100644 --- a/src/main/generated/io/vertx/redis/client/RedisOptionsConverter.java +++ b/src/main/generated/io/vertx/redis/client/RedisOptionsConverter.java @@ -123,6 +123,56 @@ public static void fromJson(Iterable> json, obj.setPoolName((String)member.getValue()); } break; + case "cachingOptions": + break; + case "cacheEnabled": + if (member.getValue() instanceof Boolean) { + obj.setCacheEnabled((Boolean)member.getValue()); + } + break; + case "cacheMaxSize": + if (member.getValue() instanceof Number) { + obj.setCacheMaxSize(((Number)member.getValue()).intValue()); + } + break; + case "cacheMaxAge": + if (member.getValue() instanceof Number) { + obj.setCacheMaxAge(((Number)member.getValue()).longValue()); + } + break; + case "cacheMaxAgeUnit": + if (member.getValue() instanceof String) { + obj.setCacheMaxAgeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); + } + break; + case "cacheMode": + if (member.getValue() instanceof String) { + obj.setCacheMode(io.vertx.redis.client.ClientSideCacheMode.valueOf((String)member.getValue())); + } + break; + case "cachePrefixes": + if (member.getValue() instanceof JsonArray) { + java.util.ArrayList list = new java.util.ArrayList<>(); + ((Iterable)member.getValue()).forEach( item -> { + if (item instanceof String) + list.add((String)item); + }); + obj.setCachePrefixes(list); + } + break; + case "cachePrefix": + if (member.getValue() instanceof String) { + obj.setCachePrefix((String)member.getValue()); + } + break; + case "cachePrefixs": + if (member.getValue() instanceof JsonArray) { + ((Iterable)member.getValue()).forEach( item -> { + if (item instanceof String) + obj.addCachePrefix((String)item); + }); + } + break; } } } @@ -171,5 +221,19 @@ public static void toJson(RedisOptions obj, java.util.Map json) if (obj.getPoolName() != null) { json.put("poolName", obj.getPoolName()); } + json.put("cacheEnabled", obj.getCacheEnabled()); + json.put("cacheMaxSize", obj.getCacheMaxSize()); + json.put("cacheMaxAge", obj.getCacheMaxAge()); + if (obj.getCacheMaxAgeUnit() != null) { + json.put("cacheMaxAgeUnit", obj.getCacheMaxAgeUnit().name()); + } + if (obj.getCacheMode() != null) { + json.put("cacheMode", obj.getCacheMode().name()); + } + if (obj.getCachePrefixes() != null) { + JsonArray array = new JsonArray(); + obj.getCachePrefixes().forEach(item -> array.add(item)); + json.put("cachePrefixes", array); + } } } diff --git a/src/main/java/examples/RedisExamples.java b/src/main/java/examples/RedisExamples.java index b9e4e289..34ce01a9 100644 --- a/src/main/java/examples/RedisExamples.java +++ b/src/main/java/examples/RedisExamples.java @@ -1,5 +1,6 @@ package examples; +import io.vertx.codegen.annotations.Nullable; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.Promise; @@ -7,6 +8,13 @@ import io.vertx.core.tracing.TracingPolicy; import io.vertx.redis.client.*; +import io.vertx.redis.client.impl.CachingRedis; +import io.vertx.redis.client.impl.CachingRedisClient; +import io.vertx.redis.client.impl.RedisClient; +import io.vertx.redis.client.impl.cache.CacheKey; +import io.vertx.redis.client.spi.RedisClientCache; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -143,7 +151,7 @@ private Future createRedisClient() { // make sure to invalidate old connection if present if (redis != null) { - redis.close();; + redis.close(); } if (CONNECTING.compareAndSet(false, true)) { @@ -246,4 +254,82 @@ public void example13(Vertx vertx) { public void tracing1(RedisOptions options) { options.setTracingPolicy(TracingPolicy.ALWAYS); } + + public void clientCaching1(Vertx vertx) { + Redis.createClient( + vertx, + new RedisOptions() + .setCacheEnabled(true) + .setCacheMaxSize(256) + .setCacheMaxAge(60_000)) + .connect() + .onSuccess(conn -> { + // get the value for a key, returning from a local in-memory cache if + // it exists, or fetching from Redis if not. if the value is fetched from + // Redis, it will be stored in the local cache + conn.send(Request.cmd(Command.GET).arg("key")); + }); + } + + public void clientCaching2(Redis redis) { + CachingRedis cachingClient = (CachingRedis) redis; + + cachingClient.invalidationHandler(keys -> { + // something... + }); + } + + public void clientCaching3(Redis redis) { + CachingRedis cachingClient = (CachingRedis) redis; + + cachingClient.flush().onSuccess(ignored -> { + // Success! + }); + } + + public void clientCaching4(Vertx vertx, RedisClientCache customCache) { + + // Register this class in META-INF/services/io.vertx.redis.client.spi.RedisClientCache + class CustomCache implements RedisClientCache { + + private final Map store = new HashMap<>(); + + @Override + public @Nullable Response get(CacheKey key) { + return store.get(key); + } + + @Override + public void put(CacheKey key, Response value) { + store.put(key, value); + } + + @Override + public void delete(CacheKey key) { + store.remove(key); + } + + @Override + public void flush() { + store.clear(); + } + + @Override + public void close() { + // Nothing to do here + } + } + + Redis.createClient( + vertx, + new RedisOptions() + .setCacheEnabled(true)) + .connect() + .onSuccess(conn -> { + // get the value for a key, returning from the custom cache if + // it exists, or fetching from Redis if not. if the value is fetched from + // Redis, it will be stored in the local cache + conn.send(Request.cmd(Command.GET).arg("key")); + }); + } } diff --git a/src/main/java/io/vertx/redis/client/CachingRedisOptions.java b/src/main/java/io/vertx/redis/client/CachingRedisOptions.java new file mode 100644 index 00000000..2bbc469f --- /dev/null +++ b/src/main/java/io/vertx/redis/client/CachingRedisOptions.java @@ -0,0 +1,226 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client; + +import io.vertx.codegen.annotations.DataObject; +import io.vertx.core.json.JsonObject; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@DataObject(generateConverter = true) +public class CachingRedisOptions { + + /** + * The default number of max entries the cache will store. + */ + public static final int DEFAULT_MAX_SIZE = 1024; + + /** + * The default max age, in {@link #DEFAULT_MAX_AGE_UNIT} units, of an item in the cache. + */ + public static final long DEFAULT_MAX_AGE = 3_600_000; // 1 hour in milliseconds + public static final TimeUnit DEFAULT_MAX_AGE_UNIT = TimeUnit.MILLISECONDS; + + private boolean enabled; + private int maxCacheSize; + private long maxAge; + private TimeUnit maxAgeUnit; + private ClientSideCacheMode mode; + private List prefixes; + + /** + * Creates a default configuration. + */ + public CachingRedisOptions() { + this.enabled = false; + this.maxCacheSize = DEFAULT_MAX_SIZE; + this.maxAge = DEFAULT_MAX_AGE; + this.maxAgeUnit = DEFAULT_MAX_AGE_UNIT; + this.mode = ClientSideCacheMode.PER_CLIENT; + this.prefixes = new ArrayList<>(); + } + + /** + * Copy constructor. + * + * @param other the options to clone + */ + public CachingRedisOptions(CachingRedisOptions other) { + this.enabled = other.enabled; + this.maxCacheSize = other.maxCacheSize; + this.maxAge = other.maxAge; + this.maxAgeUnit = other.maxAgeUnit; + this.mode = other.mode; + this.prefixes = other.prefixes; + } + + /** + * Create from JSON constructor. + * + * @param json the source JSON + */ + public CachingRedisOptions(JsonObject json) { + this(); + CachingRedisOptionsConverter.fromJson(json, this); + } + + /** + * @return true if caching is enabled + */ + public boolean getEnabled() { + return enabled; + } + + /** + * Set if caching is enabled. + * + * @param enabled if caching is enabled + * @return fluent self + */ + public CachingRedisOptions setEnabled(boolean enabled) { + this.enabled = enabled; + return this; + } + + /** + * Get the max cache size. + * + * @return the max size + */ + public int getMaxCacheSize() { + return maxCacheSize; + } + + /** + * Set the max cache size. + * + * @param maxCacheSize the max size + * @return fluent self + */ + public CachingRedisOptions setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + return this; + } + + /** + * Get the max item age. + * + * @return the max age value + */ + public long getMaxAge() { + return maxAge; + } + + /** + * Get the max item age time unit. + * + * @return the time unit + */ + public TimeUnit getMaxAgeUnit() { + return maxAgeUnit; + } + + /** + * Set the max age value. + * + *

+ * By default, the max age is in milliseconds. If you want to change the unit, use {@link #setMaxAgeUnit(TimeUnit)}. + * + * @param maxAge the max age + * @return fluent self + */ + public CachingRedisOptions setMaxAge(long maxAge) { + this.maxAge = maxAge; + return this; + } + + /** + * Set the max age time unit. + * + * @param unit the time unit + * @return fluent self + */ + public CachingRedisOptions setMaxAgeUnit(TimeUnit unit) { + this.maxAgeUnit = unit; + return this; + } + + /** + * Get the tracking mode the clients configure the data connection with. + * + * @return the mode + */ + public ClientSideCacheMode getMode() { + return mode; + } + + /** + * Set the tracking mode the clients configure the data connection with. + * + * @param mode the mode + * @return fluent self + */ + public CachingRedisOptions setMode(ClientSideCacheMode mode) { + this.mode = mode; + return this; + } + + /** + * Get the key prefixes the server should send invalidation messages for. + * + * @return the prefixes + */ + public List getPrefixes() { + return prefixes; + } + + /** + * Set the key prefixes the server should send invalidation messages for, replacing any existing values. + * + * @param prefixes the prefix list + * @return fluent self + */ + public CachingRedisOptions setPrefixes(List prefixes) { + this.prefixes.clear(); + this.prefixes.addAll(prefixes); + return this; + } + + /** + * Set a single prefix the server should send invalidation messages for, replacing any existing values. + * + * @param prefix the prefix + * @return fluent self + */ + public CachingRedisOptions setPrefix(String prefix) { + prefixes.clear(); + prefixes.add(prefix); + return this; + } + + /** + * Add a prefix the server should send invalidation messages for. + * + * @param prefix the prefix to add + * @return fluent self + */ + public CachingRedisOptions addPrefix(String prefix) { + prefixes.add(prefix); + return this; + } +} diff --git a/src/main/java/io/vertx/redis/client/ClientSideCacheMode.java b/src/main/java/io/vertx/redis/client/ClientSideCacheMode.java new file mode 100644 index 00000000..c4cc1229 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/ClientSideCacheMode.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client; + +/** + * The mode to caching clients will configure invalidations connections in. + * + * @see Tracking modes + */ +public enum ClientSideCacheMode { + PER_CLIENT, + BROADCAST +} diff --git a/src/main/java/io/vertx/redis/client/Redis.java b/src/main/java/io/vertx/redis/client/Redis.java index c5b99e11..5b5aee80 100644 --- a/src/main/java/io/vertx/redis/client/Redis.java +++ b/src/main/java/io/vertx/redis/client/Redis.java @@ -19,14 +19,13 @@ import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Future; import io.vertx.core.Vertx; +import io.vertx.redis.client.impl.CachingRedisClient; import io.vertx.redis.client.impl.RedisClient; import io.vertx.redis.client.impl.RedisClusterClient; import io.vertx.redis.client.impl.RedisReplicationClient; import io.vertx.redis.client.impl.RedisSentinelClient; import java.util.List; -import java.util.function.Consumer; -import java.util.function.Supplier; /** * A simple Redis client. @@ -64,18 +63,30 @@ static Redis createClient(Vertx vertx, String connectionString) { * @return the client */ static Redis createClient(Vertx vertx, RedisOptions options) { + final Redis clientImpl; + switch (options.getType()) { case STANDALONE: - return new RedisClient(vertx, options.getNetClientOptions(), options.getPoolOptions(), new RedisStandaloneConnectOptions(options), options.getTracingPolicy()); + clientImpl = new RedisClient(vertx, options.getNetClientOptions(), options.getPoolOptions(), new RedisStandaloneConnectOptions(options), options.getTracingPolicy()); + break; case SENTINEL: - return new RedisSentinelClient(vertx, options.getNetClientOptions(), options.getPoolOptions(), new RedisSentinelConnectOptions(options), options.getTracingPolicy()); + clientImpl = new RedisSentinelClient(vertx, options.getNetClientOptions(), options.getPoolOptions(), new RedisSentinelConnectOptions(options), options.getTracingPolicy()); + break; case CLUSTER: - return new RedisClusterClient(vertx, options.getNetClientOptions(), options.getPoolOptions(), new RedisClusterConnectOptions(options), options.getTracingPolicy()); + clientImpl = new RedisClusterClient(vertx, options.getNetClientOptions(), options.getPoolOptions(), new RedisClusterConnectOptions(options), options.getTracingPolicy()); + break; case REPLICATION: - return new RedisReplicationClient(vertx, options.getNetClientOptions(), options.getPoolOptions(), new RedisClusterConnectOptions(options), options.getTracingPolicy()); + clientImpl = new RedisReplicationClient(vertx, options.getNetClientOptions(), options.getPoolOptions(), new RedisClusterConnectOptions(options), options.getTracingPolicy()); + break; default: throw new IllegalStateException("Unknown Redis Client type: " + options.getType()); } + + if (options.getCacheEnabled()) { + return new CachingRedisClient(vertx, clientImpl, options.getCachingOptions()); + } + + return clientImpl; } /** diff --git a/src/main/java/io/vertx/redis/client/RedisOptions.java b/src/main/java/io/vertx/redis/client/RedisOptions.java index 17b6ca0e..9feac4a5 100644 --- a/src/main/java/io/vertx/redis/client/RedisOptions.java +++ b/src/main/java/io/vertx/redis/client/RedisOptions.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; /** * Redis Client Configuration options. @@ -50,6 +51,7 @@ public class RedisOptions { private volatile String password; private boolean protocolNegotiation; private TracingPolicy tracingPolicy; + private CachingRedisOptions cachingOptions; /** * Creates a default configuration object using redis server defaults @@ -66,6 +68,7 @@ public RedisOptions() { maxNestedArrays = 32; protocolNegotiation = true; maxWaitingHandlers = 2048; + cachingOptions = new CachingRedisOptions(); } /** @@ -84,6 +87,7 @@ public RedisOptions(RedisOptions other) { this.useReplicas = other.useReplicas; this.password = other.password; this.protocolNegotiation = other.protocolNegotiation; + this.cachingOptions = other.cachingOptions; } /** @@ -537,6 +541,119 @@ public String getPoolName() { return poolOptions.getName(); } + /** + * @return the client-side caching options + */ + public CachingRedisOptions getCachingOptions() { + return cachingOptions; + } + + /** + * @see CachingRedisOptions#getEnabled() + */ + public boolean getCacheEnabled() { + return cachingOptions.getEnabled(); + } + + /** + * @see CachingRedisOptions#setEnabled(boolean) + */ + public RedisOptions setCacheEnabled(boolean cacheEnabled) { + cachingOptions.setEnabled(cacheEnabled); + return this; + } + + /** + * @see CachingRedisOptions#getMaxCacheSize() + */ + public int getCacheMaxSize() { + return cachingOptions.getMaxCacheSize(); + } + + /** + * @see CachingRedisOptions#setMaxCacheSize(int) + */ + public RedisOptions setCacheMaxSize(int maxCacheSize) { + cachingOptions.setMaxCacheSize(maxCacheSize); + return this; + } + + /** + * @see CachingRedisOptions#getMaxAge() + */ + public long getCacheMaxAge() { + return cachingOptions.getMaxAge(); + } + + /** + * @see CachingRedisOptions#setMaxAge(long) + */ + public RedisOptions setCacheMaxAge(long cacheMaxAge) { + cachingOptions.setMaxAge(cacheMaxAge); + return this; + } + + /** + * @see CachingRedisOptions#getMaxAgeUnit() + */ + public TimeUnit getCacheMaxAgeUnit() { + return cachingOptions.getMaxAgeUnit(); + } + + /** + * @see CachingRedisOptions#setMaxAgeUnit(TimeUnit) + */ + public RedisOptions setCacheMaxAgeUnit(TimeUnit cacheMaxAgeUnit) { + cachingOptions.setMaxAgeUnit(cacheMaxAgeUnit); + return this; + } + + /** + * @see CachingRedisOptions#getMode() + */ + public ClientSideCacheMode getCacheMode() { + return cachingOptions.getMode(); + } + + /** + * @see CachingRedisOptions#setMode(ClientSideCacheMode) + */ + public RedisOptions setCacheMode(ClientSideCacheMode cacheMode) { + cachingOptions.setMode(cacheMode); + return this; + } + + /** + * @see CachingRedisOptions#getPrefixes() + */ + public List getCachePrefixes() { + return cachingOptions.getPrefixes(); + } + + /** + * @see CachingRedisOptions#setPrefixes(List) + */ + public RedisOptions setCachePrefixes(List cachePrefixes) { + cachingOptions.setPrefixes(cachePrefixes); + return this; + } + + /** + * @see CachingRedisOptions#setPrefix(String) + */ + public RedisOptions setCachePrefix(String cachePrefix) { + cachingOptions.setPrefix(cachePrefix); + return this; + } + + /** + * @see CachingRedisOptions#addPrefix(String) + */ + public RedisOptions addCachePrefix(String cachePrefix) { + cachingOptions.addPrefix(cachePrefix); + return this; + } + /** * Converts this object to JSON notation. * diff --git a/src/main/java/io/vertx/redis/client/impl/CachingRedis.java b/src/main/java/io/vertx/redis/client/impl/CachingRedis.java new file mode 100644 index 00000000..152cd064 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/CachingRedis.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client.impl; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.impl.cache.CacheKey; + +import java.util.Collection; + +public interface CachingRedis extends Redis { + + /** + * Flush the local cache. + * + *

+ * This operation only clears the local cache and has no interaction with the server. + * + * @return a future indicating the status of the operation + */ + Future flush(); + + /** + * Set a handler to be called when invalidation is performed. + * + *

+ * The client will clear the keys before this handler is invoked. It is not recommended to modify + * the cache as a part of this handler. The primary function is for instrumentation. + * + * @param handler a handler that accepts the keys which were invalidated + * @return fluent self + */ + CachingRedis invalidationHandler(Handler> handler); +} diff --git a/src/main/java/io/vertx/redis/client/impl/CachingRedisClient.java b/src/main/java/io/vertx/redis/client/impl/CachingRedisClient.java new file mode 100644 index 00000000..dac85b29 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/CachingRedisClient.java @@ -0,0 +1,256 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client.impl; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.redis.client.CachingRedisOptions; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisConnection; +import io.vertx.redis.client.Request; +import io.vertx.redis.client.Response; +import io.vertx.redis.client.impl.cache.CacheKey; +import io.vertx.redis.client.impl.cache.LRURedisClientCache; +import io.vertx.redis.client.impl.types.MultiType; +import io.vertx.redis.client.impl.types.PushType; +import io.vertx.redis.client.spi.RedisClientCache; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +public class CachingRedisClient implements CachingRedis { + + private static final Logger LOG = LoggerFactory.getLogger(CachingRedis.class); + private static final String INVALIDATIONS_CHANNEL = "__redis__:invalidate"; + private static final Request CLIENT_ID_COMMAND = Request.cmd(Command.CLIENT).arg("ID"); + private static final Request SUBSCRIBE_COMMAND = Request.cmd(Command.SUBSCRIBE).arg(INVALIDATIONS_CHANNEL); + private static final int MAX_RECONNECT_ATTEMPTS = 5; + + private final VertxInternal vertx; + private final Redis redis; + private final CachingRedisOptions options; + private final RedisClientCache cache; + private final AtomicBoolean closed = new AtomicBoolean(); + private Promise clientIdPromise; + private Handler> invalidationHandler; + + public CachingRedisClient(Vertx vertx, Redis redis, CachingRedisOptions options) { + if (redis instanceof CachingRedis) { + throw new IllegalArgumentException("CachingRedisClient can not be self referential"); + } + + this.vertx = (VertxInternal) vertx; + this.redis = redis; + this.options = options; + this.cache = discoverCacheImpl(options); + } + + @Override + public Future connect() { + if (closed.get()) { + return vertx.getContext().failedFuture("Client is closed"); + } + + final Promise promise = vertx.promise(); + + getClientId().onComplete(id -> { + if (id.failed()) { + promise.fail(id.cause()); + return; + } + + final int clientId = id.result(); + + redis.connect().onComplete(connect -> { + if (connect.failed()) { + promise.fail(connect.cause()); + return; + } + + final RedisConnection connection = connect.result(); + + promise.complete(new CachingRedisConnection(vertx, cache, connection, clientId, options)); + }); + }); + + return promise.future(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + redis.close(); + cache.close(); + } + } + + @Override + public Future<@Nullable Response> send(Request request) { + if (closed.get()) { + return vertx.getContext().failedFuture("Client is closed"); + } + + return connect().compose(conn -> + conn.send(request) + .eventually(() -> + conn.close().onFailure(LOG::warn))); + } + + @Override + public Future> batch(List commands) { + return redis.batch(commands); + } + + @Override + public Future flush() { + cache.flush(); + return Future.succeededFuture(); + } + + @Override + public CachingRedis invalidationHandler(Handler> handler) { + this.invalidationHandler = handler; + return this; + } + + Future getClientId() { + synchronized (this) { + if (clientIdPromise != null) { + return clientIdPromise.future(); + } else { + clientIdPromise = vertx.promise(); + } + } + + createInvalidationsConnection(0).onComplete(clientIdPromise); + + return clientIdPromise.future(); + } + + private Future createInvalidationsConnection(int attempt) { + if (closed.get()) { + // we're shutting down, the endHandler will have sent us here so don't try to reconnect + return Future.failedFuture("Client is closed"); + } + if (attempt > MAX_RECONNECT_ATTEMPTS) { + return Future.failedFuture("max reconnect attempts exceeded"); + } + + return redis + .connect() + .compose(this::initInvalidationsConnection) + .recover(e -> createInvalidationsConnection(attempt + 1)); + } + + private Future initInvalidationsConnection(RedisConnection connection) { + connection.exceptionHandler(e -> { + LOG.warn(e); + // flush the cache to ensure no stale data is served while reconnecting + cache.flush(); + connection.close(); + reconnectInvalidationsConnection(); + }); + + connection.endHandler(v -> { + // flush the cache to ensure no stale data is served while reconnecting + cache.flush(); + reconnectInvalidationsConnection(); + }); + + return connection + .send(CLIENT_ID_COMMAND) + .map(Response::toInteger) + .compose(clientId -> connection + .send(SUBSCRIBE_COMMAND) + .onSuccess(r -> connection.handler(this::handleInvalidationMessage)) + .map(clientId)); + } + + private void reconnectInvalidationsConnection() { + if (closed.get()) { + return; + } + synchronized (this) { + clientIdPromise = vertx.promise(); + } + + createInvalidationsConnection(0).onComplete(clientIdPromise); + } + + private void handleInvalidationMessage(Response message) { + // only process messages like `invalidate: [key1,key2,key3]` or `invalidate: key` + if (message instanceof PushType) { + final PushType pushMessage = (PushType) message; + final String pushCommand = pushMessage.get(0).toString(); + + if ("INVALIDATE".equalsIgnoreCase(pushCommand)) { + final Response invalidationMessage = pushMessage.get(1); + final Set invalidatedKeys = new HashSet<>(); + + if (invalidationMessage instanceof MultiType) { + final Iterator manyKeys = invalidationMessage.iterator(); + + do { + final CacheKey key = new CacheKey(manyKeys.next().toString()); + cache.delete(key); + invalidatedKeys.add(key); + } while (manyKeys.hasNext()); + } else { + final CacheKey key = new CacheKey(invalidationMessage.toString()); + cache.delete(key); + invalidatedKeys.add(key); + } + + if (invalidationHandler != null) { + invalidationHandler.handle(invalidatedKeys); + } + } + } + } + + private static RedisClientCache discoverCacheImpl(CachingRedisOptions options) { + final List impls = new ArrayList<>(); + ServiceLoader.load(RedisClientCache.class).iterator().forEachRemaining(impls::add); + + if (impls.isEmpty()) { + return new LRURedisClientCache(options); + } + + final RedisClientCache selected = impls.get(0); + + if (impls.size() > 1) { + LOG.debug("Multiple implementations of RedisClientCache found: " + + impls.stream().map(c -> c.getClass().getName()).collect(Collectors.joining(", ")) + + ", using: " + selected.getClass().getName()); + } + + return selected; + } +} diff --git a/src/main/java/io/vertx/redis/client/impl/CachingRedisConnection.java b/src/main/java/io/vertx/redis/client/impl/CachingRedisConnection.java new file mode 100644 index 00000000..8ce07410 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/CachingRedisConnection.java @@ -0,0 +1,170 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client.impl; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.impl.VertxInternal; +import io.vertx.redis.client.CachingRedisOptions; +import io.vertx.redis.client.spi.RedisClientCache; +import io.vertx.redis.client.ClientSideCacheMode; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.RedisConnection; +import io.vertx.redis.client.Request; +import io.vertx.redis.client.Response; + +import java.util.List; + +public class CachingRedisConnection implements RedisConnection { + + private final VertxInternal vertx; + private final RedisClientCache cache; + private final RedisConnection delegate; + private final RedisStandaloneConnection actual; + private final Request trackingCommand; + + public CachingRedisConnection(VertxInternal vertx, RedisClientCache cache, RedisConnection delegate, int clientId, CachingRedisOptions options) { + this.vertx = vertx; + this.cache = cache; + this.delegate = delegate; + this.trackingCommand = trackingCommand(clientId, options); + + if (delegate instanceof PooledRedisConnection) { + this.actual = (RedisStandaloneConnection) ((PooledRedisConnection) delegate).actual(); + } else if (delegate instanceof RedisSentinelConnection) { + this.actual = (RedisStandaloneConnection) ((RedisSentinelConnection) delegate).actual(); + } else { + this.actual = (RedisStandaloneConnection) delegate; + } + } + + @Override + public RedisConnection exceptionHandler(Handler handler) { + return delegate.exceptionHandler(handler); + } + + @Override + public RedisConnection handler(@Nullable Handler handler) { + return delegate.handler(handler); + } + + @Override + public RedisConnection pause() { + return delegate.pause(); + } + + @Override + public RedisConnection resume() { + return delegate.resume(); + } + + @Override + public RedisConnection fetch(long amount) { + return delegate.fetch(amount); + } + + @Override + public RedisConnection endHandler(@Nullable Handler endHandler) { + return delegate.endHandler(endHandler); + } + + @Override + public Future<@Nullable Response> send(Request request) { + final RequestImpl req = (RequestImpl) request; + final Promise promise = vertx.promise(); + + // Result could possibly be cached, check cache first + if (req.isCacheable()) { + final Response cachedValue = cache.get(req.cacheKey()); + if (cachedValue != null) { + promise.complete(cachedValue); + return promise.future(); + } + } + + configureTracking() + .onFailure(promise::fail) + .onSuccess(v -> + delegate + .send(request) + .onFailure(promise::fail) + .onSuccess(resp -> { + if (req.isCacheable() && resp != null) { + cache.put(req.cacheKey(), resp); + } + promise.complete(resp); + })); + + return promise.future(); + } + + @Override + public Future> batch(List requests) { + // Currently we don't support batching because order can matter. If an earlier request in the + // batch changes the value that a later, cacheable, request reads, we can't guarantee we + // invalidate in time. Rather than risk returning unexpected or stale values, simple don't deal + // with caching during a batch. + return delegate.batch(requests); + } + + @Override + public Future close() { + return delegate.close(); + } + + @Override + public boolean pendingQueueFull() { + return delegate.pendingQueueFull(); + } + + private Future configureTracking() { + final Promise trackingPromise = vertx.promise(); + + if (actual.isTrackingConfigured()) { + trackingPromise.complete(); + } else { + actual.send(trackingCommand).onComplete(ar -> { + if (ar.succeeded()) { + actual.setTrackingConfigured(); + } + + trackingPromise.handle(ar.mapEmpty()); + }); + } + + return trackingPromise.future(); + } + + private static Request trackingCommand(int clientId, CachingRedisOptions options) { + final Request request = Request.cmd(Command.CLIENT) + .arg("TRACKING") + .arg("ON") + .arg("REDIRECT") + .arg(clientId); + + for (String prefix : options.getPrefixes()) { + request.arg("PREFIX").arg(prefix); + } + + if (ClientSideCacheMode.BROADCAST == options.getMode()) { + request.arg("BCAST"); + } + + return request; + } +} diff --git a/src/main/java/io/vertx/redis/client/impl/RedisConnectionInternal.java b/src/main/java/io/vertx/redis/client/impl/RedisConnectionInternal.java index cb88983e..76c0e9ca 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisConnectionInternal.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisConnectionInternal.java @@ -38,6 +38,12 @@ public interface RedisConnectionInternal extends RedisConnection { */ boolean isTainted(); + /** + * Returns {@code true} if a {@link CachingRedis} client is being used, + * and tracking has been configured for this connection. + */ + boolean isTrackingConfigured(); + VertxInternal vertx(); /** diff --git a/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java b/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java index c865236e..c0db145e 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java @@ -19,6 +19,10 @@ public RedisSentinelConnection(PooledRedisConnection connection, PooledRedisConn this.sentinel = sentinel; } + public RedisConnectionInternal actual() { + return connection.actual(); + } + @Override public RedisConnection exceptionHandler(Handler handler) { connection.exceptionHandler(handler); diff --git a/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java b/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java index 3cf6bf3c..9bb36887 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java @@ -55,6 +55,7 @@ public class RedisStandaloneConnection implements RedisConnectionInternal, Parse private Runnable onEvict; private boolean closed = false; private boolean tainted = false; + private boolean trackingConfigured = false; public RedisStandaloneConnection(VertxInternal vertx, ContextInternal context, PoolConnector.Listener connectionListener, NetSocket netSocket, PoolOptions options, int maxWaitingHandlers, RedisURI uri, ClientMetrics metrics, TracingPolicy tracingPolicy) { //System.out.println("#" + this.hashCode()); @@ -78,6 +79,10 @@ synchronized void setValid() { tainted = false; } + synchronized void setTrackingConfigured() { + trackingConfigured = true; + } + @Override public void forceClose() { //System.out.println("forceClose()#" + this.hashCode()); @@ -91,6 +96,11 @@ public boolean isValid() { return !closed && (expiresAt <= 0 || System.currentTimeMillis() < expiresAt); } + @Override + public boolean isTrackingConfigured() { + return trackingConfigured; + } + @Override public Future close() { //System.out.println("close()#" + this.hashCode()); diff --git a/src/main/java/io/vertx/redis/client/impl/RequestImpl.java b/src/main/java/io/vertx/redis/client/impl/RequestImpl.java index e80328a6..b2344573 100644 --- a/src/main/java/io/vertx/redis/client/impl/RequestImpl.java +++ b/src/main/java/io/vertx/redis/client/impl/RequestImpl.java @@ -19,6 +19,7 @@ import io.vertx.redis.client.Command; import io.vertx.redis.client.Request; +import io.vertx.redis.client.impl.cache.CacheKey; import java.nio.charset.StandardCharsets; import java.util.*; @@ -140,6 +141,14 @@ public Request arg(Buffer arg) { return this; } + boolean isCacheable() { + return keys().size() == 1 && cmd.isReadOnly(args); + } + + CacheKey cacheKey() { + return new CacheKey(new String(keys().get(0), StandardCharsets.UTF_8)); + } + Buffer encode() { return encode(Buffer.buffer()); } diff --git a/src/main/java/io/vertx/redis/client/impl/cache/CacheKey.java b/src/main/java/io/vertx/redis/client/impl/cache/CacheKey.java new file mode 100644 index 00000000..d7d4d9f0 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/cache/CacheKey.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client.impl.cache; + +import java.util.Objects; + +public class CacheKey { + + private final String key; + private final long createdAtMillis; + + public CacheKey(String key) { + this(key, System.currentTimeMillis()); + } + + public CacheKey(String key, long createdAtMillis) { + this.key = key; + this.createdAtMillis = createdAtMillis; + } + + public String getKey() { + return key; + } + + public long createdAt() { + return createdAtMillis; + } + + public long ageMillis() { + return System.currentTimeMillis() - createdAtMillis; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(key, cacheKey.key); + } + + @Override + public int hashCode() { + return Objects.hash(key); + } +} diff --git a/src/main/java/io/vertx/redis/client/impl/cache/LRURedisClientCache.java b/src/main/java/io/vertx/redis/client/impl/cache/LRURedisClientCache.java new file mode 100644 index 00000000..8b0f9cc1 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/cache/LRURedisClientCache.java @@ -0,0 +1,101 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client.impl.cache; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.redis.client.CachingRedisOptions; +import io.vertx.redis.client.spi.RedisClientCache; +import io.vertx.redis.client.Response; + +import java.util.LinkedHashMap; +import java.util.Map; + +public class LRURedisClientCache implements RedisClientCache { + + private final Store store; + private final long maxAgeMillis; + + public LRURedisClientCache(CachingRedisOptions options) { + this.store = new Store(options.getMaxCacheSize()); + this.maxAgeMillis = options.getMaxAgeUnit().toMillis(options.getMaxAge()); + } + + @Override + public @Nullable Response get(CacheKey key) { + final CacheEntry entry = store.get(key); + + if (entry == null) { + return null; + } + + if (entry.getAgeMillis() > maxAgeMillis) { + store.remove(key); + return null; + } + + return entry.response; + } + + @Override + public void put(CacheKey key, Response value) { + store.put(key, new CacheEntry(value)); + } + + @Override + public void delete(CacheKey key) { + store.remove(key); + } + + @Override + public void flush() { + store.clear(); + } + + @Override + public void close() { + flush(); + } + + static final class Store extends LinkedHashMap { + + private final int maxSize; + + Store(int maxSize) { + super(maxSize, 0.75f, true); + this.maxSize = maxSize; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxSize; + } + } + + static final class CacheEntry { + + private final Response response; + private final long insertTime; + + CacheEntry(Response response) { + this.response = response; + this.insertTime = System.currentTimeMillis(); + } + + long getAgeMillis() { + return System.currentTimeMillis() - insertTime; + } + } +} diff --git a/src/main/java/io/vertx/redis/client/spi/RedisClientCache.java b/src/main/java/io/vertx/redis/client/spi/RedisClientCache.java new file mode 100644 index 00000000..851bb18f --- /dev/null +++ b/src/main/java/io/vertx/redis/client/spi/RedisClientCache.java @@ -0,0 +1,60 @@ +/* + * Copyright 2019 Red Hat, Inc. + *

+ * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + *

+ * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + *

+ * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + *

+ * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.redis.client.spi; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.redis.client.Response; +import io.vertx.redis.client.impl.CachingRedis; +import io.vertx.redis.client.impl.cache.CacheKey; + +/** + * A client-side cache store used by a {@link CachingRedis}. + */ +public interface RedisClientCache { + + /** + * Get an item if it exists. + * + * @param key the key to get + * @return the item, or null if it does not exist + */ + @Nullable Response get(CacheKey key); + + /** + * Put an item, replacing anything that may exist. + * + * @param key the key + * @param value the value + */ + void put(CacheKey key, Response value); + + /** + * Remove an item if it exists. + * + * @param key the key to remove + */ + void delete(CacheKey key); + + /** + * Invalidate or remove all items, emptying the cache. + */ + void flush(); + + /** + * Close the cache and cleanup any resources. + */ + void close(); +} diff --git a/src/test/java/io/vertx/redis/client/test/CachingRedisTest.java b/src/test/java/io/vertx/redis/client/test/CachingRedisTest.java new file mode 100644 index 00000000..19fa16e7 --- /dev/null +++ b/src/test/java/io/vertx/redis/client/test/CachingRedisTest.java @@ -0,0 +1,64 @@ +package io.vertx.redis.client.test; + +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisOptions; +import io.vertx.redis.client.Request; +import io.vertx.redis.client.impl.CachingRedis; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.testcontainers.containers.GenericContainer; + +@RunWith(VertxUnitRunner.class) +public class CachingRedisTest { + + @ClassRule + public static final GenericContainer container = new GenericContainer<>("redis:7") + .withExposedPorts(6379); + + @Rule + public final RunTestOnContext rule = new RunTestOnContext(); + + @Test + public void testCacheWrapping(TestContext should) { + Redis redis = Redis.createClient(rule.vertx(), new RedisOptions() + .setConnectionString("redis://" + container.getHost() + ":" + container.getFirstMappedPort()) + .setCacheEnabled(true)); + + should.assertTrue(redis instanceof CachingRedis); + } + + @Test + public void simpleTest(TestContext should) { + final Async test = should.async(); + + Redis.createClient(rule.vertx(), new RedisOptions() + .setConnectionString("redis://" + container.getHost() + ":" + container.getFirstMappedPort()) + .setCacheEnabled(true)) + .send(Request.cmd(Command.PING)).onComplete(send -> { + should.assertTrue(send.succeeded()); + should.assertNotNull(send.result()); + should.assertEquals("PONG", send.result().toString()); + test.complete(); + }); + } + + @Test + public void preservesContext(TestContext should) { + Redis redis = Redis.createClient(rule.vertx(), new RedisOptions() + .setConnectionString("redis://" + container.getHost() + ":" + container.getFirstMappedPort()) + .setCacheEnabled(true)); + + PreservesContext.sendWithoutConnect(redis, should); + PreservesContext.batchWithoutConnect(redis, should); + PreservesContext.connect(redis, should); + PreservesContext.connectThenSend(redis, should); + PreservesContext.connectThenBatch(redis, should); + } +} diff --git a/src/test/java/io/vertx/test/redis/CachingRedisClientTest.java b/src/test/java/io/vertx/test/redis/CachingRedisClientTest.java new file mode 100644 index 00000000..355a3420 --- /dev/null +++ b/src/test/java/io/vertx/test/redis/CachingRedisClientTest.java @@ -0,0 +1,126 @@ +package io.vertx.test.redis; + +import io.vertx.core.Context; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.redis.client.impl.CachingRedis; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisAPI; +import io.vertx.redis.client.RedisOptions; +import io.vertx.redis.client.Response; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.testcontainers.containers.GenericContainer; + +import java.util.UUID; + +@RunWith(VertxUnitRunner.class) +public class CachingRedisClientTest { + + @ClassRule + public static final GenericContainer container = new GenericContainer<>("redis:6.0.6") + .withExposedPorts(6379); + + @Rule + public final RunTestOnContext rule = new RunTestOnContext(); + + private Redis client; + private RedisAPI api; + + @Before + public void before(TestContext should) { + final Async before = should.async(); + + Context context = rule.vertx().getOrCreateContext(); + client = Redis.createClient(rule.vertx(), new RedisOptions() + .setConnectionString("redis://" + container.getHost() + ":" + container.getFirstMappedPort()) + .setCacheEnabled(true)); + + client.connect().onComplete(onConnect -> { + should.assertTrue(onConnect.succeeded()); + should.assertEquals(context, rule.vertx().getOrCreateContext()); + api = RedisAPI.api(client); + before.complete(); + }); + } + + @After + public void after() { + client.close(); + } + + @Test + public void testSimpleSetAndGet(TestContext should) { + final Async test = should.async(); + final String key = UUID.randomUUID().toString(); + + api + .send(Command.SET, key, "hello, world") + .onComplete(setResponse -> { + should.assertTrue(setResponse.succeeded()); + + api + .get(key) + .onComplete(get1Response -> { + should.assertTrue(get1Response.succeeded()); + + final Response response = get1Response.result(); + + should.assertEquals(response.toString(), "hello, world"); + + // The cache impl stores the actual object, so we can assert it's from cache by + // asserting a subsequent get returns the same object + api + .get(key) + .onComplete(get2Response -> { + should.assertTrue(get2Response.succeeded()); + should.assertEquals(get2Response.result().toString(), "hello, world"); + should.assertTrue(get2Response.result() == response); + test.complete(); + }); + }); + }); + } + + @Test + public void testInvalidation(TestContext should) { + final Async test = should.async(); + final String key = UUID.randomUUID().toString(); + + ((CachingRedis) client).invalidationHandler(keys -> { + should.assertEquals(keys.size(), 1); + test.complete(); + }); + + api.send(Command.SET, key, "hello, world").onComplete(set1Response -> { + should.assertTrue(set1Response.succeeded()); + + api.get(key).onComplete(get1Response -> { + should.assertTrue(get1Response.succeeded()); + + final Response response = get1Response.result(); + + should.assertEquals(response.toString(), "hello, world"); + + // The cache impl stores the actual object, so we can assert it's from cache by + // asserting a subsequent get returns the same object + api.get(key).onComplete(get2Response -> { + should.assertTrue(get2Response.succeeded()); + should.assertEquals(get2Response.result().toString(), "hello, world"); + should.assertTrue(get2Response.result() == response); + + api.send(Command.SET, key, "new value").onComplete(set2Response -> { + should.assertTrue(set2Response.succeeded()); + }); + }); + }); + }); + } +}