Skip to content

Commit

Permalink
client-side caching with a new client impl and connection wrapper
Browse files Browse the repository at this point in the history
simplify cache impls; some javadocs and tests

more javadocs; invalidation handler on client

basic support for max age in default impls

initial PR feedback; naming consistencies

checkpoint on wrapped connection

push tracking status down to actual conn impl

simplify cache impls to just LRU backed by LinkedHashMap

cleanup client wrapper; test simple operation

docs update

make caching configured via options, not an explicit wrapper

regenerate options converters
  • Loading branch information
craig-day committed Sep 21, 2023
1 parent cbe76df commit ff1f481
Show file tree
Hide file tree
Showing 20 changed files with 1,595 additions and 8 deletions.
48 changes: 47 additions & 1 deletion src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Redis has a rich API and it can be organized in the following groups:
* Strings - Commands to work with Strings.
* Transactions - Commands to handle transaction lifecycle.
* Streams - Commands to handle streaming.
* Client-side caching - Commands to control client-side caching.
== Using Vert.x-Redis

Expand Down Expand Up @@ -182,9 +183,54 @@ And from another place in the code publish messages to the queue:
----

NOTE: It is important to remember that the commands `SUBSCRIBE`, `UNSUBSCRIBE`, `PSUBSCRIBE` and `PUNSUBSCRIBE` are `void`.
This means that the result in case of success is `null` not a instance of response.
This means that the result in case of success is `null` not an instance of response.
All messages are then routed through the handler on the client.

== Client-side Caching

Redis supports client-side caching implementations using a strategy called _Tracking_.

All modes of the client support caching except connections that are in pub/sub mode.

To create a client with client-side caching, one would do:

[source,$lang]
----
{@link examples.RedisExamples#clientCaching1}
----

A specific interface, `CachingRedis`, is exposed for a caching client that allows an invalidation handler to be attached or a flush command to be issued.
The invalidation handler will be invoked with all the keys being invalidated whenever a message is received on the invalidations connection.

To attach an invalidations handler:

[source,$lang]
----
{@link examples.RedisExamples#clientCaching2}
----

To manually flush the client's cache store:

[source,$lang]
----
{@link examples.RedisExamples#clientCaching3}
----

The client comes with a default cache store out of the box, but you can write your own if you prefer.

The implementations are expected to follow the `ServiceLoader` conventions and all stores that are available at runtime from the classpath will be exposed.
When more than 1 implementation is available the first one that can be instantiated and configured with success becomes the default.
If none is available, then the default is a simple Least-Recently-Used (LRU) cache backed by a `LinkedHashMap`.

[source,$lang]
----
{@link examples.RedisExamples#clientCaching4}
----

NOTE: The cache is not a write-through cache. A value will not be stored in the client-side cache until the value is fetched from Redis for the first time.
To avoid write-then-read race conditions within the same batch, read commands that are part of a batch will not check the cache first.
Additionally, the current implementation does not support the `OPTIN` or `NOLOOP` options.

== Tracing commands

The Redis client can trace command execution when Vert.x has tracing enabled.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.vertx.redis.client;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.impl.JsonUtil;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Base64;

/**
* Converter and mapper for {@link io.vertx.redis.client.CachingRedisOptions}.
* NOTE: This class has been automatically generated from the {@link io.vertx.redis.client.CachingRedisOptions} original class using Vert.x codegen.
*/
public class CachingRedisOptionsConverter {


private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER;
private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER;

public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, CachingRedisOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "enabled":
if (member.getValue() instanceof Boolean) {
obj.setEnabled((Boolean)member.getValue());
}
break;
case "maxCacheSize":
if (member.getValue() instanceof Number) {
obj.setMaxCacheSize(((Number)member.getValue()).intValue());
}
break;
case "maxAge":
if (member.getValue() instanceof Number) {
obj.setMaxAge(((Number)member.getValue()).longValue());
}
break;
case "maxAgeUnit":
if (member.getValue() instanceof String) {
obj.setMaxAgeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "mode":
if (member.getValue() instanceof String) {
obj.setMode(io.vertx.redis.client.ClientSideCacheMode.valueOf((String)member.getValue()));
}
break;
case "prefixes":
if (member.getValue() instanceof JsonArray) {
java.util.ArrayList<java.lang.String> list = new java.util.ArrayList<>();
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof String)
list.add((String)item);
});
obj.setPrefixes(list);
}
break;
case "prefix":
if (member.getValue() instanceof String) {
obj.setPrefix((String)member.getValue());
}
break;
case "prefixs":
if (member.getValue() instanceof JsonArray) {
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof String)
obj.addPrefix((String)item);
});
}
break;
}
}
}

public static void toJson(CachingRedisOptions obj, JsonObject json) {
toJson(obj, json.getMap());
}

public static void toJson(CachingRedisOptions obj, java.util.Map<String, Object> json) {
json.put("enabled", obj.getEnabled());
json.put("maxCacheSize", obj.getMaxCacheSize());
json.put("maxAge", obj.getMaxAge());
if (obj.getMaxAgeUnit() != null) {
json.put("maxAgeUnit", obj.getMaxAgeUnit().name());
}
if (obj.getMode() != null) {
json.put("mode", obj.getMode().name());
}
if (obj.getPrefixes() != null) {
JsonArray array = new JsonArray();
obj.getPrefixes().forEach(item -> array.add(item));
json.put("prefixes", array);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,56 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setPoolName((String)member.getValue());
}
break;
case "cachingOptions":
break;
case "cacheEnabled":
if (member.getValue() instanceof Boolean) {
obj.setCacheEnabled((Boolean)member.getValue());
}
break;
case "cacheMaxSize":
if (member.getValue() instanceof Number) {
obj.setCacheMaxSize(((Number)member.getValue()).intValue());
}
break;
case "cacheMaxAge":
if (member.getValue() instanceof Number) {
obj.setCacheMaxAge(((Number)member.getValue()).longValue());
}
break;
case "cacheMaxAgeUnit":
if (member.getValue() instanceof String) {
obj.setCacheMaxAgeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "cacheMode":
if (member.getValue() instanceof String) {
obj.setCacheMode(io.vertx.redis.client.ClientSideCacheMode.valueOf((String)member.getValue()));
}
break;
case "cachePrefixes":
if (member.getValue() instanceof JsonArray) {
java.util.ArrayList<java.lang.String> list = new java.util.ArrayList<>();
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof String)
list.add((String)item);
});
obj.setCachePrefixes(list);
}
break;
case "cachePrefix":
if (member.getValue() instanceof String) {
obj.setCachePrefix((String)member.getValue());
}
break;
case "cachePrefixs":
if (member.getValue() instanceof JsonArray) {
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof String)
obj.addCachePrefix((String)item);
});
}
break;
}
}
}
Expand Down Expand Up @@ -171,5 +221,19 @@ public static void toJson(RedisOptions obj, java.util.Map<String, Object> json)
if (obj.getPoolName() != null) {
json.put("poolName", obj.getPoolName());
}
json.put("cacheEnabled", obj.getCacheEnabled());
json.put("cacheMaxSize", obj.getCacheMaxSize());
json.put("cacheMaxAge", obj.getCacheMaxAge());
if (obj.getCacheMaxAgeUnit() != null) {
json.put("cacheMaxAgeUnit", obj.getCacheMaxAgeUnit().name());
}
if (obj.getCacheMode() != null) {
json.put("cacheMode", obj.getCacheMode().name());
}
if (obj.getCachePrefixes() != null) {
JsonArray array = new JsonArray();
obj.getCachePrefixes().forEach(item -> array.add(item));
json.put("cachePrefixes", array);
}
}
}
88 changes: 87 additions & 1 deletion src/main/java/examples/RedisExamples.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package examples;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.redis.client.*;

import io.vertx.redis.client.impl.CachingRedis;
import io.vertx.redis.client.impl.CachingRedisClient;
import io.vertx.redis.client.impl.RedisClient;
import io.vertx.redis.client.impl.cache.CacheKey;
import io.vertx.redis.client.spi.RedisClientCache;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -143,7 +151,7 @@ private Future<RedisConnection> createRedisClient() {

// make sure to invalidate old connection if present
if (redis != null) {
redis.close();;
redis.close();
}

if (CONNECTING.compareAndSet(false, true)) {
Expand Down Expand Up @@ -246,4 +254,82 @@ public void example13(Vertx vertx) {
public void tracing1(RedisOptions options) {
options.setTracingPolicy(TracingPolicy.ALWAYS);
}

public void clientCaching1(Vertx vertx) {
Redis.createClient(
vertx,
new RedisOptions()
.setCacheEnabled(true)
.setCacheMaxSize(256)
.setCacheMaxAge(60_000))
.connect()
.onSuccess(conn -> {
// get the value for a key, returning from a local in-memory cache if
// it exists, or fetching from Redis if not. if the value is fetched from
// Redis, it will be stored in the local cache
conn.send(Request.cmd(Command.GET).arg("key"));
});
}

public void clientCaching2(Redis redis) {
CachingRedis cachingClient = (CachingRedis) redis;

cachingClient.invalidationHandler(keys -> {
// something...
});
}

public void clientCaching3(Redis redis) {
CachingRedis cachingClient = (CachingRedis) redis;

cachingClient.flush().onSuccess(ignored -> {
// Success!
});
}

public void clientCaching4(Vertx vertx, RedisClientCache customCache) {

// Register this class in META-INF/services/io.vertx.redis.client.spi.RedisClientCache
class CustomCache implements RedisClientCache {

private final Map<CacheKey, Response> store = new HashMap<>();

@Override
public @Nullable Response get(CacheKey key) {
return store.get(key);
}

@Override
public void put(CacheKey key, Response value) {
store.put(key, value);
}

@Override
public void delete(CacheKey key) {
store.remove(key);
}

@Override
public void flush() {
store.clear();
}

@Override
public void close() {
// Nothing to do here
}
}

Redis.createClient(
vertx,
new RedisOptions()
.setCacheEnabled(true))
.connect()
.onSuccess(conn -> {
// get the value for a key, returning from the custom cache if
// it exists, or fetching from Redis if not. if the value is fetched from
// Redis, it will be stored in the local cache
conn.send(Request.cmd(Command.GET).arg("key"));
});
}
}
Loading

0 comments on commit ff1f481

Please sign in to comment.