From e8b5a436f0e6a955905813a58c407545a2808fd0 Mon Sep 17 00:00:00 2001 From: Rune Nielsen Date: Wed, 15 Dec 2021 22:00:39 +0100 Subject: [PATCH] better documentation and naming (#8) --- src/MsSqlCdc/Cdc.cs | 6 +++--- src/MsSqlCdc/CdcDatabase.cs | 20 +++++++++---------- src/MsSqlCdc/ChangeData.cs | 38 ++++++++++++++++++++++++++++++++----- src/MsSqlCdc/DataConvert.cs | 6 +++--- 4 files changed, 49 insertions(+), 21 deletions(-) diff --git a/src/MsSqlCdc/Cdc.cs b/src/MsSqlCdc/Cdc.cs index 3f3a4d1..3d152a6 100644 --- a/src/MsSqlCdc/Cdc.cs +++ b/src/MsSqlCdc/Cdc.cs @@ -78,11 +78,11 @@ public static async Task>> GetNetChanges /// public static async Task>> GetAllChanges( SqlConnection connection, - string tableName, + string captureInstance, long beginLsn, long endLsn) { - var cdcColumns = await CdcDatabase.GetAllChanges(connection, tableName, beginLsn, endLsn); - return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, tableName)).ToList(); + var cdcColumns = await CdcDatabase.GetAllChanges(connection, captureInstance, beginLsn, endLsn); + return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, captureInstance)).ToList(); } } diff --git a/src/MsSqlCdc/CdcDatabase.cs b/src/MsSqlCdc/CdcDatabase.cs index 23863b4..20f2b57 100644 --- a/src/MsSqlCdc/CdcDatabase.cs +++ b/src/MsSqlCdc/CdcDatabase.cs @@ -5,7 +5,7 @@ namespace MsSqlCdc; -public static class CdcDatabase +internal static class CdcDatabase { public static async Task GetMinLsn(SqlConnection connection, string captureInstance) { @@ -16,7 +16,7 @@ public static async Task GetMinLsn(SqlConnection connection, string capt using SqlDataReader reader = await command.ExecuteReaderAsync(); - var minLsn = new byte[8]; + var minLsn = new byte[10]; while (await reader.ReadAsync()) { minLsn = (byte[])reader["min_lsn"]; @@ -32,7 +32,7 @@ public static async Task GetMaxLsn(SqlConnection connection) using var command = new SqlCommand(sql, connection); using SqlDataReader reader = await command.ExecuteReaderAsync(); - var maxLsn = new byte[8]; + var maxLsn = new byte[10]; while (await reader.ReadAsync()) { maxLsn = (byte[])reader["max_lsn"]; @@ -50,7 +50,7 @@ public static async Task GetNextLsn(SqlConnection connection, long lsn) using SqlDataReader reader = await command.ExecuteReaderAsync(); - var nextLsn = new byte[8]; + var nextLsn = new byte[10]; while (await reader.ReadAsync()) { nextLsn = (byte[])reader["next_lsn"]; @@ -61,32 +61,32 @@ public static async Task GetNextLsn(SqlConnection connection, long lsn) public static async Task>>> GetAllChanges( SqlConnection connection, - string tableName, + string captureInstance, long beginLsn, long endLsn) { - return await GetChanges(connection, "cdc.fn_cdc_get_all_changes", tableName, beginLsn, endLsn); + return await GetChanges(connection, "cdc.fn_cdc_get_all_changes", captureInstance, beginLsn, endLsn); } public static async Task>>> GetNetChanges( SqlConnection connection, - string tableName, + string captureInstance, long beginLsn, long endLsn) { - return await GetChanges(connection, "cdc.fn_cdc_get_net_changes", tableName, beginLsn, endLsn); + return await GetChanges(connection, "cdc.fn_cdc_get_net_changes", captureInstance, beginLsn, endLsn); } private static async Task>>> GetChanges( SqlConnection connection, string cdcFunction, - string tableName, + string captureInstance, long beginLsn, long endLsn) { var builder = new SqlCommandBuilder(); // We have to do this here, since we cannot pass the function as command parameter. - var function = builder.UnquoteIdentifier($"{cdcFunction}_{tableName}"); + var function = builder.UnquoteIdentifier($"{cdcFunction}_{captureInstance}"); var sql = $"SELECT * FROM {function}(@begin_lsn, @end_lsn, 'all update old')"; diff --git a/src/MsSqlCdc/ChangeData.cs b/src/MsSqlCdc/ChangeData.cs index a779ba8..839dea8 100644 --- a/src/MsSqlCdc/ChangeData.cs +++ b/src/MsSqlCdc/ChangeData.cs @@ -12,11 +12,39 @@ public enum Operation : ushort public record ChangeData { + /// + /// Commit LSN associated with the change that preserves the commit order of the change. + /// Changes committed in the same transaction share the same commit LSN value. + /// public long StartLineSequenceNumber { get; init; } + + /// + /// Sequence value used to order changes to a row within a transaction. + /// public long SequenceValue { get; init; } + + + /// + /// Identifies the data manipulation language (DML) operation needed + /// to apply the row of change data to the target data source. + /// public Operation Operation { get; init; } + + /// + /// A bit mask with a bit corresponding to each captured column identified for the capture instance. + /// This value has all defined bits set to 1 when __$operation = 1 or 2. When __$operation = 3 or 4, + /// only those bits corresponding to columns that changed are set to 1. + /// public string UpdateMask { get; init; } - public string TableName { get; set; } + + /// + /// The name of the capture instance associated with the change. + /// + public string CaptureInstance { get; set; } + + /// + /// Custom capture instance fields. + /// public T Body { get; init; } public ChangeData( @@ -24,19 +52,19 @@ public ChangeData( long sequenceValue, Operation operation, string updateMask, - string tableName, + string captureInstance, T body) { if (body is null) throw new ArgumentNullException($"{nameof(body)} cannot be null."); - if (string.IsNullOrWhiteSpace(tableName)) - throw new ArgumentNullException($"{nameof(tableName)} cannot be null, empty or whitespace."); + if (string.IsNullOrWhiteSpace(captureInstance)) + throw new ArgumentNullException($"{nameof(captureInstance)} cannot be null, empty or whitespace."); StartLineSequenceNumber = startLineSequenceNumber; SequenceValue = sequenceValue; Operation = operation; UpdateMask = updateMask; - TableName = tableName; + CaptureInstance = captureInstance; Body = body; } } diff --git a/src/MsSqlCdc/DataConvert.cs b/src/MsSqlCdc/DataConvert.cs index ebb1330..addfe0c 100644 --- a/src/MsSqlCdc/DataConvert.cs +++ b/src/MsSqlCdc/DataConvert.cs @@ -12,9 +12,9 @@ public static class DataConvert /// Converts a a colection of columns represented as Tuple to ChangeData representation. /// /// List of tuples with Item1 being the name column and Item2 being the column value - /// The tablename of the column. + /// The tablename of the column. /// Returns the CDC column as a ChangeData record. - public static ChangeData ConvertCdcColumn(List> column, string tableName) + public static ChangeData ConvertCdcColumn(List> column, string captureInstance) { var startLsn = ConvertBinaryLsn((byte[])column[0].Item2); var seqVal = ConvertBinaryLsn((byte[])column[1].Item2); @@ -30,7 +30,7 @@ public static ChangeData ConvertCdcColumn(List> c seqVal, operation, updateMask, - tableName, + captureInstance, body ); }