Skip to content

Commit

Permalink
better documentation and naming (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
runeanielsen authored Dec 15, 2021
1 parent a02c5aa commit e8b5a43
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 21 deletions.
6 changes: 3 additions & 3 deletions src/MsSqlCdc/Cdc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ public static async Task<IReadOnlyCollection<ChangeData<dynamic>>> GetNetChanges
/// </returns>
public static async Task<IReadOnlyCollection<ChangeData<dynamic>>> GetAllChanges(
SqlConnection connection,
string tableName,
string captureInstance,
long beginLsn,
long endLsn)
{
var cdcColumns = await CdcDatabase.GetAllChanges(connection, tableName, beginLsn, endLsn);
return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, tableName)).ToList();
var cdcColumns = await CdcDatabase.GetAllChanges(connection, captureInstance, beginLsn, endLsn);
return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, captureInstance)).ToList();
}
}
20 changes: 10 additions & 10 deletions src/MsSqlCdc/CdcDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace MsSqlCdc;

public static class CdcDatabase
internal static class CdcDatabase
{
public static async Task<byte[]> GetMinLsn(SqlConnection connection, string captureInstance)
{
Expand All @@ -16,7 +16,7 @@ public static async Task<byte[]> GetMinLsn(SqlConnection connection, string capt

using SqlDataReader reader = await command.ExecuteReaderAsync();

var minLsn = new byte[8];
var minLsn = new byte[10];
while (await reader.ReadAsync())
{
minLsn = (byte[])reader["min_lsn"];
Expand All @@ -32,7 +32,7 @@ public static async Task<byte[]> GetMaxLsn(SqlConnection connection)
using var command = new SqlCommand(sql, connection);
using SqlDataReader reader = await command.ExecuteReaderAsync();

var maxLsn = new byte[8];
var maxLsn = new byte[10];
while (await reader.ReadAsync())
{
maxLsn = (byte[])reader["max_lsn"];
Expand All @@ -50,7 +50,7 @@ public static async Task<byte[]> GetNextLsn(SqlConnection connection, long lsn)

using SqlDataReader reader = await command.ExecuteReaderAsync();

var nextLsn = new byte[8];
var nextLsn = new byte[10];
while (await reader.ReadAsync())
{
nextLsn = (byte[])reader["next_lsn"];
Expand All @@ -61,32 +61,32 @@ public static async Task<byte[]> GetNextLsn(SqlConnection connection, long lsn)

public static async Task<List<List<Tuple<string, object>>>> GetAllChanges(
SqlConnection connection,
string tableName,
string captureInstance,
long beginLsn,
long endLsn)
{
return await GetChanges(connection, "cdc.fn_cdc_get_all_changes", tableName, beginLsn, endLsn);
return await GetChanges(connection, "cdc.fn_cdc_get_all_changes", captureInstance, beginLsn, endLsn);
}

public static async Task<List<List<Tuple<string, object>>>> GetNetChanges(
SqlConnection connection,
string tableName,
string captureInstance,
long beginLsn,
long endLsn)
{
return await GetChanges(connection, "cdc.fn_cdc_get_net_changes", tableName, beginLsn, endLsn);
return await GetChanges(connection, "cdc.fn_cdc_get_net_changes", captureInstance, beginLsn, endLsn);
}

private static async Task<List<List<Tuple<string, object>>>> GetChanges(
SqlConnection connection,
string cdcFunction,
string tableName,
string captureInstance,
long beginLsn,
long endLsn)
{
var builder = new SqlCommandBuilder();
// We have to do this here, since we cannot pass the function as command parameter.
var function = builder.UnquoteIdentifier($"{cdcFunction}_{tableName}");
var function = builder.UnquoteIdentifier($"{cdcFunction}_{captureInstance}");

var sql = $"SELECT * FROM {function}(@begin_lsn, @end_lsn, 'all update old')";

Expand Down
38 changes: 33 additions & 5 deletions src/MsSqlCdc/ChangeData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,59 @@ public enum Operation : ushort

public record ChangeData<T>
{
/// <summary>
/// Commit LSN associated with the change that preserves the commit order of the change.
/// Changes committed in the same transaction share the same commit LSN value.
/// </summary>
public long StartLineSequenceNumber { get; init; }

/// <summary>
/// Sequence value used to order changes to a row within a transaction.
/// </summary>
public long SequenceValue { get; init; }


/// <summary>
/// Identifies the data manipulation language (DML) operation needed
/// to apply the row of change data to the target data source.
/// </summary>
public Operation Operation { get; init; }

/// <summary>
/// A bit mask with a bit corresponding to each captured column identified for the capture instance.
/// This value has all defined bits set to 1 when __$operation = 1 or 2. When __$operation = 3 or 4,
/// only those bits corresponding to columns that changed are set to 1.
/// </summary>
public string UpdateMask { get; init; }
public string TableName { get; set; }

/// <summary>
/// The name of the capture instance associated with the change.
/// </summary>
public string CaptureInstance { get; set; }

/// <summary>
/// Custom capture instance fields.
/// </summary>
public T Body { get; init; }

public ChangeData(
long startLineSequenceNumber,
long sequenceValue,
Operation operation,
string updateMask,
string tableName,
string captureInstance,
T body)
{
if (body is null)
throw new ArgumentNullException($"{nameof(body)} cannot be null.");
if (string.IsNullOrWhiteSpace(tableName))
throw new ArgumentNullException($"{nameof(tableName)} cannot be null, empty or whitespace.");
if (string.IsNullOrWhiteSpace(captureInstance))
throw new ArgumentNullException($"{nameof(captureInstance)} cannot be null, empty or whitespace.");

StartLineSequenceNumber = startLineSequenceNumber;
SequenceValue = sequenceValue;
Operation = operation;
UpdateMask = updateMask;
TableName = tableName;
CaptureInstance = captureInstance;
Body = body;
}
}
6 changes: 3 additions & 3 deletions src/MsSqlCdc/DataConvert.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public static class DataConvert
/// Converts a a colection of columns represented as Tuple<string, object> to ChangeData<dynamic> representation.
/// </summary>
/// <param name="column">List of tuples with Item1 being the name column and Item2 being the column value</param>
/// <param name="tableName">The tablename of the column.</param>
/// <param name="captureInstance">The tablename of the column.</param>
/// <returns>Returns the CDC column as a ChangeData record.</returns>
public static ChangeData<dynamic> ConvertCdcColumn(List<Tuple<string, object>> column, string tableName)
public static ChangeData<dynamic> ConvertCdcColumn(List<Tuple<string, object>> column, string captureInstance)
{
var startLsn = ConvertBinaryLsn((byte[])column[0].Item2);
var seqVal = ConvertBinaryLsn((byte[])column[1].Item2);
Expand All @@ -30,7 +30,7 @@ public static ChangeData<dynamic> ConvertCdcColumn(List<Tuple<string, object>> c
seqVal,
operation,
updateMask,
tableName,
captureInstance,
body
);
}
Expand Down

0 comments on commit e8b5a43

Please sign in to comment.