From 498e584731ff07bc2e8d908345c15947265b8ca4 Mon Sep 17 00:00:00 2001 From: Rune Nielsen Date: Thu, 13 Jan 2022 19:00:11 +0100 Subject: [PATCH] now uses dictionary instead of dynamic for change row fields (#47) * now uses dictionary instead of dynamic for row fields * now uses fluent assertion instead of json hack --- examples/Example/Program.cs | 4 +- src/MsSqlCdc/Cdc.cs | 4 +- src/MsSqlCdc/CdcDatabase.cs | 26 +--- src/MsSqlCdc/ChangeRow.cs | 15 +- src/MsSqlCdc/DataConvert.cs | 30 ++-- test/MsSqlCdc.Tests/DataConvertTest.cs | 205 ++++++++++++------------- 6 files changed, 133 insertions(+), 151 deletions(-) diff --git a/examples/Example/Program.cs b/examples/Example/Program.cs index 258f546..415f052 100644 --- a/examples/Example/Program.cs +++ b/examples/Example/Program.cs @@ -22,7 +22,7 @@ public static void Main(string[] args) var cdcCancellation = new CancellationTokenSource(); var cdcCancellationToken = cdcCancellation.Token; - var changeDataChannel = Channel.CreateUnbounded>>(); + var changeDataChannel = Channel.CreateUnbounded>(); _ = Task.Factory.StartNew(async () => { var lowBoundLsn = await GetStartLsn(connectionString); @@ -47,7 +47,7 @@ public static void Main(string[] args) { Console.WriteLine($"Polling with from '{lowBoundLsn}' to '{highBoundLsn}"); - var changes = new List>(); + var changes = new List(); foreach (var table in tables) { var changeSets = await Cdc.GetAllChanges( diff --git a/src/MsSqlCdc/Cdc.cs b/src/MsSqlCdc/Cdc.cs index 956dec9..8ebdce5 100644 --- a/src/MsSqlCdc/Cdc.cs +++ b/src/MsSqlCdc/Cdc.cs @@ -267,7 +267,7 @@ public static async Task GetNextLsn(SqlConnection connection, BigInt /// /// Returns one net change row for each source row changed within the specified Log Sequence Numbers (LSN) range. /// - public static async Task>> GetNetChanges( + public static async Task> GetNetChanges( SqlConnection connection, string captureInstance, BigInteger fromLsn, @@ -294,7 +294,7 @@ public static async Task>> GetNetChanges( /// Returns one row for each change applied to the source table within the specified log sequence number (LSN) range. /// If a source row had multiple changes during the interval, each change is represented in the returned result set. /// - public static async Task>> GetAllChanges( + public static async Task> GetAllChanges( SqlConnection connection, string captureInstance, BigInteger beginLsn, diff --git a/src/MsSqlCdc/CdcDatabase.cs b/src/MsSqlCdc/CdcDatabase.cs index d332063..bffebd9 100644 --- a/src/MsSqlCdc/CdcDatabase.cs +++ b/src/MsSqlCdc/CdcDatabase.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Numerics; using System.Threading.Tasks; using Microsoft.Data.SqlClient; @@ -14,7 +13,6 @@ internal static class CdcDatabase using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@position", position); command.Parameters.AddWithValue("@updateMask", updateMask); - return (bool?)(await command.ExecuteScalarAsync()); } @@ -29,7 +27,6 @@ internal static class CdcDatabase command.Parameters.AddWithValue("@capture_instance", captureInstance); command.Parameters.AddWithValue("@column_name", columnName); command.Parameters.AddWithValue("@update_mask", updateMask); - return (bool?)(await command.ExecuteScalarAsync()); } @@ -42,7 +39,6 @@ internal static class CdcDatabase using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@capture_instance", captureInstance); command.Parameters.AddWithValue("@column_name", columnName); - return (int?)(await command.ExecuteScalarAsync()); } @@ -51,7 +47,6 @@ internal static class CdcDatabase var sql = "SELECT sys.fn_cdc_map_lsn_to_time(@lsn)"; using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@lsn", lsn); - return (DateTime?)(await command.ExecuteScalarAsync()); } @@ -64,7 +59,6 @@ internal static class CdcDatabase using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@relational_operator", relationOperator); command.Parameters.AddWithValue("@tracking_time", trackingTime); - return (byte[]?)(await command.ExecuteScalarAsync()); } @@ -73,7 +67,6 @@ internal static class CdcDatabase var sql = "SELECT sys.fn_cdc_get_min_lsn(@capture_instance)"; using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@capture_instance", captureInstance); - return (byte[]?)(await command.ExecuteScalarAsync()); } @@ -81,7 +74,6 @@ internal static class CdcDatabase { var sql = "SELECT sys.fn_cdc_get_max_lsn()"; using var command = new SqlCommand(sql, connection); - return (byte[]?)(await command.ExecuteScalarAsync()); } @@ -90,7 +82,6 @@ internal static class CdcDatabase var sql = "SELECT sys.fn_cdc_decrement_lsn(@lsn)"; using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@lsn", lsn); - return (byte[]?)(await command.ExecuteScalarAsync()); } @@ -99,11 +90,10 @@ internal static class CdcDatabase var sql = "SELECT sys.fn_cdc_increment_lsn(@lsn)"; using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@lsn", lsn); - return (byte[]?)(await command.ExecuteScalarAsync()); } - public static async Task>> GetAllChanges( + public static async Task>> GetAllChanges( SqlConnection connection, string captureInstance, byte[] beginLsn, @@ -119,7 +109,7 @@ internal static class CdcDatabase filterOption); } - public static async Task>> GetNetChanges( + public static async Task>> GetNetChanges( SqlConnection connection, string captureInstance, byte[] beginLsn, @@ -135,7 +125,7 @@ internal static class CdcDatabase filterOption); } - private static async Task>> GetChanges( + private static async Task>> GetChanges( SqlConnection connection, string cdcFunction, string captureInstance, @@ -149,19 +139,19 @@ internal static class CdcDatabase command.Parameters.AddWithValue("@end_lsn", endLsn); command.Parameters.AddWithValue("@filter_option", filterOption); - var changes = new List>(); + var columns = new List>(); using var reader = await command.ExecuteReaderAsync(); while (await reader.ReadAsync()) { - var column = new List<(string fieldName, object fieldValue)>(); + var column = new Dictionary(); for (var i = 0; i < reader.FieldCount; i++) { - column.Add((reader.GetName(i), reader.GetValue(i))); + column.Add(reader.GetName(i), reader.GetValue(i)); } - changes.Add(column); + columns.Add(column); } - return changes; + return columns; } } diff --git a/src/MsSqlCdc/ChangeRow.cs b/src/MsSqlCdc/ChangeRow.cs index 513490a..58fb102 100644 --- a/src/MsSqlCdc/ChangeRow.cs +++ b/src/MsSqlCdc/ChangeRow.cs @@ -1,5 +1,6 @@ using System; using System.Numerics; +using System.Collections.Generic; namespace MsSqlCdc; @@ -11,7 +12,7 @@ public enum Operation AfterUpdate = 4 } -public record ChangeRow +public record ChangeRow { /// /// Commit LSN associated with the change that preserves the commit order of the change. @@ -43,9 +44,9 @@ public record ChangeRow public string CaptureInstance { get; set; } /// - /// Dynamic column fields. + /// The row fields. /// - public T Body { get; init; } + public IReadOnlyDictionary Fields { get; init; } public ChangeRow( BigInteger startLineSequenceNumber, @@ -53,10 +54,10 @@ public ChangeRow( Operation operation, string updateMask, string captureInstance, - T body) + IReadOnlyDictionary fields) { - if (body is null) - throw new ArgumentNullException($"{nameof(body)} cannot be null."); + if (fields is null) + throw new ArgumentNullException($"{nameof(fields)} cannot be null."); if (string.IsNullOrWhiteSpace(captureInstance)) throw new ArgumentNullException($"{nameof(captureInstance)} cannot be null, empty or whitespace."); @@ -65,6 +66,6 @@ public ChangeRow( Operation = operation; UpdateMask = updateMask; CaptureInstance = captureInstance; - Body = body; + Fields = fields; } } diff --git a/src/MsSqlCdc/DataConvert.cs b/src/MsSqlCdc/DataConvert.cs index 8e2baa4..db6d002 100644 --- a/src/MsSqlCdc/DataConvert.cs +++ b/src/MsSqlCdc/DataConvert.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Dynamic; using System.Linq; using System.Numerics; using System.Text; @@ -16,8 +15,8 @@ internal static class DataConvert /// The tablename of the column. /// Returns the CDC column as a ChangeData record. /// - public static ChangeRow ConvertCdcColumn( - List<(string fieldName, object fieldValue)> columnFields, + public static ChangeRow ConvertCdcColumn( + IReadOnlyDictionary columnFields, string captureInstance) { bool isDefaultCdcField(string fieldName) => @@ -26,31 +25,26 @@ bool isDefaultCdcField(string fieldName) => fieldName == CdcFieldName.Operation || fieldName == CdcFieldName.UpdateMask; - if (columnFields.Where(x => isDefaultCdcField(x.fieldName)).Count() < 4) + if (columnFields.Where(x => isDefaultCdcField(x.Key)).Count() < 4) throw new ArgumentException( $"The column fields does not contain all the default CDC column fields."); - var startLsn = ConvertBinaryLsn( - (byte[])columnFields.First(x => x.fieldName == CdcFieldName.StartLsn).fieldValue); - var seqVal = ConvertBinaryLsn( - (byte[])columnFields.First(x => x.fieldName == CdcFieldName.SeqVal).fieldValue); - var operation = ConvertOperation( - (int)columnFields.First(x => x.fieldName == CdcFieldName.Operation).fieldValue); - var updateMask = Encoding.UTF8.GetString( - (byte[])columnFields.First(x => x.fieldName == CdcFieldName.UpdateMask).fieldValue); + var nonDefaultCdcFields = columnFields + .Where(x => !isDefaultCdcField(x.Key)) + .ToDictionary(x => x.Key, x => x.Value); - var body = columnFields - .Where(x => !isDefaultCdcField(x.fieldName)) - .Aggregate(new ExpandoObject() as IDictionary, - (acc, x) => { acc[x.fieldName] = x.fieldValue; return acc; }) as dynamic; + var startLsn = ConvertBinaryLsn((byte[])columnFields[CdcFieldName.StartLsn]); + var seqVal = ConvertBinaryLsn((byte[])columnFields[CdcFieldName.SeqVal]); + var operation = ConvertOperation((int)columnFields[CdcFieldName.Operation]); + var updateMask = Encoding.UTF8.GetString((byte[])columnFields[CdcFieldName.UpdateMask]); - return new ChangeRow( + return new ChangeRow( startLsn, seqVal, operation, updateMask, captureInstance, - body); + nonDefaultCdcFields); } /// diff --git a/test/MsSqlCdc.Tests/DataConvertTest.cs b/test/MsSqlCdc.Tests/DataConvertTest.cs index bdf3105..01f720a 100644 --- a/test/MsSqlCdc.Tests/DataConvertTest.cs +++ b/test/MsSqlCdc.Tests/DataConvertTest.cs @@ -5,7 +5,6 @@ using System.Collections.Generic; using System.Text; using System.Linq; -using System.Text.Json; namespace MsSqlCdc.Tests; @@ -15,181 +14,181 @@ public static IEnumerable CdcColumnFieldsData() { yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), - ("__$operation", (int)Operation.AfterUpdate), - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), - ("Id", 10), - ("Name", "Rune"), - ("Salary", 20000.00), + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$operation", (int)Operation.AfterUpdate}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 10}, + {"Name", "Rune"}, + {"Salary", 20000.00}, }, "dbo_Employee", - new ChangeRow( + new ChangeRow( 25000L, 25002L, Operation.AfterUpdate, "MASK", "dbo_Employee", - new { - Id = 10, - Name = "Rune", - Salary = 20000.00 + new Dictionary { + {"Id", 10}, + {"Name", "Rune"}, + {"Salary", 20000.00} }) }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), - ("__$operation", (int)Operation.BeforeUpdate), - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), - ("Id", 1), - ("Name", "Simon"), + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$operation", (int)Operation.BeforeUpdate}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 1}, + {"Name", "Simon"}, }, "dbo_Employee", - new ChangeRow( + new ChangeRow( 25000L, 25002L, Operation.BeforeUpdate, "MASK", "dbo_Employee", - new { - Id = 1, - Name = "Simon", + new Dictionary { + {"Id", 1}, + {"Name", "Simon"}, }) }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), - ("__$operation", (int)Operation.Delete), - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), - ("Id", 0), - ("Name", "Jesper"), + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$operation", (int)Operation.Delete}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 0}, + {"Name", "Jesper"}, }, "dbo_Employee", - new ChangeRow( + new ChangeRow( 25000L, 25002L, Operation.Delete, "MASK", "dbo_Employee", - new { - Id = 0, - Name = "Jesper", + new Dictionary{ + {"Id", 0}, + {"Name", "Jesper"}, }) }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), - ("__$operation", (int)Operation.Insert), - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), - ("Id", 10), + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$operation", (int)Operation.Insert}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 10}, }, "dbo_Animal", - new ChangeRow( + new ChangeRow( 25000L, 25002L, Operation.Insert, "MASK", "dbo_Animal", - new { - Id = 10, + new Dictionary{ + {"Id", 10}, }) }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), - ("__$operation", (int)Operation.Insert), - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$operation", (int)Operation.Insert}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, }, "dbo_Animal", - new ChangeRow( + new ChangeRow( 25000L, 25002L, Operation.Insert, "MASK", "dbo_Animal", - new { + new Dictionary{ }) }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()), - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), - ("__$operation", (int)Operation.Insert), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$operation", (int)Operation.Insert}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, }, "dbo_Animal", - new ChangeRow( + new ChangeRow( 25000L, 25002L, Operation.Insert, "MASK", "dbo_Animal", - new { + new Dictionary{ }) }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), - ("__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()), - ("__$operation", (int)Operation.Insert), + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$operation", (int)Operation.Insert}, }, "dbo_Animal", - new ChangeRow( + new ChangeRow( 25000L, 25002L, Operation.Insert, "MASK", "dbo_Animal", - new { + new Dictionary{ }) }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("Id", 0), - ("__$operation", (int)Operation.Delete), - ("Name", "Jesper"), - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), - ("__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()), + {"Id", 0}, + {"__$operation", (int)Operation.Delete}, + {"Name", "Jesper"}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, }, "dbo_Employee", - new ChangeRow( + new ChangeRow( 25000L, 25002L, Operation.Delete, "MASK", "dbo_Employee", - new { - Id = 0, - Name = "Jesper", + new Dictionary{ + {"Id", 0}, + {"Name", "Jesper"}, }) }; } @@ -198,61 +197,61 @@ public static IEnumerable CdcDefaultFieldsInvalidData() { yield return new object[] { - new List<(string name, object fieldValue)>(), + new Dictionary(), "dbo_Employee", }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, }, "dbo_Employee", }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, }, "dbo_Employee", }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), - ("__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()), + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, }, "dbo_Employee", }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("__$update_mask", Encoding.ASCII.GetBytes("MASK")), - ("__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()), - ("__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()), - ("Id", 0), - ("Name", "Rune") + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"Id", 0}, + {"Name", "Rune"} }, "dbo_Employee", }; yield return new object[] { - new List<(string name, object fieldValue)> + new Dictionary { - ("Address", "Streetvalley 20"), - ("Salary", 2000.00), - ("Id", 0), - ("Name", "Rune") + {"Address", "Streetvalley 20"}, + {"Salary", 2000.00}, + {"Id", 0}, + {"Name", "Rune"} }, "dbo_Employee", }; @@ -261,20 +260,18 @@ public static IEnumerable CdcDefaultFieldsInvalidData() [Theory] [MemberData(nameof(CdcColumnFieldsData))] public void Conversion_cdc_column_to_change_row( - List<(string name, object fieldValue)> columnFields, + Dictionary columnFields, string captureInstance, - ChangeRow expected) + ChangeRow expected) { var result = DataConvert.ConvertCdcColumn(columnFields, captureInstance); - - // We do this since record type equality operator does not work with dynamic members. - JsonSerializer.Serialize(result).Should().Be(JsonSerializer.Serialize(expected)); + result.Should().BeEquivalentTo(expected); } [Theory] [MemberData(nameof(CdcDefaultFieldsInvalidData))] public void Conversion_cdc_column_without_default_fields_is_invalid( - List<(string name, object fieldValue)> columnFields, + Dictionary columnFields, string captureInstance) { Invoking(() => DataConvert.ConvertCdcColumn(columnFields, captureInstance))