Skip to content

Commit

Permalink
adds filter row option (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
runeanielsen authored Dec 29, 2021
1 parent 13d8109 commit 279592d
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 11 deletions.
3 changes: 2 additions & 1 deletion examples/Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public static void Main(string[] args)
var changes = new List<ChangeRow<dynamic>>();
foreach (var table in tables)
{
var changeSets = await Cdc.GetAllChanges(connection, table, lowBoundLsn, highBoundLsn);
var changeSets = await Cdc.GetAllChanges(
connection, table, lowBoundLsn, highBoundLsn, AllChangesRowFilterOption.AllUpdateOld);
changes.AddRange(changeSets);
}
Expand Down
63 changes: 59 additions & 4 deletions src/MsSqlCdc/Cdc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

namespace MsSqlCdc;

/// <summary>
/// Is used to identify a distinct LSN value in within the cdc.lsn_time_mapping table
/// with an associated tran_end_time that satisfies the relation when compared to the tracking_time value.
/// </summary>
public enum RelationalOperator
{
LargestLessThan,
Expand All @@ -15,6 +19,52 @@ public enum RelationalOperator
SmallestGreaterThanOrEqual
}

/// <summary>
/// An option that governs the content of the metadata columns as well as the rows returned in the result set.
/// </summary>
public enum NetChangesRowFilterOption
{
/// <summary>
/// Returns the LSN of the final change to the row and the operation needed
/// to apply the row in the metadata columns __$start_lsn and __$operation.
/// The column __$update_mask is always NULL.
/// </summary>
All,
/// <summary>
/// Returns the LSN of the final change to the row and the operation
/// needed to apply the row in the metadata columns __$start_lsn and __$operation.
/// In addition, when an update operation returns (__$operation = 4)
/// the captured columns modified in the update are marked in the value returned in __$update_mask.
/// </summary>
AllWithMask,
/// <summary>
/// Returns the LSN of the final change to the row in the metadata columns __$start_lsn.
/// The column __$operation will be one of two values: 1 for delete and 5 to indicate
/// that the operation needed to apply the change is either an insert or an update.
/// The column __$update_mask is always NULL.
/// </summary>
AllWithMerge,
}

/// <summary>
/// An option that governs the content of the metadata columns as well as the rows returned in the result set.
/// </summary>
public enum AllChangesRowFilterOption
{
/// <summary>
/// Returns all changes within the specified LSN range.
/// For changes due to an update operation, this option only returns
/// the row containing the new values after the update is applied.
/// </summary>
All,
/// <summary>
/// Returns all changes within the specified LSN range.
/// For changes due to an update operation, this option returns both the row containing
/// the column values before the update and the row containing the column values after the update.
/// </summary>
AllUpdateOld
}

public static class Cdc
{
/// <summary>
Expand Down Expand Up @@ -217,9 +267,12 @@ public static async Task<IReadOnlyCollection<ChangeRow<dynamic>>> GetNetChanges(
SqlConnection connection,
string captureInstance,
long fromLsn,
long toLsn)
long toLsn,
NetChangesRowFilterOption netChangesRowFilterOption = NetChangesRowFilterOption.All)
{
var cdcColumns = await CdcDatabase.GetNetChanges(connection, captureInstance, fromLsn, toLsn);
var filterOption = DataConvert.NetChangesRowFilterOptionToStringRepresentation(netChangesRowFilterOption);
var cdcColumns = await CdcDatabase.GetNetChanges(
connection, captureInstance, fromLsn, toLsn, filterOption);
return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, captureInstance)).ToList();
}

Expand All @@ -239,9 +292,11 @@ public static async Task<IReadOnlyCollection<ChangeRow<dynamic>>> GetAllChanges(
SqlConnection connection,
string captureInstance,
long beginLsn,
long endLsn)
long endLsn,
AllChangesRowFilterOption allChangesRowFilterOption = AllChangesRowFilterOption.All)
{
var cdcColumns = await CdcDatabase.GetAllChanges(connection, captureInstance, beginLsn, endLsn);
var filterOption = DataConvert.AllChangesRowFilterOptionToStringRepresentation(allChangesRowFilterOption);
var cdcColumns = await CdcDatabase.GetAllChanges(connection, captureInstance, beginLsn, endLsn, filterOption);
return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, captureInstance)).ToList();
}
}
28 changes: 22 additions & 6 deletions src/MsSqlCdc/CdcDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,36 +108,52 @@ internal static class CdcDatabase
SqlConnection connection,
string captureInstance,
long beginLsn,
long endLsn)
long endLsn,
string filterOption)
{
return await GetChanges(connection, "cdc.fn_cdc_get_all_changes", captureInstance, beginLsn, endLsn);
return await GetChanges(
connection,
"cdc.fn_cdc_get_all_changes",
captureInstance,
beginLsn,
endLsn,
filterOption);
}

public static async Task<List<List<(string fieldName, object fieldValue)>>> GetNetChanges(
SqlConnection connection,
string captureInstance,
long beginLsn,
long endLsn)
long endLsn,
string filterOption)
{
return await GetChanges(connection, "cdc.fn_cdc_get_net_changes", captureInstance, beginLsn, endLsn);
return await GetChanges(
connection,
"cdc.fn_cdc_get_net_changes",
captureInstance,
beginLsn,
endLsn,
filterOption);
}

private static async Task<List<List<(string fieldName, object fieldValue)>>> GetChanges(
SqlConnection connection,
string cdcFunction,
string captureInstance,
long beginLsn,
long endLsn)
long endLsn,
string filterOption)
{
var builder = new SqlCommandBuilder();
// We have to do this here, since we cannot pass the function as command parameter.
var function = builder.UnquoteIdentifier($"{cdcFunction}_{captureInstance}");

var sql = $"SELECT * FROM {function}(@begin_lsn, @end_lsn, 'all update old')";
var sql = $"SELECT * FROM {function}(@begin_lsn, @end_lsn, @filter_option)";

using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@begin_lsn", beginLsn);
command.Parameters.AddWithValue("@end_lsn", endLsn);
command.Parameters.AddWithValue("@filter_option", filterOption);

var changes = new List<List<(string name, object value)>>();
using var reader = await command.ExecuteReaderAsync();
Expand Down
35 changes: 35 additions & 0 deletions src/MsSqlCdc/DataConvert.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public static Operation ConvertIntOperation(int representation)
_ => throw new ArgumentException($"Not valid representation value '{representation}'")
};

/// <summary>
/// Converts the RelationOperator enum to a string representation to be used in MS-SQL.
/// </summary>
/// <param name="representation">The enum representation of RelationOperator.</param>
/// <returns>String representation of RelationOperator.</returns>
/// <exception cref="ArgumentException"></exception>
public static string RelationOperatorToStringRepresentation(RelationalOperator relationalOperator)
=> relationalOperator switch
{
Expand All @@ -77,4 +83,33 @@ public static string RelationOperatorToStringRepresentation(RelationalOperator r
RelationalOperator.SmallestGreaterThanOrEqual => "smallest greater than or equal",
_ => throw new ArgumentException($"Not valid representation value '{relationalOperator}'")
};

/// <summary>
/// Converts the NetChangesRowFilterOption enum to a string representation to be used in MS-SQL.
/// </summary>
/// <param name="representation">The enum representation of NetChangesRowFilterOption.</param>
/// <returns>String representation of NetChangesRowfilterOption.</returns>
/// <exception cref="ArgumentException"></exception>
public static string NetChangesRowFilterOptionToStringRepresentation(
NetChangesRowFilterOption netChangesRowFilterOption) => netChangesRowFilterOption switch
{
NetChangesRowFilterOption.All => "all",
NetChangesRowFilterOption.AllWithMask => "all with mask",
NetChangesRowFilterOption.AllWithMerge => "all with merge",
_ => throw new ArgumentException($"Not valid representation value '{netChangesRowFilterOption}'")
};

/// <summary>
/// Converts the AllChangesRowFilterOption enum to a string representation to be used in MS-SQL.
/// </summary>
/// <param name="representation">The enum representation of AllChangesRowFilterOption.</param>
/// <returns>String representation of AllChangesRowFilterOption.</returns>
/// <exception cref="ArgumentException"></exception>
public static string AllChangesRowFilterOptionToStringRepresentation(
AllChangesRowFilterOption allChangesRowFilterOption) => allChangesRowFilterOption switch
{
AllChangesRowFilterOption.All => "all",
AllChangesRowFilterOption.AllUpdateOld => "all update old",
_ => throw new ArgumentException($"Not valid representation value '{allChangesRowFilterOption}'")
};
}
23 changes: 23 additions & 0 deletions test/MsSqlCdc.Tests/DataConvertTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,27 @@ public void RelationOperatorToStringRepresentation(RelationalOperator relational
var stringRepresentation = DataConvert.RelationOperatorToStringRepresentation(relationalOperator);
stringRepresentation.Should().Be(expected);
}

[Theory]
[InlineData(NetChangesRowFilterOption.All, "all")]
[InlineData(NetChangesRowFilterOption.AllWithMask, "all with mask")]
[InlineData(NetChangesRowFilterOption.AllWithMerge, "all with merge")]
public void NetChangesRowFilterOptionToStringRepresentation(
NetChangesRowFilterOption filterOption,
string expected)
{
var stringRepresentation = DataConvert.NetChangesRowFilterOptionToStringRepresentation(filterOption);
stringRepresentation.Should().Be(expected);
}

[Theory]
[InlineData(AllChangesRowFilterOption.All, "all")]
[InlineData(AllChangesRowFilterOption.AllUpdateOld, "all update old")]
public void AllChangesRowFilterOptionToStringRepresentation(
AllChangesRowFilterOption filterOption,
string expected)
{
var stringRepresentation = DataConvert.AllChangesRowFilterOptionToStringRepresentation(filterOption);
stringRepresentation.Should().Be(expected);
}
}

0 comments on commit 279592d

Please sign in to comment.