-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathRmiServer.cs
218 lines (179 loc) · 7.52 KB
/
RmiServer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
namespace Menees.Remoting;
#region Using Directives
using System.Reflection;
using Menees.Remoting.Models;
using Menees.Remoting.Pipes;
using Microsoft.Extensions.Logging;
#endregion
/// <summary>
/// Exposes the <typeparamref name="TServiceInterface"/> interface from a given service object instance
/// as a remotely invocable server.
/// </summary>
/// <typeparam name="TServiceInterface">The interface to make available for remote invocation.</typeparam>
public sealed class RmiServer<TServiceInterface> : RmiNode<TServiceInterface>, IServer
where TServiceInterface : class
{
#region Private Data Members
private static readonly Lazy<Dictionary<string, MethodInfo>> MethodSignatureCache = new(CreateMethodSignatureDictionary);
private readonly PipeServer pipe;
private readonly CancellationToken cancellationToken;
private TServiceInterface? serviceInstance;
#endregion
#region Constructors
/// <summary>
/// Creates a new server instance to expose a <typeparamref name="TServiceInterface"/> implementation
/// to <see cref="RmiClient{TServiceInterface}"/> instances.
/// </summary>
/// <param name="serviceInstance">An instance of <typeparamref name="TServiceInterface"/> on which to execute remote invocations.</param>
/// <param name="serverPath">The path used to expose the service.</param>
/// <param name="maxListeners">The maximum number of server listener tasks to start.</param>
/// <param name="minListeners">The minimum number of server listener tasks to start.</param>
/// <param name="loggerFactory">An optional factory for creating type-specific server loggers for status information.</param>
public RmiServer(
TServiceInterface serviceInstance,
string serverPath,
int maxListeners = ServerSettings.MaxAllowedListeners,
int minListeners = 1,
ILoggerFactory? loggerFactory = null)
: this(serviceInstance, new ServerSettings(serverPath)
{
MaxListeners = maxListeners,
MinListeners = minListeners,
CreateLogger = loggerFactory != null ? loggerFactory.CreateLogger : null,
})
{
}
/// <summary>
/// Creates a new server instance to expose a <typeparamref name="TServiceInterface"/> implementation
/// to <see cref="RmiClient{TServiceInterface}"/> instances.
/// </summary>
/// <param name="serviceInstance">An instance of <typeparamref name="TServiceInterface"/> on which to execute remote invocations.</param>
/// <param name="settings">Parameters used to initialize this instance.</param>
public RmiServer(TServiceInterface serviceInstance, ServerSettings settings)
: base(settings)
{
ArgumentNullException.ThrowIfNull(settings);
this.serviceInstance = serviceInstance ?? throw new ArgumentNullException(nameof(serviceInstance));
// Note: The pipe is created with no listeners until we explicitly start them.
this.pipe = new(
settings.ServerPath,
settings.MinListeners,
settings.MaxListeners,
this.ProcessRequestAsync,
this,
this,
(PipeServerSecurity?)settings.Security);
this.cancellationToken = settings.CancellationToken;
}
#endregion
#region Public Events
/// <inheritdoc/>
public event EventHandler? Stopped
{
add => this.pipe.Stopped += value;
remove => this.pipe.Stopped -= value;
}
#endregion
#region Public Properties
/// <inheritdoc/>
public Action<Exception>? ReportUnhandledException
{
get => this.pipe.ReportUnhandledException;
set => this.pipe.ReportUnhandledException = value;
}
#endregion
#region Public Methods
/// <inheritdoc/>
public void Start() => this.pipe.EnsureMinListeners();
/// <inheritdoc/>
public void Stop() => this.pipe.StopListening();
#endregion
#region Protected Methods
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
this.serviceInstance = null;
this.pipe.Dispose();
}
}
#endregion
#region Private Methods
private static Dictionary<string, MethodInfo> CreateMethodSignatureDictionary()
{
List<(string Signature, MethodInfo Method)> items = [];
void AddMethods(Type interfaceType)
{
items.AddRange(interfaceType.GetMethods().Select(method => (GetMethodSignature(method), method)));
foreach (Type implementedType in interfaceType.GetInterfaces())
{
AddMethods(implementedType);
}
}
// Calling GetMethods() on an interface type only gets its declared methods. Even BindingFlags don't help
// because a "derived" interface still has "object" as its base and any "inherited" interface is really just an
// "implements" relationship and not an "inherits" relationship. So, we have to recursively find all "inherited"
// interfaces and flatten them out ourselves.
// https://stackoverflow.com/questions/3395174/c-sharp-interface-inheritance/3395327#3395327
AddMethods(typeof(TServiceInterface));
// Take the first MethodInfo instance for each signature because interfaces can have a diamond shaped
// "inheritance" pattern. For example, suppose an interface "inherits" from two interfaces that are
// both IDisposable like this:
// IStreamReader : IDisposable
// IStreamWriter : IDisposable
// IStreamReaderWriter : IStreamReader, IStreamWriter
Dictionary<string, MethodInfo> result = items.GroupBy(pair => pair.Signature)
.ToDictionary(pair => pair.Key, pair => pair.First().Method);
return result;
}
private async Task ProcessRequestAsync(Stream clientStream)
{
await ServerUtility.ProcessRequestAsync(
this,
this,
clientStream,
async (request, cancellation) =>
{
Response response;
if (!MethodSignatureCache.Value.TryGetValue(request.MethodSignature ?? string.Empty, out MethodInfo? method))
{
response = ServerUtility.CreateResponse(new TargetException(
$"A {typeof(TServiceInterface).FullName} method with signature '{request.MethodSignature}' was not found."));
}
else if (this.serviceInstance is not object target)
{
response = ServerUtility.CreateResponse(new ObjectDisposedException(this.GetType().FullName));
}
else
{
IEnumerable<UserSerializedValue> serializedArgs = request.Arguments ?? Enumerable.Empty<UserSerializedValue>();
object?[] args = serializedArgs.Select(arg => arg.DeserializeValue(this.UserSerializer)).ToArray();
object? methodResult;
try
{
methodResult = method.Invoke(target, args);
Type returnType = methodResult?.GetType() ?? method.ReturnType;
// In theory, if the return type is Task then we could await methodResult. However, that gets complicated
// very quickly since the client is synchronously waiting for the result. We can't serialize Task or CancellationToken
// directly, so we'd have to implement custom support for them in the client and server. For now, we'll keep things
// simple and let the serializer throw if those types are used. A caller can use MessageServer for async calls instead.
response = new Response { Result = new UserSerializedValue(returnType, methodResult, this.UserSerializer) };
}
catch (TargetInvocationException ex)
{
// The inner exception is the original exception thrown by the invoked method.
response = ServerUtility.CreateResponse(ex.InnerException ?? ex);
methodResult = null;
}
// The inner try..catch for Invoke only handles TargetInvocationException so any exceptions from
// invalid request arguments or from trying to serialize methodResult will pass through the
// ReportUnhandledException action below before being returned.
}
return await Task.FromResult(response).ConfigureAwait(false);
},
this.cancellationToken).ConfigureAwait(false);
}
#endregion
}