diff --git a/CAP.sln b/CAP.sln index 26afd85a9..c77a38e9c 100644 --- a/CAP.sln +++ b/CAP.sln @@ -86,7 +86,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.D EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.OpenTelemetry", "src\DotNetCore.CAP.OpenTelemetry\DotNetCore.CAP.OpenTelemetry.csproj", "{83DDB126-A00B-4064-86E7-568322CA67EC}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.AzureServiceBus.InMemory", "samples\Sample.AzureServiceBus.InMemory\Sample.AzureServiceBus.InMemory.csproj", "{0C734FB2-7D75-4FF3-B564-1E50E6280B14}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AzureServiceBus.InMemory", "samples\Sample.AzureServiceBus.InMemory\Sample.AzureServiceBus.InMemory.csproj", "{0C734FB2-7D75-4FF3-B564-1E50E6280B14}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.GcpPubSub.GoogleSpanner", "samples\Sample.GcpPubSub.GoogleSpanner\Sample.GcpPubSub.GoogleSpanner.csproj", "{5F857ABF-449B-46C7-B160-CAC1C02DB782}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.GooglePubSub", "src\DotNetCore.CAP.GooglePubSub\DotNetCore.CAP.GooglePubSub.csproj", "{AB823637-CA8C-4139-B9EC-DD29494E2276}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.GoogleSpanner", "src\DotNetCore.CAP.GoogleSpanner\DotNetCore.CAP.GoogleSpanner.csproj", "{79278DDF-D699-4E1F-ACB4-907F8903E350}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -214,6 +220,18 @@ Global {0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Debug|Any CPU.Build.0 = Debug|Any CPU {0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Release|Any CPU.ActiveCfg = Release|Any CPU {0C734FB2-7D75-4FF3-B564-1E50E6280B14}.Release|Any CPU.Build.0 = Release|Any CPU + {5F857ABF-449B-46C7-B160-CAC1C02DB782}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5F857ABF-449B-46C7-B160-CAC1C02DB782}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5F857ABF-449B-46C7-B160-CAC1C02DB782}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5F857ABF-449B-46C7-B160-CAC1C02DB782}.Release|Any CPU.Build.0 = Release|Any CPU + {AB823637-CA8C-4139-B9EC-DD29494E2276}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AB823637-CA8C-4139-B9EC-DD29494E2276}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AB823637-CA8C-4139-B9EC-DD29494E2276}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AB823637-CA8C-4139-B9EC-DD29494E2276}.Release|Any CPU.Build.0 = Release|Any CPU + {79278DDF-D699-4E1F-ACB4-907F8903E350}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {79278DDF-D699-4E1F-ACB4-907F8903E350}.Debug|Any CPU.Build.0 = Debug|Any CPU + {79278DDF-D699-4E1F-ACB4-907F8903E350}.Release|Any CPU.ActiveCfg = Release|Any CPU + {79278DDF-D699-4E1F-ACB4-907F8903E350}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -249,8 +267,11 @@ Global {DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF} {83DDB126-A00B-4064-86E7-568322CA67EC} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {0C734FB2-7D75-4FF3-B564-1E50E6280B14} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {5F857ABF-449B-46C7-B160-CAC1C02DB782} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {AB823637-CA8C-4139-B9EC-DD29494E2276} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} + {79278DDF-D699-4E1F-ACB4-907F8903E350} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} EndGlobalSection -EndGlobal \ No newline at end of file +EndGlobal diff --git a/samples/Sample.GcpPubSub.GoogleSpanner/Controllers/ValuesController.cs b/samples/Sample.GcpPubSub.GoogleSpanner/Controllers/ValuesController.cs new file mode 100644 index 000000000..2126eb301 --- /dev/null +++ b/samples/Sample.GcpPubSub.GoogleSpanner/Controllers/ValuesController.cs @@ -0,0 +1,79 @@ +using System; +using System.Data; +using System.Text.Json; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP; +using Google.Cloud.Spanner.Data; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Configuration; + +namespace Sample.GcpPubSub.GoogleSpanner +{ + [Route("api/[controller]")] + public class ValuesController : Controller, ICapSubscribe + { + public class MyObj + { + public string SingerId { get; set; } + public string FirstName { get; set; } + } + + private readonly string _connectionString; + private readonly ICapPublisher _capBus; + + public ValuesController(ICapPublisher producer, IConfiguration configuration) + { + _capBus = producer; + _connectionString = configuration.GetConnectionString("SpannerDB"); + } + + [Route("~/without/transaction")] + public async Task WithoutTransaction() + { + await _capBus.PublishAsync("sample.gcppubsub.googlespanner", DateTime.Now); + + return Ok(); + } + + [Route("~/adonet/transaction")] + public IActionResult AdonetWithTransaction() + { + var random = new Random().Next(1, 10000); + try + { + using (var connection = new SpannerConnection(_connectionString)) + { + using (var transaction = connection.BeginTransaction()) + { + var id = Guid.NewGuid().ToString(); + var sql = "INSERT INTO Singers (SingerId, FirstName, LastName, FullName) " + + " values ('" + id + "', 'User" + random + "', 'Doe', 'User" + random + " Doe')"; + var cmd = connection.CreateDmlCommand(sql); + cmd.Transaction = transaction; + _ = cmd.ExecuteNonQuery(); + + var msg = new MyObj { SingerId = id, FirstName = "User" + random }; + + _capBus.Publish("sample.gcppubsub.googlespanner", msg); + + transaction.Commit(); + } + } + } + catch (Exception ex) + { + Console.WriteLine(ex); + } + + return Ok(); + } + + + [CapSubscribe("sample.gcppubsub.googlespanner")] + public void Test2(MyObj value) + { + Console.WriteLine("Subscriber output message: " + JsonSerializer.Serialize(value)); + } + } +} \ No newline at end of file diff --git a/samples/Sample.GcpPubSub.GoogleSpanner/Program.cs b/samples/Sample.GcpPubSub.GoogleSpanner/Program.cs new file mode 100644 index 000000000..52490c837 --- /dev/null +++ b/samples/Sample.GcpPubSub.GoogleSpanner/Program.cs @@ -0,0 +1,20 @@ +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Hosting; + +namespace Sample.GcpPubSub.GoogleSpanner +{ + public class Program + { + public static void Main(string[] args) + { + CreateHostBuilder(args).Build().Run(); + } + + public static IHostBuilder CreateHostBuilder(string[] args) => + Host.CreateDefaultBuilder(args) + .ConfigureWebHostDefaults(webBuilder => + { + webBuilder.UseStartup(); + }); + } +} \ No newline at end of file diff --git a/samples/Sample.GcpPubSub.GoogleSpanner/Sample.GcpPubSub.GoogleSpanner.csproj b/samples/Sample.GcpPubSub.GoogleSpanner/Sample.GcpPubSub.GoogleSpanner.csproj new file mode 100644 index 000000000..06ddf0935 --- /dev/null +++ b/samples/Sample.GcpPubSub.GoogleSpanner/Sample.GcpPubSub.GoogleSpanner.csproj @@ -0,0 +1,21 @@ + + + + net6.0 + NU1701 + NU1701 + + + + + + + + + + + + + + + diff --git a/samples/Sample.GcpPubSub.GoogleSpanner/Startup.cs b/samples/Sample.GcpPubSub.GoogleSpanner/Startup.cs new file mode 100644 index 000000000..a260514a8 --- /dev/null +++ b/samples/Sample.GcpPubSub.GoogleSpanner/Startup.cs @@ -0,0 +1,48 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; +using DotNetCore.CAP.GoogleSpanner; +using Microsoft.Extensions.Configuration; +using DotNetCore.CAP; +using DotNetCore.CAP.Dashboard.NodeDiscovery; + +namespace Sample.GcpPubSub.GoogleSpanner +{ + public class Startup + { + public Startup(IConfiguration configuration) + { + Configuration = configuration; + } + + public IConfiguration Configuration { get; } + + public void ConfigureServices(IServiceCollection services) + { + string connectionString = Configuration["ConnectionStrings:SpannerCapDB"]; + + services.AddCap(x => + { + x.UseGoogleSpanner(connectionString); + x.UseGooglePubSub(cfg => + { + cfg.ProjectId = Configuration["Pubsub:ProjectId"]; + cfg.SubscriptionId = Configuration["Pubsub:SubscriptionId"]; + cfg.VerificationToken = Configuration["Pubsub:VerificationToken"]; + cfg.TopicId = Configuration["Pubsub:TopicId"]; + }); + x.UseDashboard(); + }); + //services.AddSingleton(); + services.AddControllers(); + } + + public void Configure(IApplicationBuilder app) + { + app.UseRouting(); + app.UseEndpoints(endpoints => + { + endpoints.MapControllers(); + }); + } + } +} \ No newline at end of file diff --git a/samples/Sample.GcpPubSub.GoogleSpanner/appsettings.json b/samples/Sample.GcpPubSub.GoogleSpanner/appsettings.json new file mode 100644 index 000000000..fa8d21aab --- /dev/null +++ b/samples/Sample.GcpPubSub.GoogleSpanner/appsettings.json @@ -0,0 +1,18 @@ +{ + "Pubsub": { + "ProjectId": "", + "VerificationToken": "", + "TopicId": "", + "SubscriptionId": "" + }, + "ConnectionStrings": { + "SpannerDB": "Data Source=projects/{project}/instances/{instance}/databases/{database}", + "SpannerCapDB": "Data Source=projects/{project}/instances/{instance}/databases/{database}" + }, + "Logging": { + "IncludeScopes": false, + "LogLevel": { + "Default": "Debug" + } + } +} diff --git a/src/DotNetCore.CAP.GooglePubSub/CAP.GooglePubSubOptions.cs b/src/DotNetCore.CAP.GooglePubSub/CAP.GooglePubSubOptions.cs new file mode 100644 index 000000000..b6ae57481 --- /dev/null +++ b/src/DotNetCore.CAP.GooglePubSub/CAP.GooglePubSubOptions.cs @@ -0,0 +1,23 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + /// + /// Provides programmatic configuration for the CAP Google Cloud Platform Pub/Sub project. + /// + public class GooglePubSubOptions + { + /// + /// The GCP Project ID. + /// + public string ProjectId { get; set; } + + public string TopicId { get; set; } = "CAP"; + + public string VerificationToken { get; set; } + + public string SubscriptionId { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.GooglePubSub/CAP.GooglePubSubOptionsExtension.cs b/src/DotNetCore.CAP.GooglePubSub/CAP.GooglePubSubOptionsExtension.cs new file mode 100644 index 000000000..519dc67aa --- /dev/null +++ b/src/DotNetCore.CAP.GooglePubSub/CAP.GooglePubSubOptionsExtension.cs @@ -0,0 +1,31 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using DotNetCore.CAP.GooglePubSub; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.DependencyInjection; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + internal sealed class GooglePubSubOptionsExtension : ICapOptionsExtension + { + private readonly Action _configure; + + public GooglePubSubOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + + services.Configure(_configure); + + services.AddSingleton(); + services.AddSingleton(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.GooglePubSub/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.GooglePubSub/CAP.Options.Extensions.cs new file mode 100644 index 000000000..e7e532ae7 --- /dev/null +++ b/src/DotNetCore.CAP.GooglePubSub/CAP.Options.Extensions.cs @@ -0,0 +1,42 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using DotNetCore.CAP; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class CapOptionsExtensions + { + /// + /// Configuration to use Google Cloud Pub/Sub in CAP. + /// + /// CAP configuration options + /// The GCP Project ID. + public static CapOptions UseGooglePubSub(this CapOptions options,string projectId) + { + return options.UseGooglePubSub(opt => + { + opt.ProjectId = projectId; + }); + } + + /// + /// Configuration to use Google Cloud Pub/Sub in CAP. + /// + /// CAP configuration options + /// Provides programmatic configuration for the Google Cloud Pub/Sub. + public static CapOptions UseGooglePubSub(this CapOptions options, Action configure) + { + if (configure == null) + { + throw new ArgumentNullException(nameof(configure)); + } + + options.RegisterExtension(new GooglePubSubOptionsExtension(configure)); + + return options; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.GooglePubSub/DotNetCore.CAP.GooglePubSub.csproj b/src/DotNetCore.CAP.GooglePubSub/DotNetCore.CAP.GooglePubSub.csproj new file mode 100644 index 000000000..1be7064cb --- /dev/null +++ b/src/DotNetCore.CAP.GooglePubSub/DotNetCore.CAP.GooglePubSub.csproj @@ -0,0 +1,23 @@ + + + + netstandard2.1 + DotNetCore.CAP.GooglePubSub + $(PackageTags);PubSub;GCP;Google Cloud PubSub + + + + NU1605;NU1701 + NU1701;CS1591 + bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.GooglePubSub.xml + + + + + + + + + + + \ No newline at end of file diff --git a/src/DotNetCore.CAP.GooglePubSub/GooglePubSubConsumerClient.cs b/src/DotNetCore.CAP.GooglePubSub/GooglePubSubConsumerClient.cs new file mode 100644 index 000000000..8031e0350 --- /dev/null +++ b/src/DotNetCore.CAP.GooglePubSub/GooglePubSubConsumerClient.cs @@ -0,0 +1,106 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using Google.Api.Gax.ResourceNames; +using Google.Cloud.PubSub.V1; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.GooglePubSub +{ + internal sealed class GooglePubSubConsumerClient : IConsumerClient + { + private readonly ProjectName _projectName; + private readonly SubscriptionName _subscriptionName; + private readonly TopicName _topicName; + private SubscriberServiceApiClient _subscriberClient; + + public GooglePubSubConsumerClient(string subscriptionName, IOptions options) + { + _projectName = new ProjectName(options.Value.ProjectId); + _subscriptionName = new SubscriptionName(options.Value.ProjectId, subscriptionName); + _topicName = new TopicName(options.Value.ProjectId, options.Value.TopicId); + } + + public event EventHandler OnMessageReceived; + + public event EventHandler OnLog; + + public BrokerAddress BrokerAddress => new BrokerAddress("GooglePubSub", string.Empty); + + public void Subscribe(IEnumerable topics) + { + var hasSubscriptions = _subscriberClient.ListSubscriptions(_projectName) + .Any(x => x.TopicAsTopicName == _topicName && x.SubscriptionName == _subscriptionName); + + if (!hasSubscriptions) + { + _subscriberClient.CreateSubscription(_subscriptionName, _topicName, null, 60); + } + } + + public void Listening(TimeSpan timeout, CancellationToken cancellationToken) + { + Connect(); + + while (true) + { + PullFromGcp: + + var response = _subscriberClient.Pull(_subscriptionName, returnImmediately: true, maxMessages: 1); + if (response.ReceivedMessages.Count > 0) + { + OnConsumerReceived(response.ReceivedMessages[0]); + + goto PullFromGcp; + } + cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.WaitHandle.WaitOne(timeout); + } + // ReSharper disable once FunctionNeverReturns + } + + + public void Commit(object sender) + { + _subscriberClient.Acknowledge(_subscriptionName, new[] { (string)sender }); + } + + public void Reject(object sender) + { + // ignore + } + + public void Dispose() + { + + } + + public void Connect() + { + _subscriberClient ??= SubscriberServiceApiClient.Create(); + } + + #region private methods + + private void OnConsumerReceived(ReceivedMessage message) + { + var header = message.Message.Attributes + .ToDictionary(x => x.Key, y => y.Value); + header.Add(Headers.Group, _subscriptionName.SubscriptionId); + + //TODO: Since GCP does not support multiple topics attached to one subscription, + //TODO: multiple messages will be generated here and need to be removed + var context = new TransportMessage(header, message.Message.Data.ToByteArray()); + + OnMessageReceived?.Invoke(message.AckId, context); + } + + #endregion private methods + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.GooglePubSub/GooglePubSubConsumerClientFactory.cs b/src/DotNetCore.CAP.GooglePubSub/GooglePubSubConsumerClientFactory.cs new file mode 100644 index 000000000..2c1708655 --- /dev/null +++ b/src/DotNetCore.CAP.GooglePubSub/GooglePubSubConsumerClientFactory.cs @@ -0,0 +1,38 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.GooglePubSub +{ + internal sealed class GooglePubSubConsumerClientFactory : IConsumerClientFactory + { + private readonly IOptions _options; + private GooglePubSubConsumerClient consumerClient; + + public GooglePubSubConsumerClientFactory(IOptions options) + { + _options = options; + } + + public IConsumerClient Create(string groupId) + { + + try + { + if (consumerClient == null) + { + consumerClient = new GooglePubSubConsumerClient(groupId, _options); + consumerClient.Connect(); + } + return consumerClient; + } + catch (System.Exception e) + { + throw new BrokerConnectionException(e); + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.GooglePubSub/ITransport.GooglePubSub.cs b/src/DotNetCore.CAP.GooglePubSub/ITransport.GooglePubSub.cs new file mode 100644 index 000000000..c25439198 --- /dev/null +++ b/src/DotNetCore.CAP.GooglePubSub/ITransport.GooglePubSub.cs @@ -0,0 +1,107 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using Google.Api.Gax.ResourceNames; +using Google.Apis.Auth.OAuth2; +using Google.Cloud.PubSub.V1; +using Google.Protobuf; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.GooglePubSub +{ + internal class GooglePubSubTransport : ITransport + { + private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1); + + private readonly IOptions _options; + private readonly ILogger _logger; + + private PublisherServiceApiClient _publisherClient; + + public GooglePubSubTransport(IOptions options, + ILogger logger) + { + _options = options; + _logger = logger; + + Connect(); + + CreateTopic(); + } + + public BrokerAddress BrokerAddress => new BrokerAddress("GooglePubSub", string.Empty); + + public async Task SendAsync(TransportMessage transportMessage) + { + try + { + Connect(); + + var message = new PubsubMessage + { + Data = ByteString.CopyFrom(transportMessage.Body) + }; + + foreach (var header in transportMessage.Headers) + { + if (header.Value != null) + message.Attributes.Add(header.Key, header.Value); + } + var topicName = new TopicName(_options.Value.ProjectId, _options.Value.TopicId); + + await _publisherClient.PublishAsync(topicName, new[] { message }); + + _logger.LogDebug($"Topic message [{transportMessage.GetName()}] has been published."); + + return OperateResult.Success; + } + catch (Exception ex) + { + var wrapperEx = new PublisherSentFailedException(ex.Message, ex); + + return OperateResult.Failed(wrapperEx); + } + } + + private void Connect() + { + if (_publisherClient != null) + { + return; + } + + _connectionLock.Wait(); + + try + { + _publisherClient ??= PublisherServiceApiClient.Create(); + + + } + finally + { + _connectionLock.Release(); + } + } + + private void CreateTopic() + { + var credential = GoogleCredential.GetApplicationDefault(); + + var topicName = new TopicName(_options.Value.ProjectId, _options.Value.TopicId); + if (_publisherClient.ListTopics(new ProjectName(_options.Value.ProjectId)) + .All(x => x.TopicName != topicName)) + { + _publisherClient.CreateTopic(topicName); + } + } + } +} diff --git a/src/DotNetCore.CAP.GoogleSpanner/CAP.EFOptions.cs b/src/DotNetCore.CAP.GoogleSpanner/CAP.EFOptions.cs new file mode 100644 index 000000000..b69be6b74 --- /dev/null +++ b/src/DotNetCore.CAP.GoogleSpanner/CAP.EFOptions.cs @@ -0,0 +1,22 @@ +using System; + +namespace DotNetCore.CAP.GoogleSpanner +{ + public class EFOptions + { + public const string DefaultSchema = "cap"; + + /// + /// Gets or sets the schema to use when creating database objects. + /// Default is . + /// + public string Schema { get; set; } = DefaultSchema; + + internal Type DbContextType { get; set; } + + /// + /// Data version + /// + internal string Version { get; set; } + } +} diff --git a/src/DotNetCore.CAP.GoogleSpanner/CAP.GoogleSpannerCapOptionsExtension.cs b/src/DotNetCore.CAP.GoogleSpanner/CAP.GoogleSpannerCapOptionsExtension.cs new file mode 100644 index 000000000..b54840469 --- /dev/null +++ b/src/DotNetCore.CAP.GoogleSpanner/CAP.GoogleSpannerCapOptionsExtension.cs @@ -0,0 +1,28 @@ +using DotNetCore.CAP.Persistence; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using System; + +namespace DotNetCore.CAP.GoogleSpanner +{ + public class GoogleSpannerCapOptionsExtension : ICapOptionsExtension + { + private readonly Action _configure; + + public GoogleSpannerCapOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + services.Configure(_configure); + services.AddSingleton, ConfigureGoogleSpannerOptions>(); + + services.AddSingleton(); + services.AddSingleton(); + services.AddTransient(); + } + } +} diff --git a/src/DotNetCore.CAP.GoogleSpanner/CAP.GoogleSpannerOptions.cs b/src/DotNetCore.CAP.GoogleSpanner/CAP.GoogleSpannerOptions.cs new file mode 100644 index 000000000..a882a9319 --- /dev/null +++ b/src/DotNetCore.CAP.GoogleSpanner/CAP.GoogleSpannerOptions.cs @@ -0,0 +1,46 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.GoogleSpanner +{ + public class GoogleSpannerOptions : EFOptions + { + /// + /// The GCP Project ID. + /// + public string ProjectId { get; set; } + + public string InstanceId { get; set; } + + public string DatabaseId { get; set; } + + public bool IsEmulator { get; set; } = false; + + /// + /// Gets or sets the database's connection string that will be used to store database entities. + /// + public string ConnectionString { get; set; } + } + + internal class ConfigureGoogleSpannerOptions : IConfigureOptions + { + private readonly IServiceScopeFactory _serviceScopeFactory; + + public ConfigureGoogleSpannerOptions(IServiceScopeFactory serviceScopeFactory) + { + _serviceScopeFactory = serviceScopeFactory; + } + + public void Configure(GoogleSpannerOptions options) + { + if (options.DbContextType != null) + { + using var scope = _serviceScopeFactory.CreateScope(); + var provider = scope.ServiceProvider; + using var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType); + options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; + } + } + } +} diff --git a/src/DotNetCore.CAP.GoogleSpanner/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.GoogleSpanner/CAP.Options.Extensions.cs new file mode 100644 index 000000000..8fd879b75 --- /dev/null +++ b/src/DotNetCore.CAP.GoogleSpanner/CAP.Options.Extensions.cs @@ -0,0 +1,45 @@ +using Microsoft.EntityFrameworkCore; +using System; + +namespace DotNetCore.CAP.GoogleSpanner +{ + public static class CapOptionsExtensions + { + public static CapOptions UseGoogleSpanner(this CapOptions options, string connectionString) + { + return options.UseGoogleSpanner(opt => { opt.ConnectionString = connectionString; }); + } + + public static CapOptions UseGoogleSpanner(this CapOptions options, Action configure) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + configure += x => x.Version = options.Version; + + options.RegisterExtension(new GoogleSpannerCapOptionsExtension(configure)); + + return options; + } + + public static CapOptions UseEntityFramework(this CapOptions options) + where TContext : DbContext + { + return options.UseEntityFramework(opt => { }); + } + + public static CapOptions UseEntityFramework(this CapOptions options, Action configure) + where TContext : DbContext + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + options.RegisterExtension(new GoogleSpannerCapOptionsExtension(x => + { + configure(x); + x.Version = options.Version; + x.DbContextType = typeof(TContext); + })); + + return options; + } + } +} diff --git a/src/DotNetCore.CAP.GoogleSpanner/DotNetCore.CAP.GoogleSpanner.csproj b/src/DotNetCore.CAP.GoogleSpanner/DotNetCore.CAP.GoogleSpanner.csproj new file mode 100644 index 000000000..c9a3617a0 --- /dev/null +++ b/src/DotNetCore.CAP.GoogleSpanner/DotNetCore.CAP.GoogleSpanner.csproj @@ -0,0 +1,17 @@ + + + + netstandard2.1 + + + + + + + + + + + + + diff --git a/src/DotNetCore.CAP.GoogleSpanner/ICapTransaction.GoogleSpanner.cs b/src/DotNetCore.CAP.GoogleSpanner/ICapTransaction.GoogleSpanner.cs new file mode 100644 index 000000000..c1e001ec1 --- /dev/null +++ b/src/DotNetCore.CAP.GoogleSpanner/ICapTransaction.GoogleSpanner.cs @@ -0,0 +1,142 @@ +using DotNetCore.CAP.Transport; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.DependencyInjection; +using System.Data; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.GoogleSpanner +{ + public class GoogleSpannerCapTransaction : CapTransactionBase + { + public GoogleSpannerCapTransaction(IDispatcher dispatcher) : base(dispatcher) + { + } + + public override void Commit() + { + Debug.Assert(DbTransaction != null); + + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Commit(); + break; + case IDbContextTransaction dbContextTransaction: + dbContextTransaction.Commit(); + break; + } + + Flush(); + } + + public override async Task CommitAsync(CancellationToken cancellationToken = default) + { + Debug.Assert(DbTransaction != null); + + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Commit(); + break; + case IDbContextTransaction dbContextTransaction: + await dbContextTransaction.CommitAsync(cancellationToken); + break; + } + + Flush(); + } + + public override void Rollback() + { + Debug.Assert(DbTransaction != null); + + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Rollback(); + break; + case IDbContextTransaction dbContextTransaction: + dbContextTransaction.Rollback(); + break; + } + } + + public override async Task RollbackAsync(CancellationToken cancellationToken = default) + { + Debug.Assert(DbTransaction != null); + + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Rollback(); + break; + case IDbContextTransaction dbContextTransaction: + await dbContextTransaction.RollbackAsync(cancellationToken); + break; + } + } + + public override void Dispose() + { + (DbTransaction as IDbTransaction)?.Dispose(); + DbTransaction = null; + } + } + + public static class CapTransactionExtensions + { + public static ICapTransaction Begin(this ICapTransaction transaction, + IDbTransaction dbTransaction, bool autoCommit = false) + { + transaction.DbTransaction = dbTransaction; + transaction.AutoCommit = autoCommit; + + return transaction; + } + + public static ICapTransaction Begin(this ICapTransaction transaction, + IDbContextTransaction dbTransaction, bool autoCommit = false) + { + transaction.DbTransaction = dbTransaction; + transaction.AutoCommit = autoCommit; + + return transaction; + } + + /// + /// Start the CAP transaction + /// + /// The . + /// The . + /// Whether the transaction is automatically committed when the message is published + /// The object. + public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, + ICapPublisher publisher, bool autoCommit = false) + { + if (dbConnection.State == ConnectionState.Closed) dbConnection.Open(); + + var dbTransaction = dbConnection.BeginTransaction(); + publisher.Transaction.Value = publisher.ServiceProvider.GetService(); + return publisher.Transaction.Value.Begin(dbTransaction, autoCommit); + } + + /// + /// Start the CAP transaction + /// + /// The . + /// The . + /// Whether the transaction is automatically committed when the message is published + /// The of EF DbContext transaction object. + public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, + ICapPublisher publisher, bool autoCommit = false) + { + var trans = database.BeginTransaction(); + publisher.Transaction.Value = publisher.ServiceProvider.GetService(); + var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit); + return new CapEFDbTransaction(capTrans); + } + } +} diff --git a/src/DotNetCore.CAP.GoogleSpanner/IDataStorage.GoogleSpanner.cs b/src/DotNetCore.CAP.GoogleSpanner/IDataStorage.GoogleSpanner.cs new file mode 100644 index 000000000..fc31069f4 --- /dev/null +++ b/src/DotNetCore.CAP.GoogleSpanner/IDataStorage.GoogleSpanner.cs @@ -0,0 +1,262 @@ +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; +using DotNetCore.CAP.Serialization; +using Google.Cloud.Spanner.Data; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Data; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.GoogleSpanner +{ + public class GoogleSpannerDataStorage : IDataStorage + { + private readonly IOptions _capOptions; + private readonly IStorageInitializer _initializer; + private readonly IOptions _options; + private readonly ISerializer _serializer; + private readonly string _pubName; + private readonly string _recName; + + public GoogleSpannerDataStorage( + IOptions options, + IOptions capOptions, + IStorageInitializer initializer, + ISerializer serializer) + { + _capOptions = capOptions; + _initializer = initializer; + _options = options; + _serializer = serializer; + _pubName = initializer.GetPublishedTableName(); + _recName = initializer.GetReceivedTableName(); + } + + public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) => + await ChangeMessageStateAsync(_pubName, message, state); + + public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) => + await ChangeMessageStateAsync(_recName, message, state); + + public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null) + { + var sql = + $"INSERT INTO {_pubName} (Id,Version,Name,Content,Retries,Added,ExpiresAt,StatusName)" + + $"VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; + + var message = new MediumMessage + { + DbId = content.GetId(), + Origin = content, + Content = _serializer.Serialize(content), + Added = DateTime.Now, + ExpiresAt = null, + Retries = 0 + }; + + var sqlParams = new SpannerParameterCollection() + { + { "Id", SpannerDbType.String, message.DbId }, + { "Name", SpannerDbType.String, name }, + { "Content", SpannerDbType.String, message.Content }, + { "Retries", SpannerDbType.Int64, message.Retries }, + { "Added", SpannerDbType.Timestamp, message.Added }, + { "ExpiresAt", SpannerDbType.Timestamp, message.ExpiresAt ?? (object)DBNull.Value }, + { "StatusName", SpannerDbType.String, nameof(StatusName.Scheduled)} + }; + + + try + { + if (dbTransaction == null) + { + using var connection = new SpannerConnection(_options.Value.ConnectionString); + var cmd = connection.CreateDmlCommand(sql, sqlParams); + cmd.ExecuteNonQuery(); + } + else + { + var dbTrans = dbTransaction as IDbTransaction; + if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans) + dbTrans = dbContextTrans.GetDbTransaction(); + + using var connection = new SpannerConnection(_options.Value.ConnectionString); + var cmd = connection.CreateDmlCommand(sql, sqlParams); + cmd.Transaction = (DbTransaction)dbTrans; + cmd.ExecuteNonQuery(); + } + } + catch (Exception ex) + { + Console.WriteLine(ex); + } + + return message; + } + + public void StoreReceivedExceptionMessage(string name, string group, string content) + { + var sqlParams = new SpannerParameterCollection() + { + { "Id", SpannerDbType.String, SnowflakeId.Default().NextId() }, + { "Name", SpannerDbType.String, name }, + { "GroupName", SpannerDbType.String, group }, + { "Content", SpannerDbType.String, content }, + { "Retries", SpannerDbType.Int64, _capOptions.Value.FailedRetryCount }, + { "Added", SpannerDbType.Timestamp, DateTime.Now }, + { "ExpiresAt", SpannerDbType.Timestamp, DateTime.Now.AddDays(15) }, + { "StatusName", SpannerDbType.String, nameof(StatusName.Failed)} + }; + + StoreReceivedMessage(sqlParams); + } + + public MediumMessage StoreReceivedMessage(string name, string group, Message message) + { + var mdMessage = new MediumMessage + { + DbId = SnowflakeId.Default().NextId().ToString(), + Origin = message, + Added = DateTime.Now, + ExpiresAt = null, + Retries = 0 + }; + + var sqlParams = new SpannerParameterCollection() + { + { "Id", SpannerDbType.String, mdMessage.DbId }, + { "Name", SpannerDbType.String, name }, + { "GroupName", SpannerDbType.String, group }, + { "Content", SpannerDbType.String, _serializer.Serialize(mdMessage.Origin) }, + { "Retries", SpannerDbType.Int64, mdMessage.Retries }, + { "Added", SpannerDbType.Timestamp, mdMessage.Added }, + { "ExpiresAt", SpannerDbType.Timestamp, mdMessage.ExpiresAt?? (object) DBNull.Value }, + { "StatusName", SpannerDbType.String, nameof(StatusName.Scheduled)} + }; + + StoreReceivedMessage(sqlParams); + return mdMessage; + } + + public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, + CancellationToken token = default) + { + long count = 0; + try + { + using var connection = new SpannerConnection(_options.Value.ConnectionString); + var sqlParams = new SpannerParameterCollection() + { + { "timeout", SpannerDbType.Timestamp, timeout }, + { "batchCount", SpannerDbType.Int64, batchCount }, + }; + var sql = $"DELETE FROM {table} WHERE Id IN (SELECT Id FROM {table} WHERE ExpiresAt < @timeout LIMIT @batchCount)"; + var cmd = connection.CreateDmlCommand(sql, sqlParams); + + count = await cmd.ExecuteNonQueryAsync(); + } + catch (Exception ex) + { + Console.WriteLine(ex); + } + + return await Task.FromResult((int)count); + } + + public async Task> GetPublishedMessagesOfNeedRetry() => + await GetMessagesOfNeedRetryAsync(_pubName); + + public async Task> GetReceivedMessagesOfNeedRetry() => + await GetMessagesOfNeedRetryAsync(_recName); + + public IMonitoringApi GetMonitoringApi() + { + return new GoogleSpannerMonitoringApi(_options, _initializer); + } + + private async Task ChangeMessageStateAsync(string tableName, MediumMessage mdMessage, StatusName state) + { + var sql = + $"UPDATE {tableName} SET Content=@Content,Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; + + try + { + var sqlParams = new SpannerParameterCollection() + { + { "Id", SpannerDbType.String, mdMessage.DbId }, + { "Content", SpannerDbType.String, _serializer.Serialize(mdMessage.Origin) }, + { "Retries", SpannerDbType.Int64, mdMessage.Retries }, + { "ExpiresAt", SpannerDbType.Timestamp, mdMessage.ExpiresAt }, + { "StatusName", SpannerDbType.String, state.ToString("G")} + }; + + var connection = new SpannerConnection(_options.Value.ConnectionString); + var cmd = connection.CreateDmlCommand(sql, sqlParams); + cmd.ExecuteNonQuery(); + } + catch (Exception ex) + { + Console.WriteLine(ex); + } + + await Task.CompletedTask; + } + + private void StoreReceivedMessage(SpannerParameterCollection sqlParams) + { + var sql = + $"INSERT INTO {_recName}(Id,Version,Name,GroupName,Content,Retries,Added,ExpiresAt,StatusName)" + + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@GroupName,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; + + try + { + using var connection = new SpannerConnection(_options.Value.ConnectionString); + var cmd = connection.CreateDmlCommand(sql, sqlParams); + cmd.ExecuteNonQuery(); + } + catch (Exception ex) + { + Console.WriteLine(ex); + } + } + + private async Task> GetMessagesOfNeedRetryAsync(string tableName) + { + var messages = new List(); + var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("s"); + var sql = + $"SELECT Id,Content,Retries,Added FROM {tableName} WHERE Retries<{_capOptions.Value.FailedRetryCount} " + + $"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName='{StatusName.Failed}' OR StatusName='{StatusName.Scheduled}') LIMIT 200;"; + + try + { + using var connection = new SpannerConnection(_options.Value.ConnectionString); + var cmd = connection.CreateSelectCommand(sql); + + using var reader = await cmd.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + messages.Add(new MediumMessage + { + DbId = reader.GetInt64(0).ToString(), + Origin = _serializer.Deserialize(reader.GetString(1)), + Retries = reader.GetInt32(2), + Added = Convert.ToDateTime(reader.GetString(3)) + }); + } + } + catch (Exception ex) + { + Console.WriteLine(ex); + } + + return messages; + } + } +} diff --git a/src/DotNetCore.CAP.GoogleSpanner/IDbContextTransaction.CAP.cs b/src/DotNetCore.CAP.GoogleSpanner/IDbContextTransaction.CAP.cs new file mode 100644 index 000000000..3b4788f29 --- /dev/null +++ b/src/DotNetCore.CAP.GoogleSpanner/IDbContextTransaction.CAP.cs @@ -0,0 +1,51 @@ +using Microsoft.EntityFrameworkCore.Storage; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.GoogleSpanner +{ + internal class CapEFDbTransaction : IDbContextTransaction + { + private readonly ICapTransaction _transaction; + + public CapEFDbTransaction(ICapTransaction transaction) + { + _transaction = transaction; + var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction; + TransactionId = dbContextTransaction.TransactionId; + } + + public void Dispose() + { + _transaction.Dispose(); + } + + public void Commit() + { + _transaction.Commit(); + } + + public void Rollback() + { + _transaction.Rollback(); + } + + public Task CommitAsync(CancellationToken cancellationToken = default) + { + return _transaction.CommitAsync(cancellationToken); + } + + public Task RollbackAsync(CancellationToken cancellationToken = default) + { + return _transaction.RollbackAsync(cancellationToken); + } + + public Guid TransactionId { get; } + + public ValueTask DisposeAsync() + { + return new ValueTask(Task.Run(() => _transaction.Dispose())); + } + } +} diff --git a/src/DotNetCore.CAP.GoogleSpanner/IMonitoringApi.GoogleSpanner.cs b/src/DotNetCore.CAP.GoogleSpanner/IMonitoringApi.GoogleSpanner.cs new file mode 100644 index 000000000..2a612faf5 --- /dev/null +++ b/src/DotNetCore.CAP.GoogleSpanner/IMonitoringApi.GoogleSpanner.cs @@ -0,0 +1,273 @@ +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; +using Google.Cloud.Spanner.Data; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.GoogleSpanner +{ + internal class GoogleSpannerMonitoringApi : IMonitoringApi + { + private readonly GoogleSpannerOptions _options; + private readonly string _pubName; + private readonly string _recName; + + public GoogleSpannerMonitoringApi(IOptions options, IStorageInitializer initializer) + { + _options = options.Value ?? throw new ArgumentNullException(nameof(options)); + _pubName = initializer.GetPublishedTableName(); + _recName = initializer.GetReceivedTableName(); + } + + public async Task GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id); + + public async Task GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id); + + public StatisticsDto GetStatistics() + { + var sql = $@" + SELECT + ( + SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = 'Succeeded' + ) AS PublishedSucceeded, + ( + SELECT COUNT(Id) FROM {_recName} WHERE StatusName = 'Succeeded' + ) AS ReceivedSucceeded, + ( + SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = 'Failed' + ) AS PublishedFailed, + ( + SELECT COUNT(Id) FROM {_recName} WHERE StatusName = 'Failed' + ) AS ReceivedFailed"; + + StatisticsDto statistics = new StatisticsDto(); + using (var connection = new SpannerConnection(_options.ConnectionString)) + { + var cmd = connection.CreateSelectCommand(sql); + using (var reader = cmd.ExecuteReader()) + { + while (reader.Read()) + { + statistics.PublishedSucceeded = reader.GetInt32(0); + statistics.ReceivedSucceeded = reader.GetInt32(1); + statistics.PublishedFailed = reader.GetInt32(2); + statistics.ReceivedFailed = reader.GetInt32(3); + } + } + } + return statistics; + } + + public PagedQueryResult Messages(MessageQueryDto queryDto) + { + var tableName = queryDto.MessageType == MessageType.Publish ? _pubName : _recName; + var where = string.Empty; + + if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and Lower(StatusName) = Lower(@StatusName)"; + + if (!string.IsNullOrEmpty(queryDto.Name)) where += " and Lower(Name) = Lower(@Name)"; + + if (!string.IsNullOrEmpty(queryDto.Group)) where += " and Lower(GroupName) = Lower(@Group)"; + + if (!string.IsNullOrEmpty(queryDto.Content)) where += " and Content Like @Content"; + + var sqlQuery = + $"select * from {tableName} where 1=1 {where} order by Added desc limit @Limit offset @Offset"; + + var sqlParams = new SpannerParameterCollection() + { + { "StatusName", SpannerDbType.String, queryDto.StatusName ?? string.Empty }, + { "GroupName", SpannerDbType.String, queryDto.Group ?? string.Empty }, + { "Name", SpannerDbType.String, queryDto.Name ?? string.Empty }, + { "Content", SpannerDbType.String, $"'%{queryDto.Content}%'" }, + { "Offset", SpannerDbType.Int64, queryDto.CurrentPage * queryDto.PageSize}, + { "Limit", SpannerDbType.Int64, queryDto.PageSize} + }; + + var messages = new List(); + long count = 0; + using (var connection = new SpannerConnection(_options.ConnectionString)) + { + var countCmd = connection.CreateSelectCommand($"select count(1) from {tableName} where 1=1 {where}", + new SpannerParameterCollection() + { + { "StatusName", SpannerDbType.String, queryDto.StatusName ?? string.Empty }, + { "GroupName", SpannerDbType.String, queryDto.Group ?? string.Empty }, + { "Name", SpannerDbType.String, queryDto.Name ?? string.Empty }, + { "Content", SpannerDbType.String, $"'%{queryDto.Content}%'" }, + { "Offset", SpannerDbType.Int64, queryDto.CurrentPage * queryDto.PageSize}, + { "Limit", SpannerDbType.Int64, queryDto.PageSize} + }); + count = (long)countCmd.ExecuteScalar(); + + var cmd = connection.CreateSelectCommand(sqlQuery, sqlParams); + + using var reader = cmd.ExecuteReader(); + while (reader.Read()) + { + var index = 0; + messages.Add(new MessageDto + { + Id = reader.GetString(index++), + Version = reader.GetString(index++), + Name = reader.GetString(index++), + Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default, + Content = reader.GetString(index++), + Retries = reader.GetInt32(index++), + Added = reader.GetDateTime(index++), + ExpiresAt = reader.GetDateTime(index++), + StatusName = reader.GetString(index) + }); + } + } + return new PagedQueryResult { Items = messages, PageIndex = queryDto.CurrentPage, PageSize = queryDto.PageSize, Totals = count }; + } + + public int PublishedFailedCount() + { + return GetNumberOfMessage(_pubName, nameof(StatusName.Failed)); + } + + public int PublishedSucceededCount() + { + return GetNumberOfMessage(_pubName, nameof(StatusName.Succeeded)); + } + + public int ReceivedFailedCount() + { + return GetNumberOfMessage(_recName, nameof(StatusName.Failed)); + } + + public int ReceivedSucceededCount() + { + return GetNumberOfMessage(_recName, nameof(StatusName.Succeeded)); + } + + public IDictionary HourlySucceededJobs(MessageType type) + { + var tableName = type == MessageType.Publish ? _pubName : _recName; + return GetHourlyTimelineStats(tableName, nameof(StatusName.Succeeded)); + } + + public IDictionary HourlyFailedJobs(MessageType type) + { + var tableName = type == MessageType.Publish ? _pubName : _recName; + return GetHourlyTimelineStats(tableName, nameof(StatusName.Failed)); + } + + private int GetNumberOfMessage(string tableName, string statusName) + { + var sqlQuery = + $"select count(Id) from {tableName} where Lower(StatusName) = Lower(@state)"; + + var sqlParams = new SpannerParameterCollection() + { + { "state", SpannerDbType.String, statusName }, + }; + + using var connection = new SpannerConnection(_options.ConnectionString); + var cmd = connection.CreateSelectCommand(sqlQuery, sqlParams); + var count = cmd.ExecuteScalar(); + return (int)count; + } + + private Dictionary GetHourlyTimelineStats(string tableName, string statusName) + { + var endDate = DateTime.Now; + var dates = new List(); + for (var i = 0; i < 24; i++) + { + dates.Add(endDate); + endDate = endDate.AddHours(-1); + } + + var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x); + + return GetTimelineStats(tableName, statusName, keyMaps); + } + + private Dictionary GetTimelineStats( + string tableName, + string statusName, + IDictionary keyMaps) + { + var sqlQuery = + $@" +with aggr as ( + select FORMAT_TIMESTAMP('%F-%H', Added, 'UTC') as Key, + count(Id) as Count + from {tableName} + where StatusName = 'Succeeded' + group by FORMAT_TIMESTAMP('%F-%H', Added, 'UTC') +) +select Key,Count from aggr where Key >= @minKey and Key <= @maxKey"; + + var sqlParams = new SpannerParameterCollection() + { + { "statusName", SpannerDbType.String, statusName }, + { "minKey", SpannerDbType.String, keyMaps.Keys.Min() }, + { "maxKey", SpannerDbType.String, keyMaps.Keys.Max() }, + }; + + Dictionary valuesMap = new Dictionary(); + + using (var connection = new SpannerConnection(_options.ConnectionString)) + { + var cmd = connection.CreateSelectCommand(sqlQuery, sqlParams); + using(var reader = cmd.ExecuteReader()) + { + while (reader.Read()) + { + valuesMap.Add(reader.GetString(0), reader.GetInt32(1)); + } + } + } + + foreach (var key in keyMaps.Keys) + { + if (!valuesMap.ContainsKey(key)) + valuesMap.Add(key, 0); + } + + var result = new Dictionary(); + for (var i = 0; i < keyMaps.Count; i++) + { + var value = valuesMap[keyMaps.ElementAt(i).Key]; + result.Add(keyMaps.ElementAt(i).Value, value); + } + + return result; + } + + private async Task GetMessageAsync(string tableName, long id) + { + var sql = $@"SELECT Id AS DbId, Content, Added, ExpiresAt, Retries FROM {tableName} WHERE Id={id}"; + + MediumMessage message = null; + using var connection = new SpannerConnection(_options.ConnectionString); + var cmd = connection.CreateSelectCommand(sql); + using(var reader = await cmd.ExecuteReaderAsync()) + { + while (reader.Read()) + { + message = new MediumMessage + { + DbId = reader.GetInt64(0).ToString(), + Content = reader.GetString(1), + Added = reader.GetDateTime(2), + ExpiresAt = reader.GetDateTime(3), + Retries = reader.GetInt32(4) + }; + } + } + + return message; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.GoogleSpanner/IStorageInitializer.GoogleSpanner.cs b/src/DotNetCore.CAP.GoogleSpanner/IStorageInitializer.GoogleSpanner.cs new file mode 100644 index 000000000..14074692a --- /dev/null +++ b/src/DotNetCore.CAP.GoogleSpanner/IStorageInitializer.GoogleSpanner.cs @@ -0,0 +1,105 @@ +using DotNetCore.CAP.Persistence; +using Google.Cloud.Spanner.Data; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.GoogleSpanner +{ + internal class GoogleSpannerStorageInitializer : IStorageInitializer + { + private readonly ILogger _logger; + private readonly IOptions _options; + + public GoogleSpannerStorageInitializer( + ILogger logger, + IOptions options) + { + _options = options; + _logger = logger; + } + + public virtual string GetPublishedTableName() + { + return $"{_options.Value.Schema}_published"; + } + + public virtual string GetReceivedTableName() + { + return $"{_options.Value.Schema}_received"; + } + + public async Task InitializeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) return; + + var sql = CreatePublishedTableScript(_options.Value.Schema); + using (var connection = new SpannerConnection(_options.Value.ConnectionString)) + { + try + { + var cmd = connection.CreateDdlCommand(sql); + await cmd.ExecuteNonQueryAsync(); + } + catch (SpannerException e) when (e.ErrorCode == ErrorCode.FailedPrecondition) + { + // Table already exist. Not a problem. + } + } + + sql = CreateReceivedTableScript(_options.Value.Schema); + using (var connection = new SpannerConnection(_options.Value.ConnectionString)) + { + try + { + var cmd = connection.CreateDdlCommand(sql); + await cmd.ExecuteNonQueryAsync(); + } + catch (SpannerException e) when (e.ErrorCode == ErrorCode.FailedPrecondition) + { + // Table already exist. Not a problem. + } + } + + await Task.CompletedTask; + + _logger.LogDebug("Ensuring all create database tables script are applied."); + } + + protected virtual string CreatePublishedTableScript(string schema) + { + //CREATE SCHEMA IF NOT EXISTS ""{schema}""; + var batchSql = $@" + CREATE TABLE {GetPublishedTableName()} ( + Id STRING(50) NOT NULL, + Version STRING(20), + Name STRING(50), + Content STRING(MAX), + Retries INT64 NOT NULL, + Added TIMESTAMP NOT NULL, + ExpiresAt TIMESTAMP, + StatusName STRING(50) + )PRIMARY KEY (Id)"; + return batchSql; + } + + protected virtual string CreateReceivedTableScript(string schema) + { + //CREATE SCHEMA IF NOT EXISTS ""{schema}""; + var batchSql = $@" + CREATE TABLE {GetReceivedTableName()} ( + Id STRING(50) NOT NULL, + Version STRING(20), + Name STRING(50), + GroupName STRING(50), + Content STRING(MAX), + Retries INT64 NOT NULL, + Added TIMESTAMP NOT NULL, + ExpiresAt TIMESTAMP, + StatusName STRING(50) + )PRIMARY KEY (Id)"; + return batchSql; + } + } +} \ No newline at end of file