Skip to content

Commit

Permalink
Merge pull request #65 from RayTale/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
u-less authored Feb 19, 2021
2 parents 63a550a + c863c6b commit d4419bc
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/prerelease-nuget.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Setup .NET Core
uses: actions/setup-dotnet@v1
with:
dotnet-version: 5.0.100
dotnet-version: 5.0.102

- uses: dotnet/nbgv@master
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-nuget.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Setup .NET Core
uses: actions/setup-dotnet@v1
with:
dotnet-version: 5.0.100
dotnet-version: 5.0.102

- uses: dotnet/nbgv@master
with:
Expand Down
25 changes: 12 additions & 13 deletions Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,35 @@
<PackageReference Update="Microsoft.Extensions.DependencyModel" Version="5.0.0" />
<PackageReference Update="Microsoft.Extensions.Caching.Abstractions" Version="5.0.0" />
<PackageReference Update="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" />
<PackageReference Update="Microsoft.Extensions.ObjectPool" Version="5.0.2" />
<PackageReference Update="Microsoft.Extensions.ObjectPool" Version="5.0.3" />
<PackageReference Update="Microsoft.Extensions.Options" Version="5.0.0" />
<PackageReference Update="Microsoft.Extensions.Hosting.Abstractions" Version="5.0.0" />
<PackageReference Update="BenchmarkDotNet" Version="0.12.1" />

<PackageReference Update="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.4.0">
<PackageReference Update="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.4.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Update="Microsoft.Orleans.OrleansProviders" Version="3.4.0"/>
<PackageReference Update="Microsoft.Orleans.Core" Version="3.4.0" />
<PackageReference Update="Microsoft.Orleans.Core.Abstractions" Version="3.4.0" />
<PackageReference Update="Microsoft.Orleans.Core.OrleansRuntime" Version="3.4.0" />
<PackageReference Update="Microsoft.Orleans.Runtime.Abstractions" Version="3.4.0" />
<PackageReference Update="Microsoft.Orleans.OrleansRuntime" Version="3.4.0" />
<PackageReference Update="Microsoft.Orleans.TestingHost" Version="3.4.0" />
<PackageReference Update="Microsoft.Orleans.OrleansProviders" Version="3.4.1"/>
<PackageReference Update="Microsoft.Orleans.Core" Version="3.4.1" />
<PackageReference Update="Microsoft.Orleans.Core.Abstractions" Version="3.4.1" />
<PackageReference Update="Microsoft.Orleans.Runtime.Abstractions" Version="3.4.1" />
<PackageReference Update="Microsoft.Orleans.OrleansRuntime" Version="3.4.1" />
<PackageReference Update="Microsoft.Orleans.TestingHost" Version="3.4.1" />

<PackageReference Update="System.Buffers" Version="4.5.1" />
<PackageReference Update="System.Runtime.Loader" Version="4.3.0" />
<PackageReference Update="System.Text.Json" Version="5.0.1" />
<PackageReference Update="System.Threading.Tasks.Dataflow" Version="5.0.0" />
<PackageReference Update="Confluent.Kafka" Version="1.5.3" />
<PackageReference Update="RabbitMQ.Client" Version="6.2.1" />
<PackageReference Update="RabbitMQ.Client" Version="5.2.0" />
<PackageReference Update="System.Reactive" Version="4.4.1" />
<PackageReference Update="linq2db" Version="3.2.3" />
<PackageReference Update="IdGen" Version="3.0.0" />

<PackageReference Update="Lindhart.Analyser.MissingAwaitWarning" Version="2.0.0" PrivateAssets="All" />
<PackageReference Update="Microsoft.CodeAnalysis.CSharp" Version="3.8.0" PrivateAssets="All" />
<PackageReference Update="Microsoft.Data.Sqlite" Version="5.0.2" />
<PackageReference Update="Microsoft.Data.Sqlite" Version="5.0.3" />
<PackageReference Update="Nerdbank.GitVersioning" Version="3.3.37" PrivateAssets="All" />
<PackageReference Update="Microsoft.CodeQuality.Analyzers" Version="3.3.2">
<PrivateAssets>all</PrivateAssets>
Expand All @@ -53,13 +52,13 @@
</PackageReference>

<PackageReference Update="Microsoft.SourceLink.GitHub" Version="1.1.0-beta-20204-02" PrivateAssets="All"/>
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="16.9.0-preview-20210106-01" />
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="16.9.0-preview-20210127-04" />
<PackageReference Update="xunit" Version="2.4.1" />
<PackageReference Update="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Update="coverlet.collector" Version="3.0.0">
<PackageReference Update="coverlet.collector" Version="3.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
7 changes: 4 additions & 3 deletions src/Stream/Vertex.Stream.Kafka/Consumer/ConsumerRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ public ConsumerRunner(

public Task Run()
{
var consumer = this.Client.GetConsumer(this.Queue.Group);
consumer.Handler.Subscribe(this.Queue.Topic);
this.available = true;
ThreadPool.UnsafeQueueUserWorkItem(
async state =>
{
using var consumer = this.Client.GetConsumer(this.Queue.Group);
consumer.Handler.Subscribe(this.Queue.Topic);
this.available = true;
while (!this.closed)
{
var list = new List<BytesBox>();
Expand Down Expand Up @@ -107,6 +107,7 @@ public Task Run()
}
this.available = false;
consumer.Handler.Unsubscribe();
consumer.Dispose();
}, null);
return Task.CompletedTask;
}
Expand Down
109 changes: 57 additions & 52 deletions src/Stream/Vertex.Stream.RabbitMQ/Consumer/ConsumerRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,74 +44,77 @@ public ConsumerRunner(

public Task Run()
{
ThreadPool.UnsafeQueueUserWorkItem(
async state =>
this.Model = this.Client.PullModel();
if (this.isFirst)
{
this.Model = this.Client.PullModel();
if (this.isFirst)
{
this.isFirst = false;
this.Model.Model.ExchangeDeclare(this.Queue.Exchange, "direct", true);
this.Model.Model.QueueDeclare(this.Queue.Queue, true, false, false, null);
this.Model.Model.QueueBind(this.Queue.Queue, this.Queue.Exchange, this.Queue.RoutingKey);
}
this.isFirst = false;
this.Model.Model.ExchangeDeclare(this.Queue.Exchange, "direct", true);
this.Model.Model.QueueDeclare(this.Queue.Queue, true, false, false, null);
this.Model.Model.QueueBind(this.Queue.Queue, this.Queue.Exchange, this.Queue.RoutingKey);
}

while (!this.closed)
ThreadPool.UnsafeQueueUserWorkItem(
async state =>
{
var list = new List<BytesBox>();
var batchStartTime = DateTimeOffset.UtcNow;
try
while (!this.closed)
{
while (true)
var list = new List<BytesBox>();
var batchStartTime = DateTimeOffset.UtcNow;
try
{
var whileResult = this.Model.Model.BasicGet(this.Queue.Queue, this.consumerOptions.AutoAck);
if (whileResult is null)
while (true)
{
break;
var whileResult =
this.Model.Model.BasicGet(this.Queue.Queue, this.consumerOptions.AutoAck);
if (whileResult is null)
{
break;
}
else
{
list.Add(new BytesBox(whileResult.Body, whileResult));
}

if ((DateTimeOffset.UtcNow - batchStartTime).TotalMilliseconds >
this.Model.Connection.Options.CunsumerMaxMillisecondsInterval ||
list.Count == this.Model.Connection.Options.CunsumerMaxBatchSize)
{
break;
}
}
else

if (list.Any())
{
list.Add(new BytesBox(whileResult.Body.ToArray(), whileResult));
await this.Notice(list);
}

if ((DateTimeOffset.UtcNow - batchStartTime).TotalMilliseconds > this.Model.Connection.Options.CunsumerMaxMillisecondsInterval ||
list.Count == this.Model.Connection.Options.CunsumerMaxBatchSize)
else
{
break;
await Task.Delay(WhileTimeoutSpan);
}
}

if (list.Any())
{
await this.Notice(list);
}
else
{
await Task.Delay(WhileTimeoutSpan);
}
}
catch (Exception exception)
{
this.Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {this.Queue}");
foreach (var item in list.Where(o => !o.Success))
catch (Exception exception)
{
this.Model.Model.BasicReject(((BasicGetResult)item.Origin).DeliveryTag, true);
this.Logger.LogError(exception.InnerException ?? exception,
$"An error occurred in {this.Queue}");
foreach (var item in list.Where(o => !o.Success))
{
this.Model.Model.BasicReject(((BasicGetResult) item.Origin).DeliveryTag, true);
}
}
}
finally
{
list = list.Where(o => o.Success).ToList();
if (list.Any())
finally
{
var maxDeliveryTag = list.Max(o => ((BasicGetResult)o.Origin).DeliveryTag);
if (maxDeliveryTag > 0)
list = list.Where(o => o.Success).ToList();
if (list.Any())
{
this.Model.Model.BasicAck(maxDeliveryTag, true);
var maxDeliveryTag = list.Max(o => ((BasicGetResult) o.Origin).DeliveryTag);
if (maxDeliveryTag > 0)
{
this.Model.Model.BasicAck(maxDeliveryTag, true);
}
}
}
}
}
}, null);
}, null);
return Task.CompletedTask;
}

Expand All @@ -121,11 +124,13 @@ private async Task Notice(List<BytesBox> list, int times = 0)
{
if (list.Count > 1)
{
await Task.WhenAll(this.Queue.SubActorType.Select(subType => this.streamSubHandler.EventHandler(subType, list)));
await Task.WhenAll(this.Queue.SubActorType.Select(subType =>
this.streamSubHandler.EventHandler(subType, list)));
}
else if (list.Count == 1)
{
await Task.WhenAll(this.Queue.SubActorType.Select(subType => this.streamSubHandler.EventHandler(subType, list[0])));
await Task.WhenAll(this.Queue.SubActorType.Select(subType =>
this.streamSubHandler.EventHandler(subType, list[0])));
}
}
catch
Expand Down Expand Up @@ -161,4 +166,4 @@ public void Close()
this.Model?.Dispose();
}
}
}
}

0 comments on commit d4419bc

Please sign in to comment.