Skip to content

Commit

Permalink
better handling of sql results being empty (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
runeanielsen authored Dec 23, 2021
1 parent 34d6f4b commit 883c265
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 98 deletions.
41 changes: 37 additions & 4 deletions src/MsSqlCdc/Cdc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Collections.Generic;
using Microsoft.Data.SqlClient;
using System.Data;

namespace MsSqlCdc;

Expand Down Expand Up @@ -57,7 +58,13 @@ public static async Task<bool> HasColumnChanged(
string columnName,
string updateMask)
{
return await CdcDatabase.HasColumnChanged(connection, captureInstance, columnName, updateMask);
var hasColumnChanged = await CdcDatabase.HasColumnChanged(connection, captureInstance, columnName, updateMask);
if (!hasColumnChanged.HasValue)
throw new Exception(@$"No returned value from 'IsBitSet'
using values {nameof(captureInstance)}: '{captureInstance}',
{nameof(columnName)}: '{columnName}',
{nameof(updateMask)}: '{updateMask}'.");
return hasColumnChanged.Value;
}

/// <summary>
Expand All @@ -78,7 +85,12 @@ public static async Task<int> GetColumnOrdinal(
string captureInstance,
string columnName)
{
return await CdcDatabase.GetColumnOrdinal(connection, captureInstance, columnName);
var columnOrdinal = await CdcDatabase.GetColumnOrdinal(connection, captureInstance, columnName);
if (!columnOrdinal.HasValue)
throw new Exception(@$"Could not get column ordinal on values {nameof(captureInstance)}: '{captureInstance}'
and {nameof(columnName)}: '{columnName}'.");

return columnOrdinal.Value;
}

/// <summary>
Expand All @@ -102,6 +114,10 @@ public static async Task<long> MapTimeToLsn(
{
var convertedRelationOperator = DataConvert.RelationOperatorToStringRepresentation(relationalOperator);
var lsnBytes = await CdcDatabase.MapTimeToLsn(connection, trackingTime, convertedRelationOperator);
if (lsnBytes is null)
throw new Exception(@$"Could not map time to lsn using values {nameof(trackingTime)}: '${trackingTime}'
and {nameof(relationalOperator)}: '${convertedRelationOperator}.
Response was empty.");
return DataConvert.ConvertBinaryLsn(lsnBytes);
}

Expand All @@ -118,7 +134,11 @@ public static async Task<long> MapTimeToLsn(
/// </returns>
public static async Task<DateTime> MapLsnToTime(SqlConnection connection, long lsn)
{
return await CdcDatabase.MapLsnToTime(connection, lsn);
var lsnToTime = await CdcDatabase.MapLsnToTime(connection, lsn);
if (!lsnToTime.HasValue)
throw new Exception($"Could not convert LSN to time with LSN being '{lsn}'");

return lsnToTime.Value;
}

/// <summary>
Expand All @@ -131,18 +151,25 @@ public static async Task<DateTime> MapLsnToTime(SqlConnection connection, long l
public static async Task<long> GetMinLsn(SqlConnection connection, string captureInstance)
{
var minLsnBytes = await CdcDatabase.GetMinLsn(connection, captureInstance);
if (minLsnBytes is null)
throw new Exception(@$"Could get min LSN using values {nameof(captureInstance)}: '${captureInstance}'");

return DataConvert.ConvertBinaryLsn(minLsnBytes);
}

/// <summary>
/// Get the maximum log sequence number (LSN) from the start_lsn column in the cdc.lsn_time_mapping system table.
/// You can use this function to return the high endpoint of the change data capture timeline for any capture instance.
/// You can use this function to return the high endpoint of the change
/// data capture timeline for any capture instance.
/// </summary>
/// <param name="connection">An open connection to a MS-SQL database.</param>
/// <returns>Return the high endpoint of the change data capture timeline for any capture instance.</returns>
public static async Task<long> GetMaxLsn(SqlConnection connection)
{
var maxLsnBytes = await CdcDatabase.GetMaxLsn(connection);
if (maxLsnBytes is null)
throw new Exception($"Could not get max LSN.");

return DataConvert.ConvertBinaryLsn(maxLsnBytes);
}

Expand All @@ -155,6 +182,9 @@ public static async Task<long> GetMaxLsn(SqlConnection connection)
public static async Task<long> GetPreviousLsn(SqlConnection connection, long lsn)
{
var previousLsnBytes = await CdcDatabase.DecrementLsn(connection, lsn);
if (previousLsnBytes is null)
throw new Exception($"Could not get previous lsn on {nameof(lsn)}: '{lsn}'.");

return DataConvert.ConvertBinaryLsn(previousLsnBytes);
}

Expand All @@ -167,6 +197,9 @@ public static async Task<long> GetPreviousLsn(SqlConnection connection, long lsn
public static async Task<long> GetNextLsn(SqlConnection connection, long lsn)
{
var nextLsnBytes = await CdcDatabase.IncrementLsn(connection, lsn);
if (nextLsnBytes is null)
throw new Exception($"Could not get next lsn on {nameof(lsn)}: '{lsn}'.");

return DataConvert.ConvertBinaryLsn(nextLsnBytes);
}

Expand Down
115 changes: 21 additions & 94 deletions src/MsSqlCdc/CdcDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,163 +18,90 @@ internal static class CdcDatabase
return (bool?)(await command.ExecuteScalarAsync());
}

public static async Task<bool> HasColumnChanged(
public static async Task<bool?> HasColumnChanged(
SqlConnection connection,
string captureInstance,
string columnName,
string updateMask)
{
var sql = "sys.fn_cdc_has_column_changed(@capture_instance, @column_name, @update_mask) AS has_column_changed";

var sql = "sys.fn_cdc_has_column_changed(@capture_instance, @column_name, @update_mask)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@capture_instance", captureInstance);
command.Parameters.AddWithValue("@column_name", columnName);
command.Parameters.AddWithValue("@update_mask", updateMask);

using var reader = await command.ExecuteReaderAsync();

var hasColumnChanged = false;
while (await reader.ReadAsync())
{
hasColumnChanged = (bool)reader["has_column_changed"];
}

return hasColumnChanged;
return (bool?)(await command.ExecuteScalarAsync());
}

public static async Task<int> GetColumnOrdinal(
public static async Task<int?> GetColumnOrdinal(
SqlConnection connection,
string captureInstance,
string columnName)
{
var sql = "SELECT sys.fn_cdc_get_column_ordinal(@capture_instance, @column_name) AS column_ordinal";

var sql = "SELECT sys.fn_cdc_get_column_ordinal(@capture_instance, @column_name)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@capture_instance", captureInstance);
command.Parameters.AddWithValue("@column_name", columnName);

using var reader = await command.ExecuteReaderAsync();

var columnOrdinal = -1;
while (await reader.ReadAsync())
{
columnOrdinal = (int)reader["column_ordinal"];
}

return columnOrdinal;
return (int?)(await command.ExecuteScalarAsync());
}

public static async Task<DateTime> MapLsnToTime(SqlConnection connection, long lsn)
public static async Task<DateTime?> MapLsnToTime(SqlConnection connection, long lsn)
{
var sql = "SELECT sys.fn_cdc_map_lsn_to_time(@lsn) AS lsn_time";
var sql = "SELECT sys.fn_cdc_map_lsn_to_time(@lsn)";

using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@lsn", lsn);

using var reader = await command.ExecuteReaderAsync();

var lsnTime = default(DateTime);
while (await reader.ReadAsync())
{
lsnTime = (DateTime)reader["lsn_time"];
}

if (lsnTime == default(DateTime))
throw new Exception($"Could not convert LSN to time with LSN being '{lsn}'");

return lsnTime;
return (DateTime?)(await command.ExecuteScalarAsync());
}

public static async Task<byte[]> MapTimeToLsn(
public static async Task<byte[]?> MapTimeToLsn(
SqlConnection connection,
DateTime trackingTime,
string relationOperator)
{
var sql = "SELECT sys.fn_cdc_map_time_to_lsn(@relational_operator, @tracking_time) AS lsn";

var sql = "SELECT sys.fn_cdc_map_time_to_lsn(@relational_operator, @tracking_time)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@relational_operator", relationOperator);
command.Parameters.AddWithValue("@tracking_time", trackingTime);

using var reader = await command.ExecuteReaderAsync();

var lsn = new byte[10];
while (await reader.ReadAsync())
{
lsn = (byte[])reader["lsn"];
}

return lsn;
return (byte[]?)(await command.ExecuteScalarAsync());
}

public static async Task<byte[]> GetMinLsn(SqlConnection connection, string captureInstance)
public static async Task<byte[]?> GetMinLsn(SqlConnection connection, string captureInstance)
{
var sql = "SELECT sys.fn_cdc_get_min_lsn(@capture_instance) AS min_lsn";

var sql = "SELECT sys.fn_cdc_get_min_lsn(@capture_instance)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@capture_instance", captureInstance);

using var reader = await command.ExecuteReaderAsync();

var minLsn = new byte[10];
while (await reader.ReadAsync())
{
minLsn = (byte[])reader["min_lsn"];
}

return minLsn;
return (byte[]?)(await command.ExecuteScalarAsync());
}

public static async Task<byte[]> GetMaxLsn(SqlConnection connection)
public static async Task<byte[]?> GetMaxLsn(SqlConnection connection)
{
var sql = "SELECT sys.fn_cdc_get_max_lsn() AS max_lsn";

using var command = new SqlCommand(sql, connection);
using SqlDataReader reader = await command.ExecuteReaderAsync();

var maxLsn = new byte[10];
while (await reader.ReadAsync())
{
maxLsn = (byte[])reader["max_lsn"];
}

return maxLsn;
return (byte[]?)(await command.ExecuteScalarAsync());
}

public static async Task<byte[]> DecrementLsn(SqlConnection connection, long lsn)
public static async Task<byte[]?> DecrementLsn(SqlConnection connection, long lsn)
{
var sql = "SELECT sys.fn_cdc_decrement_lsn(@lsn) AS previous_lsn";

using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@lsn", lsn);

using var reader = await command.ExecuteReaderAsync();

var nextLsn = new byte[10];
while (await reader.ReadAsync())
{
nextLsn = (byte[])reader["previous_lsn"];
}

return nextLsn;
return (byte[]?)(await command.ExecuteScalarAsync());
}

public static async Task<byte[]> IncrementLsn(SqlConnection connection, long lsn)
public static async Task<byte[]?> IncrementLsn(SqlConnection connection, long lsn)
{
var sql = "SELECT sys.fn_cdc_increment_lsn(@lsn) AS next_lsn";

using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@lsn", lsn);

using var reader = await command.ExecuteReaderAsync();

var nextLsn = new byte[10];
while (await reader.ReadAsync())
{
nextLsn = (byte[])reader["next_lsn"];
}

return nextLsn;
return (byte[]?)(await command.ExecuteScalarAsync());
}

public static async Task<List<List<(string fieldName, object fieldValue)>>> GetAllChanges(
Expand Down

0 comments on commit 883c265

Please sign in to comment.