-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFC - Add the DataAdapter pattern to the SQS Stream Provider #8723
base: main
Are you sure you want to change the base?
RFC - Add the DataAdapter pattern to the SQS Stream Provider #8723
Conversation
Could a core member check if the SQS tests are actually getting run in CI? If you pull master and run |
f4306cc
to
b8aface
Compare
Just rebased this ontop of main for Orleans 8.0.0 Would appreciate someone looking to see if we can merge this PR. |
@benjaminpetit Any possibility of getting this change into Orleans 8.1.0? |
5087395
to
f4bb41f
Compare
0b40d64
to
519eb4d
Compare
Hey Orleans team, Is there something that is designed wrong in this PR that is preventing us from pulling it into Orleans? I would really like to get away from having to rebase on each Orleans release and maintaining a fork for our apps. I'm sure being able to support Native -> Orleans streams via SQS would be useful for others. Let me know if I need to alter this PR in any way to get it included. |
SqsOptions are now passed down to SQSStorage to be accessible
519eb4d
to
483cda0
Compare
Just added some changes to partition the |
@OrleansTeam if there is no intention of accepting this PR or changes to the AWS extensions, let me know and I can make this a separate package, instead of having to continually rebase these changes when I want to upgrade my own Orleans versions, and others may find it useful. |
/// </summary> | ||
public int? VisibilityTimeoutSeconds { get; set; } | ||
|
||
public bool FifoQueue { get; set; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add the doc for this new flag?
/// <param name="serviceId">The service ID</param> | ||
public SQSStorage(ILoggerFactory loggerFactory, string queueName, string connectionString, string serviceId = "") | ||
public SQSStorage(ILoggerFactory loggerFactory, string queueName, SqsOptions sqsOptions, string serviceId = "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Note for later, not to do in this PR)
We should inject the SQS Client directly here
@@ -221,7 +304,7 @@ public async Task DeleteMessage(SQSMessage message) | |||
if (string.IsNullOrWhiteSpace(queueUrl)) | |||
throw new InvalidOperationException("Queue not initialized"); | |||
|
|||
await sqsClient.DeleteMessageAsync( | |||
var result = await sqsClient.DeleteMessageAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used?
var result = await sqsClient.DeleteMessageBatchAsync(deleteRequest); | ||
foreach (var failed in result.Failed) | ||
{ | ||
Logger.LogWarning("Failed to delete message {MessageId} from SQS queue {QueueName}. Error code: {ErrorCode}. Error message: {ErrorMessage}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: can you update to use a message template
{ | ||
[JsonProperty] | ||
[Orleans.Id(0)] | ||
private EventSequenceTokenV2 sequenceToken; | ||
private StreamSequenceToken sequenceToken; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine but we ahould really double check that it doesn't break backward compat
The EventHub StreamProvider has a concept of a DataAdapter, where the user can inject their own implemention to parse messages for the queue, specifically for the purpose of picking up messages from sources that are not Orleans, and have not already passed through the StreamProvider.
I am attempting to implement this in the SQS provider, as I need similar functionality.
Request for comments / code review on this PR, I need to pull some more data out to configuration still.
The first commit was required to get the SQS tests passing, before I moved on to the commit with the Data Adapter change.
Contract changes:
internal
topublic
(as the DataAdapter needs to construct one, this is crux of the change).internal
topublic
as it is the only method of passing a SequenceToken to the object.Alternative approach, would be to have the consumer implement an
IBatchContainer
but it would be nice to have a default one.Microsoft Reviewers: Open in CodeFlow