Skip to content

Commit

Permalink
Add specific StreamSequenceToken to support AWS SQS FIFO SequenceNumber
Browse files Browse the repository at this point in the history
  • Loading branch information
jamescarter-le committed May 23, 2024
1 parent 67c69c0 commit 519eb4d
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 4 deletions.
4 changes: 4 additions & 0 deletions src/AWS/Orleans.Streaming.SQS/Storage/SQSStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ public async Task InitQueueAsync()
{ QueueAttributeName.DeduplicationScope, "messageGroup" },
{ QueueAttributeName.ContentBasedDeduplication, "true" },
};

// We require to bring down the AWS set SequenceNumber when on a FIFO queue
// in order to populate the SQSFIFOSequenceToken from it.
sqsOptions.ReceiveAttributes.Add("SequenceNumber");
}

if (sqsOptions.ReceiveWaitTimeSeconds.HasValue)
Expand Down
8 changes: 8 additions & 0 deletions src/AWS/Orleans.Streaming.SQS/Streams/SQSAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Orleans.Configuration;
using Orleans.Runtime;
using Orleans.Streaming.SQS.Streams;
using System.Linq;

namespace OrleansAWSUtils.Streams
{
Expand Down Expand Up @@ -59,6 +60,13 @@ public async Task QueueMessageBatchAsync<T>(StreamId streamId, IEnumerable<T> ev

var sqsMessage = dataAdapter.ToQueueMessage(streamId, events, token, requestContext);
var sqsRequest = new SendMessageRequest(string.Empty, sqsMessage.Body);

if (this.sqsOptions.FifoQueue)
{
// Ensure the SQS Queue ensures FIFO order of messages over this QueueId.
sqsRequest.MessageGroupId = queueId.ToString();
}

foreach (var attr in sqsMessage.Attributes)
{
sqsRequest.MessageAttributes.Add(attr.Key, new MessageAttributeValue
Expand Down
23 changes: 19 additions & 4 deletions src/AWS/Orleans.Streaming.SQS/Streams/SQSBatchContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class SQSBatchContainer : IBatchContainer
{
[JsonProperty]
[Orleans.Id(0)]
private EventSequenceTokenV2 sequenceToken;
private StreamSequenceToken sequenceToken;

[JsonProperty]
[Orleans.Id(1)]
Expand Down Expand Up @@ -45,7 +45,7 @@ public SQSBatchContainer(
StreamId streamId,
List<object> events,
Dictionary<string, object> requestContext,
EventSequenceTokenV2 sequenceToken)
StreamSequenceToken sequenceToken)
: this(streamId, events, requestContext)
{
this.sequenceToken = sequenceToken;
Expand All @@ -62,7 +62,17 @@ public SQSBatchContainer(StreamId streamId, List<object> events, Dictionary<stri

public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
{
return events.OfType<T>().Select((e, i) => Tuple.Create<T, StreamSequenceToken>(e, sequenceToken.CreateSequenceTokenForEvent(i)));
static StreamSequenceToken CreateStreamSequenceToken(StreamSequenceToken tok, int eventIndex)
{
return tok switch
{
EventSequenceTokenV2 v2Tok => v2Tok.CreateSequenceTokenForEvent(eventIndex),
SQSFIFOSequenceToken fifoTok => fifoTok.CreateSequenceTokenForEvent(eventIndex),
_ => throw new NotSupportedException("Unknown SequenceToken provided.")
};
}

return events.OfType<T>().Select((e, i) => Tuple.Create<T, StreamSequenceToken>(e, CreateStreamSequenceToken(sequenceToken, i)));
}

internal static SQSMessage ToSQSMessage<T>(
Expand All @@ -88,7 +98,12 @@ internal static SQSBatchContainer FromSQSMessage(Serializer<SQSBatchContainer> s
var json = JObject.Parse(msg.Body);
var sqsBatch = serializer.Deserialize(json["payload"].ToObject<byte[]>());
sqsBatch.Message = msg;
sqsBatch.sequenceToken = new EventSequenceTokenV2(sequenceNumber);

if(msg.Attributes.TryGetValue("SequenceNumber", out var fifoSeqNum))
sqsBatch.sequenceToken = new SQSFIFOSequenceToken(UInt128.Parse(fifoSeqNum));
else
sqsBatch.sequenceToken = new EventSequenceTokenV2(sequenceNumber);

return sqsBatch;
}

Expand Down
121 changes: 121 additions & 0 deletions src/AWS/Orleans.Streaming.SQS/Streams/SQSFIFOSequenceToken.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System;
using System.Globalization;
using Newtonsoft.Json;
using Orleans;
using Orleans.Streams;

namespace OrleansAWSUtils.Streams
{
/// <summary>
/// Stream sequence token that tracks sequence number and event index
/// </summary>
[Serializable]
[GenerateSerializer]
public class SQSFIFOSequenceToken : StreamSequenceToken
{
/// <summary>
/// Gets the number of event batches in stream prior to this event batch
/// </summary>
[Id(0)]
[JsonProperty]
public UInt128 SqsSequenceNumber { get; set; }

/// <summary>
/// Gets the number of event batches in stream prior to this event batch
/// </summary>
public override long SequenceNumber
{
get => throw new NotSupportedException();
protected set => throw new NotSupportedException();
}

/// <summary>
/// Gets the number of events in batch prior to this event
/// </summary>
[Id(1)]
[JsonProperty]
public override int EventIndex { get; protected set; }

/// <summary>
/// Initializes a new instance of the <see cref="SQSFIFOSequenceToken"/> class.
/// </summary>
/// <param name="seqNumber">The sequence number.</param>
public SQSFIFOSequenceToken(UInt128 seqNumber)
{
SqsSequenceNumber = seqNumber;
EventIndex = 0;
}

/// <summary>
/// Initializes a new instance of the <see cref="SQSFIFOSequenceToken"/> class.
/// </summary>
/// <param name="seqNumber">The sequence number.</param>
/// <param name="eventInd">The event index, for events which are part of a batch of events.</param>
public SQSFIFOSequenceToken(UInt128 seqNumber, int eventInd)
{
SqsSequenceNumber = seqNumber;
EventIndex = eventInd;
}

/// <summary>
/// Initializes a new instance of the <see cref="SQSFIFOSequenceToken"/> class.
/// </summary>
/// <remarks>
/// This constructor is for serializer use only.
/// </remarks>
public SQSFIFOSequenceToken()
{
}

/// <summary>
/// Creates a sequence token for a specific event in the current batch
/// </summary>
/// <param name="eventInd">The event index.</param>
/// <returns>A new sequence token.</returns>
public SQSFIFOSequenceToken CreateSequenceTokenForEvent(int eventInd)
{
return new SQSFIFOSequenceToken(SqsSequenceNumber, eventInd);
}

/// <inheritdoc/>
public override bool Equals(object obj)
{
return Equals(obj as SQSFIFOSequenceToken);
}

/// <inheritdoc/>
public override bool Equals(StreamSequenceToken other)
{
var token = other as SQSFIFOSequenceToken;
return token != null && (token.SqsSequenceNumber == SqsSequenceNumber &&
token.EventIndex == EventIndex);
}

/// <inheritdoc/>
public override int CompareTo(StreamSequenceToken other)
{
if (other == null)
return 1;

var token = other as SQSFIFOSequenceToken;
if (token == null)
throw new ArgumentOutOfRangeException(nameof(other));

int difference = SqsSequenceNumber.CompareTo(token.SqsSequenceNumber);
return difference != 0 ? difference : EventIndex.CompareTo(token.EventIndex);
}

/// <inheritdoc/>
public override int GetHashCode()
{
// why 397?
return (EventIndex * 397) ^ SqsSequenceNumber.GetHashCode();
}

/// <inheritdoc/>
public override string ToString()
{
return string.Format(CultureInfo.InvariantCulture, "[SQSFIFOSequenceToken: SeqNum={0}, EventIndex={1}]", SqsSequenceNumber, EventIndex);
}
}
}

0 comments on commit 519eb4d

Please sign in to comment.