Skip to content

Commit

Permalink
client-side caching with a new client impl and connection wrapper
Browse files Browse the repository at this point in the history
simplify cache impls; some javadocs and tests

more javadocs; invalidation handler on client

basic support for max age in default impls

initial PR feedback; naming consistencies

checkpoint on wrapped connection

push tracking status down to actual conn impl

simplify cache impls to just LRU backed by LinkedHashMap

cleanup client wrapper; test simple operation

docs update
  • Loading branch information
craig-day committed Sep 21, 2023
1 parent a3571b4 commit fc7ce99
Show file tree
Hide file tree
Showing 18 changed files with 1,338 additions and 4 deletions.
28 changes: 27 additions & 1 deletion src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Redis has a rich API and it can be organized in the following groups:
* Strings - Commands to work with Strings.
* Transactions - Commands to handle transaction lifecycle.
* Streams - Commands to handle streaming.
* Client-side caching - Commands to control client-side caching.
== Using Vert.x-Redis

Expand Down Expand Up @@ -182,9 +183,34 @@ And from another place in the code publish messages to the queue:
----

NOTE: It is important to remember that the commands `SUBSCRIBE`, `UNSUBSCRIBE`, `PSUBSCRIBE` and `PUNSUBSCRIBE` are `void`.
This means that the result in case of success is `null` not a instance of response.
This means that the result in case of success is `null` not an instance of response.
All messages are then routed through the handler on the client.

== Client-side Caching

Redis supports client-side caching implementations using a strategy called _Tracking_.

All modes of the client support caching except connections that are in pub/sub mode.

To create a client with client-side caching, one would do:

[source,$lang]
----
{@link examples.RedisExamples#clientCaching1}
----

The default implementation will use a simple Least-Recently-Used (LFU) cache backed by a `LinkedHashMap`.
You can also provide your own implementation of the cache:

[source,$lang]
----
{@link examples.RedisExamples#clientCaching2}
----

NOTE: The cache is not a write-through cache. A value will not be stored in the client-side cache until the value is fetched from Redis for the first time.
To avoid write-then-read race conditions within the same batch, read commands that are part of a batch will not check the cache first.
Additionally, the current implementation does not support the `OPTIN` or `NOLOOP` options.

== Tracing commands

The Redis client can trace command execution when Vert.x has tracing enabled.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package io.vertx.redis.client;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.impl.JsonUtil;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Base64;

/**
* Converter and mapper for {@link io.vertx.redis.client.CachingRedisOptions}.
* NOTE: This class has been automatically generated from the {@link io.vertx.redis.client.CachingRedisOptions} original class using Vert.x codegen.
*/
public class CachingRedisOptionsConverter {


private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER;
private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER;

public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, CachingRedisOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "maxAge":
if (member.getValue() instanceof Number) {
obj.setMaxAge(((Number)member.getValue()).longValue());
}
break;
case "maxAgeUnit":
if (member.getValue() instanceof String) {
obj.setMaxAgeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "maxCacheSize":
if (member.getValue() instanceof Number) {
obj.setMaxCacheSize(((Number)member.getValue()).intValue());
}
break;
case "mode":
if (member.getValue() instanceof String) {
obj.setMode(io.vertx.redis.client.ClientSideCacheMode.valueOf((String)member.getValue()));
}
break;
case "prefix":
if (member.getValue() instanceof String) {
obj.setPrefix((String)member.getValue());
}
break;
case "prefixes":
if (member.getValue() instanceof JsonArray) {
java.util.ArrayList<java.lang.String> list = new java.util.ArrayList<>();
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof String)
list.add((String)item);
});
obj.setPrefixes(list);
}
break;
case "prefixs":
if (member.getValue() instanceof JsonArray) {
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof String)
obj.addPrefix((String)item);
});
}
break;
}
}
}

public static void toJson(CachingRedisOptions obj, JsonObject json) {
toJson(obj, json.getMap());
}

public static void toJson(CachingRedisOptions obj, java.util.Map<String, Object> json) {
json.put("maxAge", obj.getMaxAge());
if (obj.getMaxAgeUnit() != null) {
json.put("maxAgeUnit", obj.getMaxAgeUnit().name());
}
json.put("maxCacheSize", obj.getMaxCacheSize());
if (obj.getMode() != null) {
json.put("mode", obj.getMode().name());
}
if (obj.getPrefixes() != null) {
JsonArray array = new JsonArray();
obj.getPrefixes().forEach(item -> array.add(item));
json.put("prefixes", array);
}
}
}
26 changes: 25 additions & 1 deletion src/main/java/examples/RedisExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private Future<RedisConnection> createRedisClient() {

// make sure to invalidate old connection if present
if (redis != null) {
redis.close();;
redis.close();
}

if (CONNECTING.compareAndSet(false, true)) {
Expand Down Expand Up @@ -246,4 +246,28 @@ public void example13(Vertx vertx) {
public void tracing1(RedisOptions options) {
options.setTracingPolicy(TracingPolicy.ALWAYS);
}

public void clientCaching1(Vertx vertx) {
CachingRedis
.create(vertx)
.connect()
.onSuccess(conn -> {
// get the value for a key, returning from a local in-memory cache if
// it exists, or fetching from Redis if not. if the value is fetched from
// Redis, it will be stored in the local cache
conn.send(Request.cmd(Command.GET).arg("key"));
});
}

public void clientCaching2(Vertx vertx, RedisClientCache customCache) {
CachingRedis
.create(vertx, customCache)
.connect()
.onSuccess(conn -> {
// get the value for a key, returning from the custom cache if
// it exists, or fetching from Redis if not. if the value is fetched from
// Redis, it will be stored in the local cache
conn.send(Request.cmd(Command.GET).arg("key"));
});
}
}
123 changes: 123 additions & 0 deletions src/main/java/io/vertx/redis/client/CachingRedis.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2019 Red Hat, Inc.
* <p>
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
* <p>
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* <p>
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
* <p>
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.redis.client;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.redis.client.impl.CachingRedisClient;
import io.vertx.redis.client.impl.cache.CacheKey;

import java.util.Collection;

/**
* A {@link Redis} client wrapper that implements client-side caching.
*
* @see <a href="https://redis.io/docs/manual/client-side-caching/">Client-side caching in Redis</a>
*/
public interface CachingRedis extends Redis {

/**
* Create a new caching client using default client and cache options.
*
* @param vertx the vertx instance
* @return the caching client
*/
static CachingRedis create(Vertx vertx) {
return create(vertx, Redis.createClient(vertx));
}

/**
* Create a new caching client wrapping an existing redis client with default caching options.
*
* @param vertx the vertx instance
* @param redis the redis client to wrap
* @return the caching client
*/
static CachingRedis create(Vertx vertx, Redis redis) {
return create(vertx, redis, RedisClientCache.lru(new CachingRedisOptions()));
}

/**
* Create a new caching client using default client and cache options, backed by a given cache.
*
* @param vertx the vertx instance
* @param cache the backing cache
* @return the caching client
*/
static CachingRedis create(Vertx vertx, RedisClientCache cache) {
return create(vertx, Redis.createClient(vertx), cache);
}

/**
* Create a new caching client wrapping an existing redis client and backed by a given cache.
*
* @param vertx the vertx instance
* @param redis the redis client to wrap
* @param cache the backing cache
* @return the caching client
*/
static CachingRedis create(Vertx vertx, Redis redis, RedisClientCache cache) {
return create(vertx, redis, cache, new CachingRedisOptions());
}

/**
* Create a new caching client wrapping an existing redis client and using the given cache options.
*
* @param vertx the vertx instance
* @param redis the redis client to wrap
* @param options the cache options
* @return the caching client
*/
static CachingRedis create(Vertx vertx, Redis redis, CachingRedisOptions options) {
return create(vertx, redis, RedisClientCache.lru(options), options);
}

/**
* Create a new caching client wrapping an existing redis client, using the given cache and cache options.
*
* @param vertx the vertx instance
* @param redis the redis client to wrap
* @param cache the backing cache
* @param options the cache options
* @return the caching client
*/
static CachingRedis create(Vertx vertx, Redis redis, RedisClientCache cache, CachingRedisOptions options) {
return new CachingRedisClient(vertx, redis, cache, options);
}

/**
* Flush the local cache.
*
* <p>
* This operation only clears the local cache and has no interaction with the server.
*
* @return a future indicating the status of the operation
*/
Future<Void> flush();

/**
* Set a handler to be called when invalidation is performed.
*
* <p>
* The client will clear the keys before this handler is invoked. It is not recommended to modify
* the cache as a part of this handler. The primary function is for instrumentation.
*
* @param handler a handler that accepts the keys which were invalidated
* @return fluent self
*/
CachingRedis invalidationHandler(Handler<Collection<CacheKey>> handler);
}
Loading

0 comments on commit fc7ce99

Please sign in to comment.