Skip to content

Commit

Permalink
Support specifying timeouts on a per-grain-call basis (#8599)
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond authored Aug 31, 2023
1 parent 36829a8 commit 8666e23
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 144 deletions.
31 changes: 31 additions & 0 deletions src/Orleans.CodeGenerator/InvokableGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using Orleans.CodeGenerator.Diagnostics;
using static Microsoft.CodeAnalysis.CSharp.SyntaxFactory;
using System.Threading;

namespace Orleans.CodeGenerator
{
Expand Down Expand Up @@ -45,7 +46,14 @@ public static (ClassDeclarationSyntax Syntax, GeneratedInvokerDescription Invoke
.AddMembers(fields);

if (ctor != null)
{
classDeclaration = classDeclaration.AddMembers(ctor);
}

if (method.ResponseTimeoutTicks.HasValue)
{
classDeclaration = classDeclaration.AddMembers(GenerateResponseTimeoutPropertyMembers(libraryTypes, method.ResponseTimeoutTicks.Value));
}

classDeclaration = AddOptionalMembers(classDeclaration,
GenerateGetArgumentCount(method),
Expand Down Expand Up @@ -119,6 +127,29 @@ static Accessibility GetAccessibility(InvokableInterfaceDescription interfaceDes
}
}

private static MemberDeclarationSyntax[] GenerateResponseTimeoutPropertyMembers(LibraryTypes libraryTypes, long value)
{
var timespanField = FieldDeclaration(
VariableDeclaration(
libraryTypes.TimeSpan.ToTypeSyntax(),
SingletonSeparatedList(VariableDeclarator("_responseTimeoutValue")
.WithInitializer(EqualsValueClause(
InvocationExpression(
IdentifierName("global::System.TimeSpan").Member("FromTicks"),
ArgumentList(SeparatedList(new[]
{
Argument(LiteralExpression(SyntaxKind.NumericLiteralExpression, Literal(value)))
}))))))))
.AddModifiers(Token(SyntaxKind.PrivateKeyword), Token(SyntaxKind.StaticKeyword), Token(SyntaxKind.ReadOnlyKeyword));

var responseTimeoutProperty = MethodDeclaration(NullableType(libraryTypes.TimeSpan.ToTypeSyntax()), "GetDefaultResponseTimeout")
.WithExpressionBody(ArrowExpressionClause(IdentifierName("_responseTimeoutValue")))
.WithSemicolonToken(Token(SyntaxKind.SemicolonToken))
.AddModifiers(Token(SyntaxKind.PublicKeyword), Token(SyntaxKind.OverrideKeyword));
;
return new MemberDeclarationSyntax[] { timespanField, responseTimeoutProperty };
}

private static ClassDeclarationSyntax AddOptionalMembers(ClassDeclarationSyntax decl, params MemberDeclarationSyntax[] items)
=> decl.WithMembers(decl.Members.AddRange(items.Where(i => i != null)));

Expand Down
225 changes: 112 additions & 113 deletions src/Orleans.CodeGenerator/LibraryTypes.cs

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/Orleans.CodeGenerator/Model/MethodDescription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ private void PopulateOverrides(InvokableInterfaceDescription containingType, IMe
CustomInitializerMethods.Add((methodName, methodArgument));
}
}

if (SymbolEqualityComparer.Default.Equals(methodAttr.AttributeClass, containingType.CodeGenerator.LibraryTypes.ResponseTimeoutAttribute))
{
ResponseTimeoutTicks = TimeSpan.Parse((string)methodAttr.ConstructorArguments[0].Value).Ticks;
}
}

bool TryGetNamedArgument(ImmutableArray<KeyValuePair<string, TypedConstant>> arguments, string name, out TypedConstant value)
Expand Down Expand Up @@ -161,6 +166,11 @@ bool TryGetNamedArgument(ImmutableArray<KeyValuePair<string, TypedConstant>> arg
/// </summary>
public Dictionary<INamedTypeSymbol, INamedTypeSymbol> InvokableBaseTypes { get; }

/// <summary>
/// Gets the response timeout ticks, if set.
/// </summary>
public long? ResponseTimeoutTicks { get; private set; }

public override int GetHashCode() => SymbolEqualityComparer.Default.GetHashCode(Method);
}
}
3 changes: 3 additions & 0 deletions src/Orleans.Core.Abstractions/Runtime/GrainReference.cs
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ public void AddInvokeMethodOptions(InvokeMethodOptions options)

/// <inheritdoc/>
public override string ToString() => IRequest.ToString(this);

/// <inheritdoc/>
public virtual TimeSpan? GetDefaultResponseTimeout() => null;
}

/// <summary>
Expand Down
20 changes: 17 additions & 3 deletions src/Orleans.Core/Runtime/CallbackData.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;
using System.Text;
using System.Diagnostics;
using System.Threading;
using Microsoft.Extensions.Logging;
using Orleans.Serialization.Invocation;
Expand Down Expand Up @@ -37,10 +37,23 @@ public void OnStatusUpdate(StatusResponse status)
public bool IsExpired(long currentTimestamp)
{
var duration = currentTimestamp - this.stopwatch.GetRawTimestamp();
return duration > shared.ResponseTimeoutStopwatchTicks;
return duration > GetResponseTimeoutStopwatchTicks();
}

public void OnTimeout(TimeSpan timeout)
private long GetResponseTimeoutStopwatchTicks()
{
var defaultResponseTimeout = (Message.BodyObject as IInvokable)?.GetDefaultResponseTimeout();
if (defaultResponseTimeout.HasValue)
{
return (long)(defaultResponseTimeout.Value.TotalSeconds * Stopwatch.Frequency);
}

return shared.ResponseTimeoutStopwatchTicks;
}

private TimeSpan GetResponseTimeout() => (Message.BodyObject as IInvokable)?.GetDefaultResponseTimeout() ?? shared.ResponseTimeout;

public void OnTimeout()
{
if (Interlocked.CompareExchange(ref completed, 1, 0) != 0)
{
Expand All @@ -58,6 +71,7 @@ public void OnTimeout(TimeSpan timeout)
var msg = this.Message; // Local working copy

var statusMessage = lastKnownStatus is StatusResponse status ? $"Last known status is {status}. " : string.Empty;
var timeout = GetResponseTimeout();
this.shared.Logger.LogWarning(
(int)ErrorCode.Runtime_Error_100157,
"Response did not arrive on time in {Timeout} for message: {Message}. {StatusMessage}. About to break its promise.",
Expand Down
10 changes: 3 additions & 7 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,6 @@ public void SendRequest(GrainReference target, IInvokable request, IResponseComp
var message = this.messageFactory.CreateMessage(request, options);
OrleansOutsideRuntimeClientEvent.Log.SendRequest(message);

SendRequestMessage(target, message, context, options);
}

private void SendRequestMessage(GrainReference target, Message message, IResponseCompletionSource context, InvokeMethodOptions options)
{
message.InterfaceType = target.InterfaceType;
message.InterfaceVersion = target.InterfaceVersion;
var targetGrainId = target.GrainId;
Expand All @@ -254,7 +249,8 @@ private void SendRequestMessage(GrainReference target, Message message, IRespons
if (message.IsExpirableMessage(this.clientMessagingOptions.DropExpiredMessages))
{
// don't set expiration for system target messages.
message.TimeToLive = this.clientMessagingOptions.ResponseTimeout;
var ttl = request.GetDefaultResponseTimeout() ?? this.clientMessagingOptions.ResponseTimeout;
message.TimeToLive = ttl;
}

if (!oneWay)
Expand Down Expand Up @@ -452,7 +448,7 @@ private void OnCallbackExpiryTick(object state)
{
var callback = pair.Value;
if (callback.IsCompleted) continue;
if (callback.IsExpired(currentStopwatchTicks)) callback.OnTimeout(this.clientMessagingOptions.ResponseTimeout);
if (callback.IsExpired(currentStopwatchTicks)) callback.OnTimeout();
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/Orleans.Runtime/Core/InsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void SendRequest(

if (message.IsExpirableMessage(this.messagingOptions.DropExpiredMessages))
{
message.TimeToLive = sharedData.ResponseTimeout;
message.TimeToLive = request.GetDefaultResponseTimeout() ?? sharedData.ResponseTimeout;
}

var oneWay = (options & InvokeMethodOptions.OneWay) != 0;
Expand Down Expand Up @@ -532,12 +532,11 @@ public void Participate(ISiloLifecycle lifecycle)
private void OnCallbackExpiryTick(object state)
{
var currentStopwatchTicks = ValueStopwatch.GetTimestamp();
var responseTimeout = this.messagingOptions.ResponseTimeout;
foreach (var pair in callbacks)
{
var callback = pair.Value;
if (callback.IsCompleted) continue;
if (callback.IsExpired(currentStopwatchTicks)) callback.OnTimeout(responseTimeout);
if (callback.IsExpired(currentStopwatchTicks)) callback.OnTimeout();
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/Orleans.Serialization.Abstractions/Annotations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,24 @@ public GenerateCodeForDeclaringAssemblyAttribute(Type type)
public Type Type { get; }
}

/// <summary>
/// Specifies the response timeout for the interface method which it is specified on.
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
public sealed class ResponseTimeoutAttribute : Attribute
{
/// <summary>
/// Specifies the response timeout for the interface method which it is specified on.
/// </summary>
/// <param name="timeout">The response timeout, using <see cref="TimeSpan.Parse(string)"/> syntax.</param>
public ResponseTimeoutAttribute(string timeout) => Timeout = TimeSpan.Parse(timeout);

/// <summary>
/// Gets or sets the response timeout for this method.
/// </summary>
public TimeSpan? Timeout { get; init; }
}

/// <summary>
/// Functionality for converting between two types.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions src/Orleans.Serialization/Invocation/IInvokable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,10 @@ public interface IInvokable : IDisposable
/// Gets the interface type.
/// </summary>
Type GetInterfaceType();

/// <summary>
/// Gets the default response timeout.
/// </summary>
TimeSpan? GetDefaultResponseTimeout();
}
}
37 changes: 19 additions & 18 deletions test/DefaultCluster.Tests/EchoTaskGrainTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,27 @@ await promise.ContinueWith(t =>
}

[Fact, TestCategory("SlowBVT"), TestCategory("Echo"), TestCategory("Timeout")]
public async Task EchoGrain_Timeout_Wait()
public async Task EchoGrain_Timeout_ContinueWith()
{
grain = this.GrainFactory.GetGrain<IEchoTaskGrain>(Guid.NewGuid());

TimeSpan delay30 = TimeSpan.FromSeconds(30); // grain call timeout (set in config)
TimeSpan delay5 = TimeSpan.FromSeconds(30); // grain call timeout (set in config)
TimeSpan delay45 = TimeSpan.FromSeconds(45);
TimeSpan delay60 = TimeSpan.FromSeconds(60);
Stopwatch sw = new Stopwatch();
sw.Start();
Task<int> promise = grain.BlockingCallTimeoutAsync(delay60);
Task<int> promise = grain.BlockingCallTimeoutNoResponseTimeoutOverrideAsync(delay60);
await promise.ContinueWith(
t =>
{
if (!t.IsFaulted) Assert.True(false); // BlockingCallTimeout should not have completed successfully
if (!t.IsFaulted) Assert.Fail("BlockingCallTimeout should not have completed successfully");

Exception exc = t.Exception;
while (exc is AggregateException) exc = exc.InnerException;
Assert.IsAssignableFrom<TimeoutException>(exc);
}).WithTimeout(delay45);
sw.Stop();
Assert.True(TimeIsLonger(sw.Elapsed, delay30), $"Elapsed time out of range: {sw.Elapsed}");
Assert.True(TimeIsLonger(sw.Elapsed, delay5), $"Elapsed time out of range: {sw.Elapsed}");
Assert.True(TimeIsShorter(sw.Elapsed, delay60), $"Elapsed time out of range: {sw.Elapsed}");
}

Expand All @@ -100,47 +100,48 @@ public async Task EchoGrain_Timeout_Await()
{
grain = this.GrainFactory.GetGrain<IEchoTaskGrain>(Guid.NewGuid());

TimeSpan delay30 = TimeSpan.FromSeconds(30);
TimeSpan delay60 = TimeSpan.FromSeconds(60);
TimeSpan delay5 = TimeSpan.FromSeconds(5);
TimeSpan delay25 = TimeSpan.FromSeconds(25);
Stopwatch sw = new Stopwatch();
sw.Start();
try
{
int res = await grain.BlockingCallTimeoutAsync(delay60);
Assert.True(false); // BlockingCallTimeout should not have completed successfully
int res = await grain.BlockingCallTimeoutAsync(delay25);
Assert.Fail($"BlockingCallTimeout should not have completed successfully, but returned {res}");
}
catch (Exception exc)
{
while (exc is AggregateException) exc = exc.InnerException;
Assert.IsAssignableFrom<TimeoutException>(exc);
}
sw.Stop();
Assert.True(TimeIsLonger(sw.Elapsed, delay30), $"Elapsed time out of range: {sw.Elapsed}");
Assert.True(TimeIsShorter(sw.Elapsed, delay60), $"Elapsed time out of range: {sw.Elapsed}");
Assert.True(TimeIsLonger(sw.Elapsed, delay5), $"Elapsed time out of range: {sw.Elapsed}");
Assert.True(TimeIsShorter(sw.Elapsed, delay25), $"Elapsed time out of range: {sw.Elapsed}");
}

[Fact, TestCategory("SlowBVT"), TestCategory("Echo"), TestCategory("Timeout")]
public async Task EchoGrain_Timeout_Result()
public void EchoGrain_Timeout_Result()
{
grain = this.GrainFactory.GetGrain<IEchoTaskGrain>(Guid.NewGuid());

TimeSpan delay30 = TimeSpan.FromSeconds(30);
TimeSpan delay60 = TimeSpan.FromSeconds(60);
TimeSpan delay5 = TimeSpan.FromSeconds(5);
TimeSpan delay25 = TimeSpan.FromSeconds(25);
Stopwatch sw = new Stopwatch();
sw.Start();
try
{
int res = await grain.BlockingCallTimeoutAsync(delay60);
Assert.True(false, "BlockingCallTimeout should not have completed successfully, but returned " + res);
// Note that this method purposely uses Task.Result.
int res = grain.BlockingCallTimeoutAsync(delay25).Result;
Assert.Fail($"BlockingCallTimeout should not have completed successfully, but returned {res}");
}
catch (Exception exc)
{
while (exc is AggregateException) exc = exc.InnerException;
Assert.IsAssignableFrom<TimeoutException>(exc);
}
sw.Stop();
Assert.True(TimeIsLonger(sw.Elapsed, delay30), $"Elapsed time out of range: {sw.Elapsed}");
Assert.True(TimeIsShorter(sw.Elapsed, delay60), $"Elapsed time out of range: {sw.Elapsed}");
Assert.True(TimeIsLonger(sw.Elapsed, delay5), $"Elapsed time out of range: {sw.Elapsed}");
Assert.True(TimeIsShorter(sw.Elapsed, delay25), $"Elapsed time out of range: {sw.Elapsed}");
}

[Fact, TestCategory("BVT"), TestCategory("Echo")]
Expand Down
4 changes: 4 additions & 0 deletions test/Grains/TestGrainInterfaces/IEchoTaskGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ public interface IEchoTaskGrain : IGrainWithGuidKey
Task<string> EchoAsync(string data);
Task<string> EchoErrorAsync(string data);

[ResponseTimeout("00:00:05")]
Task<int> BlockingCallTimeoutAsync(TimeSpan delay);

Task<int> BlockingCallTimeoutNoResponseTimeoutOverrideAsync(TimeSpan delay);

Task PingAsync();

Task PingLocalSiloAsync();
Task PingRemoteSiloAsync(SiloAddress siloAddress);
Task PingOtherSiloAsync();
Expand Down
10 changes: 10 additions & 0 deletions test/Grains/TestInternalGrains/EchoTaskGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ public Task<int> BlockingCallTimeoutAsync(TimeSpan delay)
throw new InvalidOperationException("Timeout should have been returned to caller before " + delay);
}

public Task<int> BlockingCallTimeoutNoResponseTimeoutOverrideAsync(TimeSpan delay)
{
logger.LogInformation("IEchoGrainAsync.BlockingCallTimeoutNoResponseTimeoutOverrideAsync Delay={Delay}", delay);
Stopwatch sw = new Stopwatch();
sw.Start();
Thread.Sleep(delay);
logger.LogInformation("IEchoGrainAsync.BlockingCallTimeoutNoResponseTimeoutOverrideAsync Awoke from sleep after {ElapsedDuration}", sw.Elapsed);
throw new InvalidOperationException("Timeout should have been returned to caller before " + delay);
}

public Task PingAsync()
{
logger.LogInformation("IEchoGrainAsync.Ping");
Expand Down
1 change: 1 addition & 0 deletions test/Orleans.Serialization.UnitTests/Request.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public abstract class UnitTestRequestBase : IInvokable
public abstract Type GetInterfaceType();

public abstract MethodInfo GetMethod();
public virtual TimeSpan? GetDefaultResponseTimeout() => null;
}

[GenerateSerializer]
Expand Down

0 comments on commit 8666e23

Please sign in to comment.