Skip to content

Commit

Permalink
convert biginteger to endian binary array (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
runeanielsen authored Jan 12, 2022
1 parent 1fc381c commit 79c05fd
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 22 deletions.
18 changes: 13 additions & 5 deletions src/MsSqlCdc/Cdc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ public static async Task<BigInteger> MapTimeToLsn(
/// </returns>
public static async Task<DateTime> MapLsnToTime(SqlConnection connection, BigInteger lsn)
{
var lsnToTime = await CdcDatabase.MapLsnToTime(connection, lsn);
var binaryLsn = DataConvert.ConvertLsnBigEridian(lsn);
var lsnToTime = await CdcDatabase.MapLsnToTime(connection, binaryLsn);
if (!lsnToTime.HasValue)
throw new Exception($"Could not convert LSN to time with LSN being '{lsn}'");

Expand Down Expand Up @@ -232,7 +233,8 @@ public static async Task<BigInteger> GetMaxLsn(SqlConnection connection)
/// <returns>Return the high endpoint of the change data capture timeline for any capture instance.</returns>
public static async Task<BigInteger> GetPreviousLsn(SqlConnection connection, BigInteger lsn)
{
var previousLsnBytes = await CdcDatabase.DecrementLsn(connection, lsn);
var binaryLsn = DataConvert.ConvertLsnBigEridian(lsn);
var previousLsnBytes = await CdcDatabase.DecrementLsn(connection, binaryLsn);
if (previousLsnBytes is null)
throw new Exception($"Could not get previous lsn on {nameof(lsn)}: '{lsn}'.");

Expand All @@ -247,7 +249,8 @@ public static async Task<BigInteger> GetPreviousLsn(SqlConnection connection, Bi
/// <returns>Get the next log sequence number (LSN) in the sequence based upon the specified LSN.</returns>
public static async Task<BigInteger> GetNextLsn(SqlConnection connection, BigInteger lsn)
{
var nextLsnBytes = await CdcDatabase.IncrementLsn(connection, lsn);
var lsnBinary = DataConvert.ConvertLsnBigEridian(lsn);
var nextLsnBytes = await CdcDatabase.IncrementLsn(connection, lsnBinary);
if (nextLsnBytes is null)
throw new Exception($"Could not get next lsn on {nameof(lsn)}: '{lsn}'.");

Expand All @@ -271,9 +274,11 @@ public static async Task<IReadOnlyCollection<ChangeRow<dynamic>>> GetNetChanges(
BigInteger toLsn,
NetChangesRowFilterOption netChangesRowFilterOption = NetChangesRowFilterOption.All)
{
var beginLsnBinary = DataConvert.ConvertLsnBigEridian(fromLsn);
var endLsnBinary = DataConvert.ConvertLsnBigEridian(toLsn);
var filterOption = DataConvert.ConvertNetChangesRowFilterOption(netChangesRowFilterOption);
var cdcColumns = await CdcDatabase.GetNetChanges(
connection, captureInstance, fromLsn, toLsn, filterOption);
connection, captureInstance, beginLsnBinary, endLsnBinary, filterOption);
return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, captureInstance)).ToList();
}

Expand All @@ -296,8 +301,11 @@ public static async Task<IReadOnlyCollection<ChangeRow<dynamic>>> GetAllChanges(
BigInteger endLsn,
AllChangesRowFilterOption allChangesRowFilterOption = AllChangesRowFilterOption.All)
{
var beginLsnBinary = DataConvert.ConvertLsnBigEridian(beginLsn);
var endLsnBinary = DataConvert.ConvertLsnBigEridian(endLsn);
var filterOption = DataConvert.ConvertAllChangesRowFilterOption(allChangesRowFilterOption);
var cdcColumns = await CdcDatabase.GetAllChanges(connection, captureInstance, beginLsn, endLsn, filterOption);
var cdcColumns = await CdcDatabase.GetAllChanges(
connection, captureInstance, beginLsnBinary, endLsnBinary, filterOption);
return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, captureInstance)).ToList();
}
}
31 changes: 14 additions & 17 deletions src/MsSqlCdc/CdcDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ internal static class CdcDatabase
public static async Task<bool?> IsBitSet(SqlConnection connection, int position, string updateMask)
{
var sql = "sys.fn_cdc_is_bit_set(@position, @update_mask )";

using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@position", position);
command.Parameters.AddWithValue("@updateMask", updateMask);
Expand Down Expand Up @@ -47,12 +46,11 @@ internal static class CdcDatabase
return (int?)(await command.ExecuteScalarAsync());
}

public static async Task<DateTime?> MapLsnToTime(SqlConnection connection, BigInteger lsn)
public static async Task<DateTime?> MapLsnToTime(SqlConnection connection, byte[] lsn)
{
var sql = "SELECT sys.fn_cdc_map_lsn_to_time(@lsn)";

using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@lsn", lsn.ToByteArray());
command.Parameters.AddWithValue("@lsn", lsn);

return (DateTime?)(await command.ExecuteScalarAsync());
}
Expand Down Expand Up @@ -87,29 +85,29 @@ internal static class CdcDatabase
return (byte[]?)(await command.ExecuteScalarAsync());
}

public static async Task<byte[]?> DecrementLsn(SqlConnection connection, BigInteger lsn)
public static async Task<byte[]?> DecrementLsn(SqlConnection connection, byte[] lsn)
{
var sql = "SELECT sys.fn_cdc_decrement_lsn(@lsn)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@lsn", lsn.ToByteArray());
command.Parameters.AddWithValue("@lsn", lsn);

return (byte[]?)(await command.ExecuteScalarAsync());
}

public static async Task<byte[]?> IncrementLsn(SqlConnection connection, BigInteger lsn)
public static async Task<byte[]?> IncrementLsn(SqlConnection connection, byte[] lsn)
{
var sql = "SELECT sys.fn_cdc_increment_lsn(@lsn)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@lsn", lsn.ToByteArray());
command.Parameters.AddWithValue("@lsn", lsn);

return (byte[]?)(await command.ExecuteScalarAsync());
}

public static async Task<List<List<(string fieldName, object fieldValue)>>> GetAllChanges(
SqlConnection connection,
string captureInstance,
BigInteger beginLsn,
BigInteger endLsn,
byte[] beginLsn,
byte[] endLsn,
string filterOption)
{
return await GetChanges(
Expand All @@ -124,8 +122,8 @@ internal static class CdcDatabase
public static async Task<List<List<(string fieldName, object fieldValue)>>> GetNetChanges(
SqlConnection connection,
string captureInstance,
BigInteger beginLsn,
BigInteger endLsn,
byte[] beginLsn,
byte[] endLsn,
string filterOption)
{
return await GetChanges(
Expand All @@ -141,15 +139,14 @@ internal static class CdcDatabase
SqlConnection connection,
string cdcFunction,
string captureInstance,
BigInteger beginLsn,
BigInteger endLsn,
byte[] beginLsn,
byte[] endLsn,
string filterOption)
{
var sql = $"SELECT * FROM {cdcFunction}_{captureInstance}(@begin_lsn, @end_lsn, @filter_option)";

using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@begin_lsn", beginLsn.ToByteArray());
command.Parameters.AddWithValue("@end_lsn", endLsn.ToByteArray());
command.Parameters.AddWithValue("@begin_lsn", beginLsn);
command.Parameters.AddWithValue("@end_lsn", endLsn);
command.Parameters.AddWithValue("@filter_option", filterOption);

var changes = new List<List<(string name, object value)>>();
Expand Down
7 changes: 7 additions & 0 deletions src/MsSqlCdc/DataConvert.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ public static string ConvertAllChangesRowFilterOption(
_ => throw new ArgumentException($"Not valid representation value '{allChangesRowFilterOption}'")
};

/// <summary>
/// Convert LSN BigInteger to ByteArray in BigEndian format.
/// </summary>
/// <param name="representation">BigInteger representation of LSN.</param>
/// <returns>Binary array of BigInteger LSN.</returns>
public static byte[] ConvertLsnBigEridian(BigInteger lsn) => lsn.ToByteArray().Reverse().ToArray();

/// <summary>
/// Convert the binary representation of the line-sequence-number to BigInteger.
/// Automatically handle endianness doing the conversion.
Expand Down

0 comments on commit 79c05fd

Please sign in to comment.