Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial cut of AWS Kinesis streaming #8967

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
<PackageVersion Include="Google.Cloud.PubSub.V1" Version="1.0.0-beta13" />
<PackageVersion Include="AWSSDK.DynamoDBv2" Version="3.7.300.6" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.300" />
<PackageVersion Include="AWSSDK.Kinesis" Version="3.7.301.73" />
<PackageVersion Include="Consul" Version="1.6.10.9" />
<PackageVersion Include="Google.Protobuf" Version="3.24.4" />
<PackageVersion Include="protobuf-net" Version="3.2.26" />
Expand Down
7 changes: 7 additions & 0 deletions Orleans.sln
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Streaming.AdoNet",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmarks.AdoNet", "test\Benchmarks.AdoNet\Benchmarks.AdoNet.csproj", "{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Streaming.Kinesis", "src\AWS\Orleans.Streaming.Kinesis\Orleans.Streaming.Kinesis.csproj", "{BD85A85E-3BDE-4FAE-A705-E68A5056A828}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -593,6 +595,10 @@ Global
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Release|Any CPU.Build.0 = Release|Any CPU
{BD85A85E-3BDE-4FAE-A705-E68A5056A828}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BD85A85E-3BDE-4FAE-A705-E68A5056A828}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BD85A85E-3BDE-4FAE-A705-E68A5056A828}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BD85A85E-3BDE-4FAE-A705-E68A5056A828}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -702,6 +708,7 @@ Global
{A073C0EE-8732-42F9-A22E-D47034E25076} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
{2B994F33-16CF-4679-936A-5AEABC529D2C} = {EB2EDE59-5021-42EE-A97A-D59939B39C66}
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA} = {2CAB7894-777C-42B1-8B1E-322868CE92C7}
{BD85A85E-3BDE-4FAE-A705-E68A5056A828} = {DA8E126B-BCDB-4E8F-BFB9-2DBFD41F8F70}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using Orleans.Streaming.Kinesis;

namespace Orleans.Hosting
{
public static class ClientBuilderExtensions
{
/// <summary>
/// Configure cluster client to use Kinesis Data Stream persistent streams with default settings
/// </summary>
public static IClientBuilder AddKinesisStreams(this IClientBuilder builder, string name, Action<KinesisStreamOptions> configureOptions)
{
builder.AddKinesisStreams(name, b =>
b.ConfigureKinesis(ob => ob.Configure(configureOptions)));
return builder;
}

/// <summary>
/// Configure cluster client to use Kinesis Data Stream persistent streams.
/// </summary>
public static IClientBuilder AddKinesisStreams(this IClientBuilder builder, string name, Action<ClusterClientKinesisStreamConfigurator> configure)
{
var configurator = new ClusterClientKinesisStreamConfigurator(name, builder);
configure?.Invoke(configurator);
return builder;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Streaming.Kinesis;

namespace Orleans.Hosting
{
public class ClusterClientKinesisStreamConfigurator : ClusterClientPersistentStreamConfigurator
{
public ClusterClientKinesisStreamConfigurator(string name, IClientBuilder builder)
: base(name, builder, KinesisAdapterFactory.Create)
{
this.ConfigureDelegate(services =>
{
services.ConfigureNamedOptionForLogging<KinesisStreamOptions>(name)
.ConfigureNamedOptionForLogging<HashRingStreamQueueMapperOptions>(name);
});
}

public ClusterClientKinesisStreamConfigurator ConfigureKinesis(Action<OptionsBuilder<KinesisStreamOptions>> configureOptions)
{
this.Configure(configureOptions);
return this;
}

public ClusterClientKinesisStreamConfigurator ConfigureKinesis(Action<KinesisStreamOptions> configureOptions)
{
this.ConfigureKinesis(ob => ob.Configure(configureOptions));
return this;
}
}
}
29 changes: 29 additions & 0 deletions src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using Orleans.Streaming.Kinesis;

namespace Orleans.Hosting
{
public static class SiloBuilderExtensions
{
/// <summary>
/// Configure silo to use Kinesis Data Stream streaming with default settings.
/// </summary>
public static ISiloBuilder AddKinesisStreams(this ISiloBuilder builder, string name, Action<KinesisStreamOptions> configureOptions)
{
builder.AddKinesisStreams(name, b =>
b.ConfigureKinesis(ob => ob.Configure(configureOptions)));
return builder;
}

/// <summary>
/// Configure silo to use Kinesis Data Stream streaming.
/// </summary>
public static ISiloBuilder AddKinesisStreams(this ISiloBuilder builder, string name, Action<SiloKinesisStreamConfigurator> configure)
{
var configurator = new SiloKinesisStreamConfigurator(name,
configureServicesDelegate => builder.ConfigureServices(configureServicesDelegate));
configure?.Invoke(configurator);
return builder;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Streaming.Kinesis;
using Orleans.Streams;

namespace Orleans.Hosting
{
public class SiloKinesisStreamConfigurator : SiloPersistentStreamConfigurator
{
public SiloKinesisStreamConfigurator(string name, Action<Action<IServiceCollection>> configureServicesDelegate)
: base(name, configureServicesDelegate, KinesisAdapterFactory.Create)
{
this.ConfigureDelegate(services =>
{
services.ConfigureNamedOptionForLogging<KinesisStreamOptions>(name)
.ConfigureNamedOptionForLogging<SimpleQueueCacheOptions>(name)
.ConfigureNamedOptionForLogging<HashRingStreamQueueMapperOptions>(name)
.AddTransient<IConfigurationValidator>(sp => new StreamCheckpointerConfigurationValidator(sp, name));
});
}

public SiloKinesisStreamConfigurator ConfigureKinesis(Action<OptionsBuilder<KinesisStreamOptions>> configureOptions)
{
this.Configure(configureOptions);
return this;
}

public SiloKinesisStreamConfigurator ConfigureKinesis(Action<KinesisStreamOptions> configureOptions)
{
this.ConfigureKinesis(ob => ob.Configure(configureOptions));
return this;
}

public SiloKinesisStreamConfigurator ConfigureCheckpointer<TOptions>(
Func<IServiceProvider, string, IStreamQueueCheckpointerFactory> checkpointerFactoryBuilder,
Action<OptionsBuilder<TOptions>> configureOptions)
where TOptions : class, new()
{
this.ConfigureComponent(checkpointerFactoryBuilder, configureOptions);
return this;
}
}
}
21 changes: 21 additions & 0 deletions src/AWS/Orleans.Streaming.Kinesis/Orleans.Streaming.Kinesis.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Microsoft.Orleans.Streaming.Kinesis</PackageId>
<Title>Microsoft Orleans AWS Kinesis Streaming Provider</Title>
<Description>Microsoft Orleans streaming provider backed by AWS Kinesis</Description>
<PackageTags>$(PackageTags) AWS Kinesis</PackageTags>
<TargetFrameworks>$(DefaultTargetFrameworks)</TargetFrameworks>
<DefineConstants>$(DefineConstants);STREAMING_KINESIS</DefineConstants>
<OrleansBuildTimeCodeGen>true</OrleansBuildTimeCodeGen>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Shared\AWSUtils.cs" Link="AWSUtils.cs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="$(SourceRoot)src\Orleans.Streaming\Orleans.Streaming.csproj" />
<PackageReference Include="AWSSDK.Kinesis" />
</ItemGroup>

</Project>
3 changes: 3 additions & 0 deletions src/AWS/Orleans.Streaming.Kinesis/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("AWSUtils.Tests")]
175 changes: 175 additions & 0 deletions src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Amazon;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using Amazon.Runtime;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
using Orleans.Providers.Streams.Common;
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Streams;

namespace Orleans.Streaming.Kinesis
{
/// <summary>
/// Queue adapter factory which allows the PersistentStreamProvider to use AWS Kinesis Data Streams as its backend persistent event queue.
/// </summary>
internal class KinesisAdapterFactory : IQueueAdapterFactory, IQueueAdapter
{
private readonly KinesisStreamOptions _options;
private readonly Serializer<KinesisBatchContainer.Body> _serializer;
private readonly IStreamQueueCheckpointerFactory _checkpointerFactory;
private readonly ILoggerFactory _loggerFactory;
private readonly IQueueAdapterCache _adapterCache;
private readonly ILogger<KinesisAdapterFactory> _logger;
private readonly Func<string[], HashRingBasedPartitionedStreamQueueMapper> _queueMapperFactory;
private readonly AmazonKinesisClient _client;

private HashRingBasedPartitionedStreamQueueMapper _streamQueueMapper;

public KinesisAdapterFactory(
string name,
KinesisStreamOptions options,
SimpleQueueCacheOptions cacheOptions,
Serializer<KinesisBatchContainer.Body> serializer,
IStreamQueueCheckpointerFactory checkpointerFactory,
ILoggerFactory loggerFactory
)
{
_options = options ?? throw new ArgumentNullException(nameof(options));

Name = name;
_serializer = serializer;
_checkpointerFactory = checkpointerFactory;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<KinesisAdapterFactory>();

_adapterCache = new SimpleQueueAdapterCache(
cacheOptions,
name,
loggerFactory
);

_queueMapperFactory = partitions => new HashRingBasedPartitionedStreamQueueMapper(partitions, Name);
_client = CreateClient();
}

public string Name { get; }

public bool IsRewindable => false;

public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite;

public static KinesisAdapterFactory Create(IServiceProvider services, string name)
{
var streamsConfig = services.GetOptionsByName<KinesisStreamOptions>(name);
var cacheOptions = services.GetOptionsByName<SimpleQueueCacheOptions>(name);
var serializer = services.GetRequiredService<Serializer<KinesisBatchContainer.Body>>();
var checkpointerFactory = services.GetRequiredKeyedService<IStreamQueueCheckpointerFactory>(name);
var logger = services.GetRequiredService<ILoggerFactory>();

var factory = ActivatorUtilities.CreateInstance<KinesisAdapterFactory>(
services,
name,
streamsConfig,
cacheOptions,
serializer,
checkpointerFactory,
logger
);

return factory;
}

public async Task<IQueueAdapter> CreateAdapter()
{
if (_streamQueueMapper is null)
{
var kinesisStreams = await GetPartitionIdsAsync();
_streamQueueMapper = _queueMapperFactory(kinesisStreams);
}

return this;
}

public IQueueAdapterCache GetQueueAdapterCache()
=> _adapterCache;

public IStreamQueueMapper GetStreamQueueMapper()
=> _streamQueueMapper;

public Task<IStreamFailureHandler> GetDeliveryFailureHandler(QueueId queueId)
=> Task.FromResult<IStreamFailureHandler>(new NoOpStreamDeliveryFailureHandler(false));

public async Task QueueMessageBatchAsync<T>(StreamId streamId, IEnumerable<T> events, StreamSequenceToken token, Dictionary<string, object> requestContext)
{
var data = KinesisBatchContainer.ToKinesisPayload(_serializer, streamId, events, requestContext);

var putRecordRequest = new PutRecordRequest
{
StreamName = _options.StreamName,
Data = new MemoryStream(data),
PartitionKey = streamId.GetKeyAsString(),
};

_ = await _client.PutRecordAsync(putRecordRequest);
}

public IQueueAdapterReceiver CreateReceiver(QueueId queueId)
{
var partition = _streamQueueMapper.QueueToPartition(queueId);

return new KinesisAdapterReceiver(
CreateClient(),
_options.StreamName,
partition,
_checkpointerFactory,
_serializer,
_loggerFactory
);
}

internal AmazonKinesisClient CreateClient()
{
if (_options.Service.StartsWith("http://", StringComparison.OrdinalIgnoreCase) ||
_options.Service.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
{
// Local Kinesis instance (for testing)
var credentials = !string.IsNullOrEmpty(_options.AccessKey) && !string.IsNullOrEmpty(_options.SecretKey) ?
new BasicAWSCredentials(_options.AccessKey, _options.SecretKey) :
new BasicAWSCredentials("dummy", "dummyKey");

return new AmazonKinesisClient(credentials, new AmazonKinesisConfig { ServiceURL = _options.Service });
}
else if (!string.IsNullOrEmpty(_options.AccessKey) && !string.IsNullOrEmpty(_options.SecretKey))
{
// AWS Kinesis instance (auth via explicit credentials)
var credentials = new BasicAWSCredentials(_options.AccessKey, _options.SecretKey);
return new AmazonKinesisClient(credentials, new AmazonKinesisConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(_options.Service) });
}
else
{
// AWS Kinesis instance (implicit auth - EC2 IAM Roles etc)
return new AmazonKinesisClient(new AmazonKinesisConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(_options.Service) });
}
}

internal async Task<string[]> GetPartitionIdsAsync()
{
var request = new ListShardsRequest
{
StreamName = _options.StreamName,
};

var response = await _client.ListShardsAsync(request);

return response.Shards.Select(s => s.ShardId).ToArray();
}
}
}
Loading
Loading