Skip to content

Commit

Permalink
fixes issue with net changes (#51)
Browse files Browse the repository at this point in the history
* ChangeRow renamed to AllChangeRow to seperate netchanges and all changes

* moves AllChangeRow creation to its own factory

* adds NetChangeFactory

* adds get required fields for factories

* simplified factories
  • Loading branch information
runeanielsen authored Jan 16, 2022
1 parent 051c267 commit 7329e1e
Show file tree
Hide file tree
Showing 11 changed files with 890 additions and 391 deletions.
4 changes: 2 additions & 2 deletions examples/Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static void Main(string[] args)
var cdcCancellation = new CancellationTokenSource();
var cdcCancellationToken = cdcCancellation.Token;

var changeDataChannel = Channel.CreateUnbounded<IReadOnlyCollection<ChangeRow>>();
var changeDataChannel = Channel.CreateUnbounded<IReadOnlyCollection<AllChangeRow>>();
_ = Task.Factory.StartNew(async () =>
{
var lowBoundLsn = await GetStartLsn(connectionString);
Expand All @@ -47,7 +47,7 @@ public static void Main(string[] args)
{
Console.WriteLine($"Polling with from '{lowBoundLsn}' to '{highBoundLsn}");
var changes = new List<ChangeRow>();
var changes = new List<AllChangeRow>();
foreach (var table in tables)
{
var changeSets = await Cdc.GetAllChanges(
Expand Down
10 changes: 5 additions & 5 deletions src/MsSqlCdc/ChangeRow.cs → src/MsSqlCdc/AllChangeRow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

namespace MsSqlCdc;

public enum Operation
public enum AllChangeOperation
{
Delete = 1,
Insert = 2,
BeforeUpdate = 3,
AfterUpdate = 4
}

public record ChangeRow
public record AllChangeRow
{
/// <summary>
/// Commit LSN associated with the change that preserves the commit order of the change.
Expand All @@ -29,7 +29,7 @@ public record ChangeRow
/// Identifies the data manipulation language (DML) operation needed
/// to apply the row of change data to the target data source.
/// </summary>
public Operation Operation { get; init; }
public AllChangeOperation Operation { get; init; }

/// <summary>
/// A bit mask with a bit corresponding to each captured column identified for the capture instance.
Expand All @@ -48,10 +48,10 @@ public record ChangeRow
/// </summary>
public IReadOnlyDictionary<string, object> Fields { get; init; }

public ChangeRow(
public AllChangeRow(
BigInteger startLineSequenceNumber,
BigInteger sequenceValue,
Operation operation,
AllChangeOperation operation,
string updateMask,
string captureInstance,
IReadOnlyDictionary<string, object> fields)
Expand Down
65 changes: 65 additions & 0 deletions src/MsSqlCdc/AllChangeRowFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Numerics;
using System.Text;

namespace MsSqlCdc;

internal static class AllChangeRowFactory
{
/// <summary>
/// Converts a a collection of columns represented as Dictionary<string, object> to ChangeData representation.
/// </summary>
/// <param name="fields">Dictionary of field name and field value.</param>
/// <param name="captureInstance">The capture instance.</param>
/// <returns>Returns the CDC column as a ChangeData record.</returns>
/// <exception cref="Exception"></exception>
public static AllChangeRow Create(IReadOnlyDictionary<string, object> fields, string captureInstance)
{
if (GetRequiredFields(fields).Count() < 4)
throw new ArgumentException($"The column fields does not contain all the default CDC column fields.");

return new AllChangeRow(
GetStartLsn(fields),
GetSeqVal(fields),
GetOperation(fields),
GetUpdateMask(fields),
captureInstance,
GetAdditionalFields(fields));
}

private static string GetUpdateMask(IReadOnlyDictionary<string, object> fields) =>
Encoding.UTF8.GetString((byte[])fields[CdcFieldName.UpdateMask]);

private static BigInteger GetSeqVal(IReadOnlyDictionary<string, object> fields) =>
DataConvert.ConvertBinaryLsn((byte[])fields[CdcFieldName.SeqVal]);

private static BigInteger GetStartLsn(IReadOnlyDictionary<string, object> fields) =>
DataConvert.ConvertBinaryLsn((byte[])fields[CdcFieldName.StartLsn]);

private static bool IsRequiredField(string fieldName) =>
fieldName == CdcFieldName.StartLsn ||
fieldName == CdcFieldName.SeqVal ||
fieldName == CdcFieldName.Operation ||
fieldName == CdcFieldName.UpdateMask;

private static IEnumerable<KeyValuePair<string, object>> GetRequiredFields(
IReadOnlyDictionary<string, object> fields) => fields.Where(x => IsRequiredField(x.Key));

private static Dictionary<string, object> GetAdditionalFields(IReadOnlyDictionary<string, object> fields)
=> fields.Where(x => !IsRequiredField(x.Key)).ToDictionary(x => x.Key, x => x.Value);

private static AllChangeOperation GetOperation(IReadOnlyDictionary<string, object> fields) =>
ConvertOperation((int)fields[CdcFieldName.Operation]);

private static AllChangeOperation ConvertOperation(int representation)
=> representation switch
{
1 => AllChangeOperation.Delete,
2 => AllChangeOperation.Insert,
3 => AllChangeOperation.BeforeUpdate,
4 => AllChangeOperation.AfterUpdate,
_ => throw new ArgumentException($"Not valid representation value '{representation}'")
};
}
8 changes: 4 additions & 4 deletions src/MsSqlCdc/Cdc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public static async Task<BigInteger> GetNextLsn(SqlConnection connection, BigInt
/// <returns>
/// Returns one net change row for each source row changed within the specified Log Sequence Numbers (LSN) range.
/// </returns>
public static async Task<IReadOnlyCollection<ChangeRow>> GetNetChanges(
public static async Task<IReadOnlyCollection<NetChangeRow>> GetNetChanges(
SqlConnection connection,
string captureInstance,
BigInteger fromLsn,
Expand All @@ -279,7 +279,7 @@ public static async Task<IReadOnlyCollection<ChangeRow>> GetNetChanges(
var filterOption = DataConvert.ConvertNetChangesRowFilterOption(netChangesRowFilterOption);
var cdcColumns = await CdcDatabase.GetNetChanges(
connection, captureInstance, beginLsnBinary, endLsnBinary, filterOption);
return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, captureInstance)).ToList();
return cdcColumns.Select(x => NetChangeRowFactory.Create(x, captureInstance)).ToList();
}

/// <summary>
Expand All @@ -294,7 +294,7 @@ public static async Task<IReadOnlyCollection<ChangeRow>> GetNetChanges(
/// Returns one row for each change applied to the source table within the specified log sequence number (LSN) range.
/// If a source row had multiple changes during the interval, each change is represented in the returned result set.
/// </returns>
public static async Task<IReadOnlyCollection<ChangeRow>> GetAllChanges(
public static async Task<IReadOnlyCollection<AllChangeRow>> GetAllChanges(
SqlConnection connection,
string captureInstance,
BigInteger beginLsn,
Expand All @@ -306,6 +306,6 @@ public static async Task<IReadOnlyCollection<ChangeRow>> GetAllChanges(
var filterOption = DataConvert.ConvertAllChangesRowFilterOption(allChangesRowFilterOption);
var cdcColumns = await CdcDatabase.GetAllChanges(
connection, captureInstance, beginLsnBinary, endLsnBinary, filterOption);
return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, captureInstance)).ToList();
return cdcColumns.Select(x => AllChangeRowFactory.Create(x, captureInstance)).ToList();
}
}
57 changes: 0 additions & 57 deletions src/MsSqlCdc/DataConvert.cs
Original file line number Diff line number Diff line change
@@ -1,68 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Numerics;
using System.Text;

namespace MsSqlCdc;

internal static class DataConvert
{
/// <summary>
/// Converts a a collection of columns represented as Tuple<string, object> to ChangeData<dynamic> representation.
/// </summary>
/// <param name="columnFields">List of tuples with Item1 being the name column and Item2 being the column value</param>
/// <param name="captureInstance">The tablename of the column.</param>
/// <returns>Returns the CDC column as a ChangeData record.</returns>
/// <exception cref="Exception"></exception>
public static ChangeRow ConvertCdcColumn(
IReadOnlyDictionary<string, object> columnFields,
string captureInstance)
{
bool isDefaultCdcField(string fieldName) =>
fieldName == CdcFieldName.StartLsn ||
fieldName == CdcFieldName.SeqVal ||
fieldName == CdcFieldName.Operation ||
fieldName == CdcFieldName.UpdateMask;

if (columnFields.Where(x => isDefaultCdcField(x.Key)).Count() < 4)
throw new ArgumentException(
$"The column fields does not contain all the default CDC column fields.");

var nonDefaultCdcFields = columnFields
.Where(x => !isDefaultCdcField(x.Key))
.ToDictionary(x => x.Key, x => x.Value);

var startLsn = ConvertBinaryLsn((byte[])columnFields[CdcFieldName.StartLsn]);
var seqVal = ConvertBinaryLsn((byte[])columnFields[CdcFieldName.SeqVal]);
var operation = ConvertOperation((int)columnFields[CdcFieldName.Operation]);
var updateMask = Encoding.UTF8.GetString((byte[])columnFields[CdcFieldName.UpdateMask]);

return new ChangeRow(
startLsn,
seqVal,
operation,
updateMask,
captureInstance,
nonDefaultCdcFields);
}

/// <summary>
/// Converts the number representation to an Enum representation of the value.
/// </summary>
/// <param name="representation">The number representation of the Operation.</param>
/// <returns>Enum representation of the number representation.</returns>
/// <exception cref="ArgumentException"></exception>
public static Operation ConvertOperation(int representation)
=> representation switch
{
1 => Operation.Delete,
2 => Operation.Insert,
3 => Operation.BeforeUpdate,
4 => Operation.AfterUpdate,
_ => throw new ArgumentException($"Not valid representation value '{representation}'")
};

/// <summary>
/// Converts RelationOperator enum to a string representation to be used in MS-SQL.
/// </summary>
Expand Down
74 changes: 74 additions & 0 deletions src/MsSqlCdc/NetChangeRow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System;
using System.Collections.Generic;
using System.Numerics;

namespace MsSqlCdc;

public enum NetChangeOperation
{
Delete = 1,
Insert = 2,
Update = 4,
InsertOrUpdate = 5
}

public record NetChangeRow
{
/// <summary>
/// All changes committed in the same transaction share the same commit LSN.
/// For example, if an update operation on the source table modifies two columns in two rows,
/// the change table will contain four rows, each with the same __$start_lsnvalue.
/// </summary>
public BigInteger StartLineSequenceNumber { get; init; }

/// <summary>
///Identifies the data manipulation language (DML) operation needed to apply the row of
/// change data to the target data source.
/// If the value of the row_filter_option parameter is all or all with mask,
/// the value in this column can be one of the following values:
/// 1 = Delete
/// 2 = Insert
/// 4 = Update
/// If the value of the row_filter_option parameter is all with merge,
/// the value in this column can be one of the following values:
/// 1 = Delete
/// 5 = Insert or update
/// </summary>
public NetChangeOperation Operation { get; init; }

/// <summary>
/// A bit mask with a bit corresponding to each captured column identified for the capture instance.
/// This value has all defined bits set to 1 when __$operation = 1 or 2. When __$operation = 3 or 4,
/// only those bits corresponding to columns that changed are set to 1.
/// </summary>
public string? UpdateMask { get; init; }

/// <summary>
/// The name of the capture instance associated with the change.
/// </summary>
public string CaptureInstance { get; set; }

/// <summary>
/// The row fields.
/// </summary>
public IReadOnlyDictionary<string, object> Fields { get; init; }

public NetChangeRow(
BigInteger startLineSequenceNumber,
NetChangeOperation operation,
string? updateMask,
string captureInstance,
IReadOnlyDictionary<string, object> fields)
{
if (fields is null)
throw new ArgumentNullException($"{nameof(fields)} cannot be null.");
if (string.IsNullOrWhiteSpace(captureInstance))
throw new ArgumentNullException($"{nameof(captureInstance)} cannot be null, empty or whitespace.");

StartLineSequenceNumber = startLineSequenceNumber;
Operation = operation;
UpdateMask = updateMask;
CaptureInstance = captureInstance;
Fields = fields;
}
}
64 changes: 64 additions & 0 deletions src/MsSqlCdc/NetChangeRowFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Numerics;
using System.Text;

namespace MsSqlCdc;

internal static class NetChangeRowFactory
{
/// <summary>
/// Converts a a collection of columns represented as Dictionary<string, object> to ChangeData representation.
/// </summary>
/// <param name="fields">Dictionary of field name and field value.</param>
/// <param name="captureInstance">The capture instance.</param>
/// <returns>Returns the CDC column as a ChangeData record.</returns>
/// <exception cref="Exception"></exception>
public static NetChangeRow Create(
IReadOnlyDictionary<string, object> fields,
string captureInstance)
{
if (GetRequiredFields(fields).Count() < 3)
throw new ArgumentException($"The column fields does not contain all the default CDC column fields.");

return new NetChangeRow(
GetStartLsn(fields),
GetOperation(fields),
GetUpdateMask(fields),
captureInstance,
GetAdditionalFields(fields));
}

private static BigInteger GetStartLsn(IReadOnlyDictionary<string, object> fields) =>
DataConvert.ConvertBinaryLsn((byte[])fields[CdcFieldName.StartLsn]);

private static string? GetUpdateMask(IReadOnlyDictionary<string, object> fields) =>
fields[CdcFieldName.UpdateMask] != DBNull.Value
? Encoding.UTF8.GetString((byte[])fields[CdcFieldName.UpdateMask])
: null;

private static bool IsRequiredField(string fieldName) =>
fieldName == CdcFieldName.StartLsn ||
fieldName == CdcFieldName.Operation ||
fieldName == CdcFieldName.UpdateMask;

private static IEnumerable<KeyValuePair<string, object>> GetRequiredFields(
IReadOnlyDictionary<string, object> fields) => fields.Where(x => IsRequiredField(x.Key));

private static Dictionary<string, object> GetAdditionalFields(IReadOnlyDictionary<string, object> fields)
=> fields.Where(x => !IsRequiredField(x.Key)).ToDictionary(x => x.Key, x => x.Value);

private static NetChangeOperation GetOperation(IReadOnlyDictionary<string, object> fields) =>
ConvertOperation((int)fields[CdcFieldName.Operation]);

private static NetChangeOperation ConvertOperation(int representation)
=> representation switch
{
1 => NetChangeOperation.Delete,
2 => NetChangeOperation.Insert,
4 => NetChangeOperation.Update,
5 => NetChangeOperation.InsertOrUpdate,
_ => throw new ArgumentException($"Not valid representation value '{representation}'")
};
}
Loading

0 comments on commit 7329e1e

Please sign in to comment.