Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC - Add the DataAdapter pattern to the SQS Stream Provider #8723

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

jamescarter-le
Copy link
Contributor

@jamescarter-le jamescarter-le commented Nov 13, 2023

The EventHub StreamProvider has a concept of a DataAdapter, where the user can inject their own implemention to parse messages for the queue, specifically for the purpose of picking up messages from sources that are not Orleans, and have not already passed through the StreamProvider.

I am attempting to implement this in the SQS provider, as I need similar functionality.

Request for comments / code review on this PR, I need to pull some more data out to configuration still.

The first commit was required to get the SQS tests passing, before I moved on to the commit with the Data Adapter change.

Contract changes:

  • SQSBatchContainer changed from internal to public (as the DataAdapter needs to construct one, this is crux of the change).
  • SQSBatchContainer.ctor changed from internal to public as it is the only method of passing a SequenceToken to the object.

Alternative approach, would be to have the consumer implement an IBatchContainer but it would be nice to have a default one.

Microsoft Reviewers: Open in CodeFlow

@jamescarter-le
Copy link
Contributor Author

Could a core member check if the SQS tests are actually getting run in CI? If you pull master and run AWSUtils.Tests.Streaming.SQSAdapterTests it will fail before my Fix SQS Tests commit but CI pipeline reports it's all fine.

@benjaminpetit benjaminpetit self-assigned this Nov 13, 2023
@jamescarter-le jamescarter-le force-pushed the request/SQS-DataAdapter branch from f4306cc to b8aface Compare January 30, 2024 10:52
@jamescarter-le
Copy link
Contributor Author

Just rebased this ontop of main for Orleans 8.0.0

Would appreciate someone looking to see if we can merge this PR.

@jamescarter-le
Copy link
Contributor Author

@benjaminpetit Any possibility of getting this change into Orleans 8.1.0?

@jamescarter-le jamescarter-le force-pushed the request/SQS-DataAdapter branch from 5087395 to f4bb41f Compare February 29, 2024 11:52
@jamescarter-le jamescarter-le force-pushed the request/SQS-DataAdapter branch from 0b40d64 to 519eb4d Compare May 23, 2024 10:57
@jamescarter-le
Copy link
Contributor Author

Hey Orleans team,

Is there something that is designed wrong in this PR that is preventing us from pulling it into Orleans?

I would really like to get away from having to rebase on each Orleans release and maintaining a fork for our apps.

I'm sure being able to support Native -> Orleans streams via SQS would be useful for others.

Let me know if I need to alter this PR in any way to get it included.

@jamescarter-le jamescarter-le force-pushed the request/SQS-DataAdapter branch from 519eb4d to 483cda0 Compare November 5, 2024 12:08
@jamescarter-le
Copy link
Contributor Author

Just added some changes to partition the IQueueCache when using SQS FIFO based queues, to allow the SQS SequenceNumber that is aligned by MessageGroupId, to be used for the SequenceToken, considering the limitations of the SimpleQueueCache.

@jamescarter-le
Copy link
Contributor Author

@OrleansTeam if there is no intention of accepting this PR or changes to the AWS extensions, let me know and I can make this a separate package, instead of having to continually rebase these changes when I want to upgrade my own Orleans versions, and others may find it useful.

/// </summary>
public int? VisibilityTimeoutSeconds { get; set; }

public bool FifoQueue { get; set; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the doc for this new flag?

/// <param name="serviceId">The service ID</param>
public SQSStorage(ILoggerFactory loggerFactory, string queueName, string connectionString, string serviceId = "")
public SQSStorage(ILoggerFactory loggerFactory, string queueName, SqsOptions sqsOptions, string serviceId = "")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Note for later, not to do in this PR)

We should inject the SQS Client directly here

@@ -221,7 +304,7 @@ public async Task DeleteMessage(SQSMessage message)
if (string.IsNullOrWhiteSpace(queueUrl))
throw new InvalidOperationException("Queue not initialized");

await sqsClient.DeleteMessageAsync(
var result = await sqsClient.DeleteMessageAsync(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used?

var result = await sqsClient.DeleteMessageBatchAsync(deleteRequest);
foreach (var failed in result.Failed)
{
Logger.LogWarning("Failed to delete message {MessageId} from SQS queue {QueueName}. Error code: {ErrorCode}. Error message: {ErrorMessage}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: can you update to use a message template

{
[JsonProperty]
[Orleans.Id(0)]
private EventSequenceTokenV2 sequenceToken;
private StreamSequenceToken sequenceToken;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine but we ahould really double check that it doesn't break backward compat

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants