Skip to content

Commit

Permalink
Merge pull request #45 from jraghavan-agoda/feature/executeasync-with…
Browse files Browse the repository at this point in the history
…-cancellation-token

Added overload to ExecuteReaderAsync with cancellation token support
  • Loading branch information
szaboopeeter committed Feb 20, 2024
2 parents 2f16628 + d726978 commit 6f1c823
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 0 deletions.
23 changes: 23 additions & 0 deletions Agoda.Frameworks.DB.Tests/DbRepositoryTest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Moq;
Expand Down Expand Up @@ -664,6 +666,27 @@ public void QueryMultipleAsync_Retry_Failure()
Assert.IsNotNull(_onQueryCompleteEvents[0].Error);
Assert.IsNotNull(_onQueryCompleteEvents[1].Error);
}

[Test]
public async Task ExecuteReaderAsync_CancellationToken_Cancelled()

Check warning on line 671 in Agoda.Frameworks.DB.Tests/DbRepositoryTest.cs

View workflow job for this annotation

GitHub Actions / Build Package

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 671 in Agoda.Frameworks.DB.Tests/DbRepositoryTest.cs

View workflow job for this annotation

GitHub Actions / Build Package

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 671 in Agoda.Frameworks.DB.Tests/DbRepositoryTest.cs

View workflow job for this annotation

GitHub Actions / Build Package

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
var cancellationToken = new CancellationTokenSource(TimeSpan.FromDays(1));
cancellationToken.Cancel();
var maxAttemptCount = 2;
Assert.ThrowsAsync<TaskCanceledException>(async () => await _db.ExecuteReaderAsync<string>("mobile_ro", "db.v1.sp_foo", 1,
maxAttemptCount,
cancellationToken.Token,
new IDbDataParameter[]
{
new SqlParameter("@param1", "value1"),
new SqlParameter("@param2", "value2")
}, reader => { return Task.FromResult("");})
);

_dbResources.Verify(x => x.ChooseDb("mobile_ro").UpdateWeight(It.IsAny<string>(), false), Times.Exactly(maxAttemptCount));
}


protected class FakeStoredProc : IStoredProc<string, string>
{
Expand Down
64 changes: 64 additions & 0 deletions Agoda.Frameworks.DB/DbRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Agoda.Frameworks.LoadBalancing;
using Dapper;
Expand Down Expand Up @@ -296,6 +297,69 @@ public Task<T> ExecuteReaderAsync<T>(
}
}, ShouldRetry(maxAttemptCount), RaiseOnError);
}

public Task<T> ExecuteReaderAsync<T>(
string database,
string storedProc,
int timeoutSecs,
int maxAttemptCount,
CancellationToken token,
IDbDataParameter[] parameters,
Func<SqlDataReader, Task<T>> callback)
{
return _dbResources.ChooseDb(database).ExecuteAsync(async (connectionStr, _) =>
{
var stopwatch = Stopwatch.StartNew();
Exception error = null;
try
{
using (var connection = _generateConnection(connectionStr))
{
if (connection is SqlConnection sqlConn)
{
await sqlConn.OpenAsync();
}
else
{
connection.Open();
}
SqlCommand sqlCommand = null;
try
{
sqlCommand = new SqlCommand(storedProc, connection as SqlConnection)
{
CommandType = CommandType.StoredProcedure,
CommandTimeout = timeoutSecs
};
sqlCommand.Parameters.AddRange(parameters);
using (var reader = await sqlCommand.ExecuteReaderAsync(token))
{
return await callback(reader);
}
}
finally
{
if (sqlCommand != null)
{
sqlCommand.Parameters.Clear();
sqlCommand.Dispose();
}
}
}
}
catch (Exception e)
{
error = e;
throw;
}
finally
{
stopwatch.Stop();
RaiseOnExecuteReaderComplete(
database, storedProc, stopwatch.ElapsedMilliseconds, error);
}
}, ShouldRetry(maxAttemptCount), RaiseOnError);
}

private IEnumerable<TResult> QueryImpl<TRequest, TResult>(
IStoredProc<TRequest, TResult> sp,
Expand Down
11 changes: 11 additions & 0 deletions Agoda.Frameworks.DB/IDbRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;

namespace Agoda.Frameworks.DB
Expand All @@ -26,6 +27,7 @@ Task<T> ExecuteReaderAsync<T>(
Func<SqlDataReader, Task<T>> callback,
TimeSpan? timeSpan,
string cacheKey = "");

Task<object> ExecuteScalarAsync(
string dbName,
string sqlCommandString,
Expand Down Expand Up @@ -134,6 +136,15 @@ Task<T> ExecuteReaderAsync<T>(
IDbDataParameter[] parameters,
Func<SqlDataReader, Task<T>> callback);

Task<T> ExecuteReaderAsync<T>(
string database,
string storedProc,
int timeoutSecs,
int maxAttemptCount,
CancellationToken token,
IDbDataParameter[] parameters,
Func<SqlDataReader, Task<T>> callback);

event EventHandler<DbErrorEventArgs> OnError;
event EventHandler<QueryCompleteEventArgs> OnQueryComplete;
event EventHandler<ExecuteReaderCompleteEventArgs> OnExecuteReaderComplete;
Expand Down

0 comments on commit 6f1c823

Please sign in to comment.