Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client-side caching support #402

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Redis has a rich API and it can be organized in the following groups:
* Strings - Commands to work with Strings.
* Transactions - Commands to handle transaction lifecycle.
* Streams - Commands to handle streaming.
* Client-side caching - Commands to control client-side caching.

== Using Vert.x-Redis

Expand Down Expand Up @@ -182,9 +183,54 @@ And from another place in the code publish messages to the queue:
----

NOTE: It is important to remember that the commands `SUBSCRIBE`, `UNSUBSCRIBE`, `PSUBSCRIBE` and `PUNSUBSCRIBE` are `void`.
This means that the result in case of success is `null` not a instance of response.
This means that the result in case of success is `null` not an instance of response.
All messages are then routed through the handler on the client.

== Client-side Caching

Redis supports client-side caching implementations using a strategy called _Tracking_.

All modes of the client support caching except connections that are in pub/sub mode.

To create a client with client-side caching, one would do:

[source,$lang]
----
{@link examples.RedisExamples#clientCaching1}
----

A specific interface, `CachingRedis`, is exposed for a caching client that allows an invalidation handler to be attached or a flush command to be issued.
The invalidation handler will be invoked with all the keys being invalidated whenever a message is received on the invalidations connection.

To attach an invalidations handler:

[source,$lang]
----
{@link examples.RedisExamples#clientCaching2}
----

To manually flush the client's cache store:

[source,$lang]
----
{@link examples.RedisExamples#clientCaching3}
----

The client comes with a default cache store out of the box, but you can write your own if you prefer.

The implementations are expected to follow the `ServiceLoader` conventions and all stores that are available at runtime from the classpath will be exposed.
When more than 1 implementation is available the first one that can be instantiated and configured with success becomes the default.
If none is available, then the default is a simple Least-Recently-Used (LRU) cache backed by a `LinkedHashMap`.

[source,$lang]
----
{@link examples.RedisExamples#clientCaching4}
----

NOTE: The cache is not a write-through cache. A value will not be stored in the client-side cache until the value is fetched from Redis for the first time.
To avoid write-then-read race conditions within the same batch, read commands that are part of a batch will not check the cache first.
Additionally, the current implementation does not support the `OPTIN` or `NOLOOP` options.

== Tracing commands

The Redis client can trace command execution when Vert.x has tracing enabled.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.vertx.redis.client;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.impl.JsonUtil;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Base64;

/**
* Converter and mapper for {@link io.vertx.redis.client.CachingRedisOptions}.
* NOTE: This class has been automatically generated from the {@link io.vertx.redis.client.CachingRedisOptions} original class using Vert.x codegen.
*/
public class CachingRedisOptionsConverter {


private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER;
private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER;

public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, CachingRedisOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "enabled":
if (member.getValue() instanceof Boolean) {
obj.setEnabled((Boolean)member.getValue());
}
break;
case "maxCacheSize":
if (member.getValue() instanceof Number) {
obj.setMaxCacheSize(((Number)member.getValue()).intValue());
}
break;
case "maxAge":
if (member.getValue() instanceof Number) {
obj.setMaxAge(((Number)member.getValue()).longValue());
}
break;
case "maxAgeUnit":
if (member.getValue() instanceof String) {
obj.setMaxAgeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "mode":
if (member.getValue() instanceof String) {
obj.setMode(io.vertx.redis.client.ClientSideCacheMode.valueOf((String)member.getValue()));
}
break;
case "prefixes":
if (member.getValue() instanceof JsonArray) {
java.util.ArrayList<java.lang.String> list = new java.util.ArrayList<>();
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof String)
list.add((String)item);
});
obj.setPrefixes(list);
}
break;
case "prefix":
if (member.getValue() instanceof String) {
obj.setPrefix((String)member.getValue());
}
break;
case "prefixs":
if (member.getValue() instanceof JsonArray) {
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof String)
obj.addPrefix((String)item);
});
}
break;
}
}
}

public static void toJson(CachingRedisOptions obj, JsonObject json) {
toJson(obj, json.getMap());
}

public static void toJson(CachingRedisOptions obj, java.util.Map<String, Object> json) {
json.put("enabled", obj.getEnabled());
json.put("maxCacheSize", obj.getMaxCacheSize());
json.put("maxAge", obj.getMaxAge());
if (obj.getMaxAgeUnit() != null) {
json.put("maxAgeUnit", obj.getMaxAgeUnit().name());
}
if (obj.getMode() != null) {
json.put("mode", obj.getMode().name());
}
if (obj.getPrefixes() != null) {
JsonArray array = new JsonArray();
obj.getPrefixes().forEach(item -> array.add(item));
json.put("prefixes", array);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,56 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setPoolName((String)member.getValue());
}
break;
case "cachingOptions":
break;
case "cacheEnabled":
if (member.getValue() instanceof Boolean) {
obj.setCacheEnabled((Boolean)member.getValue());
}
break;
case "cacheMaxSize":
if (member.getValue() instanceof Number) {
obj.setCacheMaxSize(((Number)member.getValue()).intValue());
}
break;
case "cacheMaxAge":
if (member.getValue() instanceof Number) {
obj.setCacheMaxAge(((Number)member.getValue()).longValue());
}
break;
case "cacheMaxAgeUnit":
if (member.getValue() instanceof String) {
obj.setCacheMaxAgeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "cacheMode":
if (member.getValue() instanceof String) {
obj.setCacheMode(io.vertx.redis.client.ClientSideCacheMode.valueOf((String)member.getValue()));
}
break;
case "cachePrefixes":
if (member.getValue() instanceof JsonArray) {
java.util.ArrayList<java.lang.String> list = new java.util.ArrayList<>();
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof String)
list.add((String)item);
});
obj.setCachePrefixes(list);
}
break;
case "cachePrefix":
if (member.getValue() instanceof String) {
obj.setCachePrefix((String)member.getValue());
}
break;
case "cachePrefixs":
if (member.getValue() instanceof JsonArray) {
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof String)
obj.addCachePrefix((String)item);
});
}
break;
}
}
}
Expand Down Expand Up @@ -171,5 +221,19 @@ public static void toJson(RedisOptions obj, java.util.Map<String, Object> json)
if (obj.getPoolName() != null) {
json.put("poolName", obj.getPoolName());
}
json.put("cacheEnabled", obj.getCacheEnabled());
json.put("cacheMaxSize", obj.getCacheMaxSize());
json.put("cacheMaxAge", obj.getCacheMaxAge());
if (obj.getCacheMaxAgeUnit() != null) {
json.put("cacheMaxAgeUnit", obj.getCacheMaxAgeUnit().name());
}
if (obj.getCacheMode() != null) {
json.put("cacheMode", obj.getCacheMode().name());
}
if (obj.getCachePrefixes() != null) {
JsonArray array = new JsonArray();
obj.getCachePrefixes().forEach(item -> array.add(item));
json.put("cachePrefixes", array);
}
}
}
88 changes: 87 additions & 1 deletion src/main/java/examples/RedisExamples.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package examples;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.redis.client.*;

import io.vertx.redis.client.impl.CachingRedis;
import io.vertx.redis.client.impl.CachingRedisClient;
import io.vertx.redis.client.impl.RedisClient;
import io.vertx.redis.client.impl.cache.CacheKey;
import io.vertx.redis.client.spi.RedisClientCache;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -143,7 +151,7 @@ private Future<RedisConnection> createRedisClient() {

// make sure to invalidate old connection if present
if (redis != null) {
redis.close();;
redis.close();
}

if (CONNECTING.compareAndSet(false, true)) {
Expand Down Expand Up @@ -246,4 +254,82 @@ public void example13(Vertx vertx) {
public void tracing1(RedisOptions options) {
options.setTracingPolicy(TracingPolicy.ALWAYS);
}

public void clientCaching1(Vertx vertx) {
Redis.createClient(
vertx,
new RedisOptions()
.setCacheEnabled(true)
.setCacheMaxSize(256)
.setCacheMaxAge(60_000))
.connect()
.onSuccess(conn -> {
// get the value for a key, returning from a local in-memory cache if
// it exists, or fetching from Redis if not. if the value is fetched from
// Redis, it will be stored in the local cache
conn.send(Request.cmd(Command.GET).arg("key"));
});
}

public void clientCaching2(Redis redis) {
CachingRedis cachingClient = (CachingRedis) redis;

cachingClient.invalidationHandler(keys -> {
// something...
});
}

public void clientCaching3(Redis redis) {
CachingRedis cachingClient = (CachingRedis) redis;

cachingClient.flush().onSuccess(ignored -> {
// Success!
});
}

public void clientCaching4(Vertx vertx, RedisClientCache customCache) {

// Register this class in META-INF/services/io.vertx.redis.client.spi.RedisClientCache
class CustomCache implements RedisClientCache {

private final Map<CacheKey, Response> store = new HashMap<>();

@Override
public @Nullable Response get(CacheKey key) {
return store.get(key);
}

@Override
public void put(CacheKey key, Response value) {
store.put(key, value);
}

@Override
public void delete(CacheKey key) {
store.remove(key);
}

@Override
public void flush() {
store.clear();
}

@Override
public void close() {
// Nothing to do here
}
}

Redis.createClient(
vertx,
new RedisOptions()
.setCacheEnabled(true))
.connect()
.onSuccess(conn -> {
// get the value for a key, returning from the custom cache if
// it exists, or fetching from Redis if not. if the value is fetched from
// Redis, it will be stored in the local cache
conn.send(Request.cmd(Command.GET).arg("key"));
});
}
}
Loading