Skip to content

Commit

Permalink
now uses dictionary instead of dynamic for change row fields (#47)
Browse files Browse the repository at this point in the history
* now uses dictionary instead of dynamic for row fields

* now uses fluent assertion instead of json hack
  • Loading branch information
runeanielsen authored Jan 13, 2022
1 parent 8ea8086 commit 498e584
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 151 deletions.
4 changes: 2 additions & 2 deletions examples/Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static void Main(string[] args)
var cdcCancellation = new CancellationTokenSource();
var cdcCancellationToken = cdcCancellation.Token;

var changeDataChannel = Channel.CreateUnbounded<IReadOnlyCollection<ChangeRow<dynamic>>>();
var changeDataChannel = Channel.CreateUnbounded<IReadOnlyCollection<ChangeRow>>();
_ = Task.Factory.StartNew(async () =>
{
var lowBoundLsn = await GetStartLsn(connectionString);
Expand All @@ -47,7 +47,7 @@ public static void Main(string[] args)
{
Console.WriteLine($"Polling with from '{lowBoundLsn}' to '{highBoundLsn}");
var changes = new List<ChangeRow<dynamic>>();
var changes = new List<ChangeRow>();
foreach (var table in tables)
{
var changeSets = await Cdc.GetAllChanges(
Expand Down
4 changes: 2 additions & 2 deletions src/MsSqlCdc/Cdc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public static async Task<BigInteger> GetNextLsn(SqlConnection connection, BigInt
/// <returns>
/// Returns one net change row for each source row changed within the specified Log Sequence Numbers (LSN) range.
/// </returns>
public static async Task<IReadOnlyCollection<ChangeRow<dynamic>>> GetNetChanges(
public static async Task<IReadOnlyCollection<ChangeRow>> GetNetChanges(
SqlConnection connection,
string captureInstance,
BigInteger fromLsn,
Expand All @@ -294,7 +294,7 @@ public static async Task<IReadOnlyCollection<ChangeRow<dynamic>>> GetNetChanges(
/// Returns one row for each change applied to the source table within the specified log sequence number (LSN) range.
/// If a source row had multiple changes during the interval, each change is represented in the returned result set.
/// </returns>
public static async Task<IReadOnlyCollection<ChangeRow<dynamic>>> GetAllChanges(
public static async Task<IReadOnlyCollection<ChangeRow>> GetAllChanges(
SqlConnection connection,
string captureInstance,
BigInteger beginLsn,
Expand Down
26 changes: 8 additions & 18 deletions src/MsSqlCdc/CdcDatabase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Numerics;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;

Expand All @@ -14,7 +13,6 @@ internal static class CdcDatabase
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@position", position);
command.Parameters.AddWithValue("@updateMask", updateMask);

return (bool?)(await command.ExecuteScalarAsync());
}

Expand All @@ -29,7 +27,6 @@ internal static class CdcDatabase
command.Parameters.AddWithValue("@capture_instance", captureInstance);
command.Parameters.AddWithValue("@column_name", columnName);
command.Parameters.AddWithValue("@update_mask", updateMask);

return (bool?)(await command.ExecuteScalarAsync());
}

Expand All @@ -42,7 +39,6 @@ internal static class CdcDatabase
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@capture_instance", captureInstance);
command.Parameters.AddWithValue("@column_name", columnName);

return (int?)(await command.ExecuteScalarAsync());
}

Expand All @@ -51,7 +47,6 @@ internal static class CdcDatabase
var sql = "SELECT sys.fn_cdc_map_lsn_to_time(@lsn)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@lsn", lsn);

return (DateTime?)(await command.ExecuteScalarAsync());
}

Expand All @@ -64,7 +59,6 @@ internal static class CdcDatabase
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@relational_operator", relationOperator);
command.Parameters.AddWithValue("@tracking_time", trackingTime);

return (byte[]?)(await command.ExecuteScalarAsync());
}

Expand All @@ -73,15 +67,13 @@ internal static class CdcDatabase
var sql = "SELECT sys.fn_cdc_get_min_lsn(@capture_instance)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@capture_instance", captureInstance);

return (byte[]?)(await command.ExecuteScalarAsync());
}

public static async Task<byte[]?> GetMaxLsn(SqlConnection connection)
{
var sql = "SELECT sys.fn_cdc_get_max_lsn()";
using var command = new SqlCommand(sql, connection);

return (byte[]?)(await command.ExecuteScalarAsync());
}

Expand All @@ -90,7 +82,6 @@ internal static class CdcDatabase
var sql = "SELECT sys.fn_cdc_decrement_lsn(@lsn)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@lsn", lsn);

return (byte[]?)(await command.ExecuteScalarAsync());
}

Expand All @@ -99,11 +90,10 @@ internal static class CdcDatabase
var sql = "SELECT sys.fn_cdc_increment_lsn(@lsn)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@lsn", lsn);

return (byte[]?)(await command.ExecuteScalarAsync());
}

public static async Task<List<List<(string fieldName, object fieldValue)>>> GetAllChanges(
public static async Task<List<IReadOnlyDictionary<string, object>>> GetAllChanges(
SqlConnection connection,
string captureInstance,
byte[] beginLsn,
Expand All @@ -119,7 +109,7 @@ internal static class CdcDatabase
filterOption);
}

public static async Task<List<List<(string fieldName, object fieldValue)>>> GetNetChanges(
public static async Task<List<IReadOnlyDictionary<string, object>>> GetNetChanges(
SqlConnection connection,
string captureInstance,
byte[] beginLsn,
Expand All @@ -135,7 +125,7 @@ internal static class CdcDatabase
filterOption);
}

private static async Task<List<List<(string fieldName, object fieldValue)>>> GetChanges(
private static async Task<List<IReadOnlyDictionary<string, object>>> GetChanges(
SqlConnection connection,
string cdcFunction,
string captureInstance,
Expand All @@ -149,19 +139,19 @@ internal static class CdcDatabase
command.Parameters.AddWithValue("@end_lsn", endLsn);
command.Parameters.AddWithValue("@filter_option", filterOption);

var changes = new List<List<(string name, object value)>>();
var columns = new List<IReadOnlyDictionary<string, object>>();
using var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
var column = new List<(string fieldName, object fieldValue)>();
var column = new Dictionary<string, object>();
for (var i = 0; i < reader.FieldCount; i++)
{
column.Add((reader.GetName(i), reader.GetValue(i)));
column.Add(reader.GetName(i), reader.GetValue(i));
}

changes.Add(column);
columns.Add(column);
}

return changes;
return columns;
}
}
15 changes: 8 additions & 7 deletions src/MsSqlCdc/ChangeRow.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Numerics;
using System.Collections.Generic;

namespace MsSqlCdc;

Expand All @@ -11,7 +12,7 @@ public enum Operation
AfterUpdate = 4
}

public record ChangeRow<T>
public record ChangeRow
{
/// <summary>
/// Commit LSN associated with the change that preserves the commit order of the change.
Expand Down Expand Up @@ -43,20 +44,20 @@ public record ChangeRow<T>
public string CaptureInstance { get; set; }

/// <summary>
/// Dynamic column fields.
/// The row fields.
/// </summary>
public T Body { get; init; }
public IReadOnlyDictionary<string, object> Fields { get; init; }

public ChangeRow(
BigInteger startLineSequenceNumber,
BigInteger sequenceValue,
Operation operation,
string updateMask,
string captureInstance,
T body)
IReadOnlyDictionary<string, object> fields)
{
if (body is null)
throw new ArgumentNullException($"{nameof(body)} cannot be null.");
if (fields is null)
throw new ArgumentNullException($"{nameof(fields)} cannot be null.");
if (string.IsNullOrWhiteSpace(captureInstance))
throw new ArgumentNullException($"{nameof(captureInstance)} cannot be null, empty or whitespace.");

Expand All @@ -65,6 +66,6 @@ public ChangeRow(
Operation = operation;
UpdateMask = updateMask;
CaptureInstance = captureInstance;
Body = body;
Fields = fields;
}
}
30 changes: 12 additions & 18 deletions src/MsSqlCdc/DataConvert.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Dynamic;
using System.Linq;
using System.Numerics;
using System.Text;
Expand All @@ -16,8 +15,8 @@ internal static class DataConvert
/// <param name="captureInstance">The tablename of the column.</param>
/// <returns>Returns the CDC column as a ChangeData record.</returns>
/// <exception cref="Exception"></exception>
public static ChangeRow<dynamic> ConvertCdcColumn(
List<(string fieldName, object fieldValue)> columnFields,
public static ChangeRow ConvertCdcColumn(
IReadOnlyDictionary<string, object> columnFields,
string captureInstance)
{
bool isDefaultCdcField(string fieldName) =>
Expand All @@ -26,31 +25,26 @@ bool isDefaultCdcField(string fieldName) =>
fieldName == CdcFieldName.Operation ||
fieldName == CdcFieldName.UpdateMask;

if (columnFields.Where(x => isDefaultCdcField(x.fieldName)).Count() < 4)
if (columnFields.Where(x => isDefaultCdcField(x.Key)).Count() < 4)
throw new ArgumentException(
$"The column fields does not contain all the default CDC column fields.");

var startLsn = ConvertBinaryLsn(
(byte[])columnFields.First(x => x.fieldName == CdcFieldName.StartLsn).fieldValue);
var seqVal = ConvertBinaryLsn(
(byte[])columnFields.First(x => x.fieldName == CdcFieldName.SeqVal).fieldValue);
var operation = ConvertOperation(
(int)columnFields.First(x => x.fieldName == CdcFieldName.Operation).fieldValue);
var updateMask = Encoding.UTF8.GetString(
(byte[])columnFields.First(x => x.fieldName == CdcFieldName.UpdateMask).fieldValue);
var nonDefaultCdcFields = columnFields
.Where(x => !isDefaultCdcField(x.Key))
.ToDictionary(x => x.Key, x => x.Value);

var body = columnFields
.Where(x => !isDefaultCdcField(x.fieldName))
.Aggregate(new ExpandoObject() as IDictionary<string, object>,
(acc, x) => { acc[x.fieldName] = x.fieldValue; return acc; }) as dynamic;
var startLsn = ConvertBinaryLsn((byte[])columnFields[CdcFieldName.StartLsn]);
var seqVal = ConvertBinaryLsn((byte[])columnFields[CdcFieldName.SeqVal]);
var operation = ConvertOperation((int)columnFields[CdcFieldName.Operation]);
var updateMask = Encoding.UTF8.GetString((byte[])columnFields[CdcFieldName.UpdateMask]);

return new ChangeRow<dynamic>(
return new ChangeRow(
startLsn,
seqVal,
operation,
updateMask,
captureInstance,
body);
nonDefaultCdcFields);
}

/// <summary>
Expand Down
Loading

0 comments on commit 498e584

Please sign in to comment.