Cache fallback layer to RPC call #333

Open
seemayerpeter opened this issue Jun 4, 2021 · 1 comment

@seemayerpeter

I'd like some help implementing a caching layer in the router, to act as a fallback when the callee is not registered at the time of an RPC call.

The problem is as follows:
Client -> Router -> Callee (RPC)
Callee returns data to the client.
Callee goes offline.
Client tries to call the RPC and gets "procedure not registered".

Basically, if the procedure is not found, the router would try to return the data from a previous call, stored in Redis or some other cache, instead of throwing "procedure not registered" right away. I've tried following the inner workings of the router by debugging it, but I don't think it can be implemented this way by conventional means.

So what I basically want to achieve is:
Client -> Router -> Callee
Callee -> Router -> cache data here -> Caller
Callee goes offline.
Client -> Router -> procedure not found -> go to cache -> cached data found -> return cached data instead.

I know this could be implemented on the caller's end, but it would be much simpler and more efficient to implement it on the router's side.

@darkl
Member

darkl commented Jun 5, 2021

Hi Péter,
Thanks for opening this issue. It is possible to implement this on the router side, but it requires writing a lot of code, mainly because each of the CALL / INVOCATION / YIELD / RESULT / ERROR messages in WAMP has 3 signatures (no arguments, positional arguments, and positional plus keyword arguments).

The extension point is the IWampRealmContainer interface passed to the constructor of WampHost.

We initialize the WampHost with a custom IWampRealmContainer by calling:

WampHost wampHost = new WampHost(new RpcCacheWampRealmContainer());

Next comes some ceremony: we need to define implementations for all the interfaces in the hierarchy.

public class RpcCacheWampRealmContainer : IWampRealmContainer
{
    private readonly IWampRealmContainer mContainer;

    public RpcCacheWampRealmContainer() : this(new WampRealmContainer())
    {
    }

    public RpcCacheWampRealmContainer(IWampRealmContainer container)
    {
        mContainer = container;
    }

    public IWampRealm GetRealmByName(string name)
    {
        IWampRealm realm = mContainer.GetRealmByName(name);
        RpcCacheWampRealm wrapped = new RpcCacheWampRealm(realm);
        return wrapped;
    }
}

public class RpcCacheWampRealm : IWampRealm
{
    private readonly IWampRealm mRealm;
    private readonly IWampRpcOperationCatalog mRpcCacheOperationCatalog;

    public RpcCacheWampRealm(IWampRealm realm)
    {
        mRealm = realm;
        mRpcCacheOperationCatalog = new RpcCacheWampRpcOperationCatalog(mRealm.RpcCatalog);
    }

    public string Name => mRealm.Name;

    public IWampRpcOperationCatalog RpcCatalog => mRpcCacheOperationCatalog;

    public IWampTopicContainer TopicContainer => mRealm.TopicContainer;
}

Now the magic happens in our implementation of RpcCacheWampRpcOperationCatalog. We check whether a matching operation is registered. If there is one, we call it and cache the result. If not, we check whether the cache holds a previous result and return that instead.

To keep things simple, I will only show how to implement this for procedures that take no arguments. To handle procedures that take arguments, you will need a struct that stores them, plus equality comparison code so that it can serve as a cache key.
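
For the case with arguments, a cache key along the following lines might be a starting point. This is only a sketch: InvocationKey is a hypothetical name that is not part of WampSharp, it covers positional arguments only, and it assumes the arguments have already been converted to values with value equality (strings, boxed numbers and so on). Raw serializer tokens generally compare by reference, and a boxed int never equals a boxed long, so real code would need to normalize the arguments first.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical cache key: a procedure URI plus its positional arguments.
// Keyword arguments are left out to keep the sketch short.
public sealed class InvocationKey : IEquatable<InvocationKey>
{
    public string Procedure { get; }
    public IReadOnlyList<object> Arguments { get; }

    public InvocationKey(string procedure, IEnumerable<object> arguments = null)
    {
        Procedure = procedure ?? throw new ArgumentNullException(nameof(procedure));
        // Copy the arguments so later changes by the caller cannot alter the key.
        Arguments = (arguments ?? Enumerable.Empty<object>()).ToArray();
    }

    // Two keys are equal when the procedure matches and the arguments
    // are pairwise equal, in order.
    public bool Equals(InvocationKey other)
    {
        return other != null &&
               Procedure == other.Procedure &&
               Arguments.SequenceEqual(other.Arguments);
    }

    public override bool Equals(object obj) => Equals(obj as InvocationKey);

    // Must agree with Equals, so it combines the hashes of the same fields.
    public override int GetHashCode()
    {
        unchecked
        {
            int hash = Procedure.GetHashCode();
            foreach (object argument in Arguments)
            {
                hash = hash * 31 + (argument?.GetHashCode() ?? 0);
            }
            return hash;
        }
    }
}
```

With a key like this, the dictionary in the cache could be keyed by InvocationKey instead of by the procedure string, so that calls to the same procedure with different arguments get separate entries.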

We introduce some cache interface for our purpose:

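// Needed by the types below: ConcurrentDictionary, IDictionary, and LINQ's Cast/ToDictionary.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public interface IRpcInvocationCache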
{
    void MapInvocationToResult(string procedure,
                               ResultArguments resultArguments);

    bool TryGetInvocationResult(string procedure,
                                out ResultArguments value);
}

public class ResultArguments
{
    public YieldOptions YieldOptions { get; }
    public object[] Arguments { get; }
    public IDictionary<string, object> ArgumentsKeywords { get; }

    public ResultArguments(YieldOptions yieldOptions,
                           object[] arguments = null,
                           IDictionary<string, object> argumentsKeywords = null)
    {
        YieldOptions = yieldOptions;
        Arguments = arguments;
        ArgumentsKeywords = argumentsKeywords;
    }

    public static ResultArguments FromArguments<TMessage>
    (YieldOptions yieldOptions,
     TMessage[] arguments = null,
     IDictionary<string, TMessage> argumentsKeywords = null)
    {
        return new ResultArguments(yieldOptions,
                                   arguments?.Cast<object>().ToArray(),
                                   argumentsKeywords?.ToDictionary(x => x.Key,
                                                                   x => (object) x.Value));
    }
}

public class LocalRpcInvocationCache : IRpcInvocationCache
{
    private readonly ConcurrentDictionary<string, ResultArguments> mProcedureUriToResult = 
        new ConcurrentDictionary<string, ResultArguments>();

    public void MapInvocationToResult(string procedure, ResultArguments resultArguments)
    {
        mProcedureUriToResult[procedure] = resultArguments;
    }

    public bool TryGetInvocationResult(string procedure, out ResultArguments value)
    {
        return mProcedureUriToResult.TryGetValue(procedure, out value);
    }
}

Next we implement RpcCacheWampRpcOperationCatalog, an IWampRpcOperationCatalog that uses the cache:

public class RpcCacheWampRpcOperationCatalog : IWampRpcOperationCatalog
{
    private readonly IWampRpcOperationCatalog mRpcOperationCatalog;
    private readonly IRpcInvocationCache mInvocationCache = new LocalRpcInvocationCache();

    public RpcCacheWampRpcOperationCatalog(IWampRpcOperationCatalog rpcOperationCatalog)
    {
        mRpcOperationCatalog = rpcOperationCatalog;
    }

    public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter,
                                                       InvocationDetails details, string procedure)
    {
        if (mRpcOperationCatalog.GetMatchingOperation(procedure) != null)
        {
            return mRpcOperationCatalog.Invoke(new RpcCacheWampRawOperationRouterCallback(caller, procedure, mInvocationCache),
                                               formatter, details, procedure);
        }
        else
        {
            if (!mInvocationCache.TryGetInvocationResult(procedure, out ResultArguments resultArguments))
            {
                // Call "base" to throw an exception.
                return mRpcOperationCatalog.Invoke(caller, formatter, details, procedure);
            }
            else
            {
                if (resultArguments.ArgumentsKeywords != null)
                {
                    caller.Result(WampObjectFormatter.Value, resultArguments.YieldOptions,
                                  resultArguments.Arguments, resultArguments.ArgumentsKeywords);
                }
                else if (resultArguments.Arguments != null)
                {
                    caller.Result(WampObjectFormatter.Value, resultArguments.YieldOptions,
                                  resultArguments.Arguments);
                }
                else
                {
                    caller.Result(WampObjectFormatter.Value, resultArguments.YieldOptions);
                }

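                // The cached result has already been delivered, so there is
                // no in-flight invocation to hand back for cancellation.
                return null;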
            }
        }
    }

    public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter,
                                                       InvocationDetails details, string procedure, TMessage[] arguments)
    {
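        // No positional arguments: route through the argument-less overload so the cache applies.
        if (arguments == null || arguments.Length == 0)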
        {
            return Invoke(caller, formatter, details, procedure);
        }

        return mRpcOperationCatalog.Invoke(caller, formatter, details, procedure, arguments);
    }

    public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter,
                                                       InvocationDetails details, string procedure, TMessage[] arguments,
                                                       IDictionary<string, TMessage> argumentsKeywords)
    {
        return mRpcOperationCatalog.Invoke(caller, formatter, details, procedure, arguments, argumentsKeywords);
    }

    public IWampRegistrationSubscriptionToken Register(IWampRpcOperation operation, RegisterOptions options)
    {
        return mRpcOperationCatalog.Register(operation, options);
    }

    public event EventHandler<WampProcedureRegisterEventArgs> RegistrationAdded
    {
        add => mRpcOperationCatalog.RegistrationAdded += value;
        remove => mRpcOperationCatalog.RegistrationAdded -= value;
    }

    public event EventHandler<WampProcedureRegisterEventArgs> RegistrationRemoved
    {
        add => mRpcOperationCatalog.RegistrationRemoved += value;
        remove => mRpcOperationCatalog.RegistrationRemoved -= value;
    }

    public IWampRpcOperation GetMatchingOperation(string criteria)
    {
        return mRpcOperationCatalog.GetMatchingOperation(criteria);
    }

    private class RpcCacheWampRawOperationRouterCallback : IWampRawRpcOperationRouterCallback
    {
        private readonly IWampRawRpcOperationRouterCallback mCallback;
        private readonly string mProcedureName;
        private readonly IRpcInvocationCache mInvocationCache;

        public RpcCacheWampRawOperationRouterCallback(
            IWampRawRpcOperationRouterCallback callback, string procedureName,
            IRpcInvocationCache invocationCache)
        {
            mCallback = callback;
            mProcedureName = procedureName;
            mInvocationCache = invocationCache;
        }

        public void Result<TMessage>(IWampFormatter<TMessage> formatter, YieldOptions options)
        {
            mInvocationCache.MapInvocationToResult(mProcedureName, new ResultArguments(options));
            mCallback.Result(formatter, options);
        }

        public void Result<TMessage>(IWampFormatter<TMessage> formatter, YieldOptions options, TMessage[] arguments)
        {
            mInvocationCache.MapInvocationToResult(mProcedureName, ResultArguments.FromArguments(options, arguments));
            mCallback.Result(formatter, options, arguments);
        }

        public void Result<TMessage>(IWampFormatter<TMessage> formatter, YieldOptions options, TMessage[] arguments,
                                     IDictionary<string, TMessage> argumentsKeywords)
        {
            mInvocationCache.MapInvocationToResult(mProcedureName, ResultArguments.FromArguments(options, arguments, argumentsKeywords));
            mCallback.Result(formatter, options, arguments, argumentsKeywords);
        }

        public void Error<TMessage>(IWampFormatter<TMessage> formatter, TMessage details, string error)
        {
            mCallback.Error(formatter, details, error);
        }

        public void Error<TMessage>(IWampFormatter<TMessage> formatter, TMessage details, string error, TMessage[] arguments)
        {
            mCallback.Error(formatter, details, error, arguments);
        }

        public void Error<TMessage>(IWampFormatter<TMessage> formatter, TMessage details, string error, TMessage[] arguments,
                                    TMessage argumentsKeywords)
        {
            mCallback.Error(formatter, details, error, arguments, argumentsKeywords);
        }
    }
}

Here's a gist with a full working example.

If you decide to complete this implementation, please upload it here so others can make use of it.

Elad
