diff --git a/src/MsSqlCdc/Cdc.cs b/src/MsSqlCdc/Cdc.cs index df51385..25125d3 100644 --- a/src/MsSqlCdc/Cdc.cs +++ b/src/MsSqlCdc/Cdc.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Collections.Generic; using Microsoft.Data.SqlClient; +using System.Data; namespace MsSqlCdc; @@ -57,7 +58,13 @@ public static async Task HasColumnChanged( string columnName, string updateMask) { - return await CdcDatabase.HasColumnChanged(connection, captureInstance, columnName, updateMask); + var hasColumnChanged = await CdcDatabase.HasColumnChanged(connection, captureInstance, columnName, updateMask); + if (!hasColumnChanged.HasValue) + throw new Exception(@$"No returned value from 'IsBitSet' + using values {nameof(captureInstance)}: '{captureInstance}', + {nameof(columnName)}: '{columnName}', + {nameof(updateMask)}: '{updateMask}'."); + return hasColumnChanged.Value; } /// @@ -78,7 +85,12 @@ public static async Task GetColumnOrdinal( string captureInstance, string columnName) { - return await CdcDatabase.GetColumnOrdinal(connection, captureInstance, columnName); + var columnOrdinal = await CdcDatabase.GetColumnOrdinal(connection, captureInstance, columnName); + if (!columnOrdinal.HasValue) + throw new Exception(@$"Could not get column ordinal on values {nameof(captureInstance)}: '{captureInstance}' + and {nameof(columnName)}: '{columnName}'."); + + return columnOrdinal.Value; } /// @@ -102,6 +114,10 @@ public static async Task MapTimeToLsn( { var convertedRelationOperator = DataConvert.RelationOperatorToStringRepresentation(relationalOperator); var lsnBytes = await CdcDatabase.MapTimeToLsn(connection, trackingTime, convertedRelationOperator); + if (lsnBytes is null) + throw new Exception(@$"Could not map time to lsn using values {nameof(trackingTime)}: '${trackingTime}' + and {nameof(relationalOperator)}: '${convertedRelationOperator}. + Response was empty."); return DataConvert.ConvertBinaryLsn(lsnBytes); } @@ -118,7 +134,11 @@ public static async Task MapTimeToLsn( /// public static async Task MapLsnToTime(SqlConnection connection, long lsn) { - return await CdcDatabase.MapLsnToTime(connection, lsn); + var lsnToTime = await CdcDatabase.MapLsnToTime(connection, lsn); + if (!lsnToTime.HasValue) + throw new Exception($"Could not convert LSN to time with LSN being '{lsn}'"); + + return lsnToTime.Value; } /// @@ -131,18 +151,25 @@ public static async Task MapLsnToTime(SqlConnection connection, long l public static async Task GetMinLsn(SqlConnection connection, string captureInstance) { var minLsnBytes = await CdcDatabase.GetMinLsn(connection, captureInstance); + if (minLsnBytes is null) + throw new Exception(@$"Could get min LSN using values {nameof(captureInstance)}: '${captureInstance}'"); + return DataConvert.ConvertBinaryLsn(minLsnBytes); } /// /// Get the maximum log sequence number (LSN) from the start_lsn column in the cdc.lsn_time_mapping system table. - /// You can use this function to return the high endpoint of the change data capture timeline for any capture instance. + /// You can use this function to return the high endpoint of the change + /// data capture timeline for any capture instance. /// /// An open connection to a MS-SQL database. /// Return the high endpoint of the change data capture timeline for any capture instance. public static async Task GetMaxLsn(SqlConnection connection) { var maxLsnBytes = await CdcDatabase.GetMaxLsn(connection); + if (maxLsnBytes is null) + throw new Exception($"Could not get max LSN."); + return DataConvert.ConvertBinaryLsn(maxLsnBytes); } @@ -155,6 +182,9 @@ public static async Task GetMaxLsn(SqlConnection connection) public static async Task GetPreviousLsn(SqlConnection connection, long lsn) { var previousLsnBytes = await CdcDatabase.DecrementLsn(connection, lsn); + if (previousLsnBytes is null) + throw new Exception($"Could not get previous lsn on {nameof(lsn)}: '{lsn}'."); + return DataConvert.ConvertBinaryLsn(previousLsnBytes); } @@ -167,6 +197,9 @@ public static async Task GetPreviousLsn(SqlConnection connection, long lsn public static async Task GetNextLsn(SqlConnection connection, long lsn) { var nextLsnBytes = await CdcDatabase.IncrementLsn(connection, lsn); + if (nextLsnBytes is null) + throw new Exception($"Could not get next lsn on {nameof(lsn)}: '{lsn}'."); + return DataConvert.ConvertBinaryLsn(nextLsnBytes); } diff --git a/src/MsSqlCdc/CdcDatabase.cs b/src/MsSqlCdc/CdcDatabase.cs index f43b771..7508c27 100644 --- a/src/MsSqlCdc/CdcDatabase.cs +++ b/src/MsSqlCdc/CdcDatabase.cs @@ -18,163 +18,90 @@ internal static class CdcDatabase return (bool?)(await command.ExecuteScalarAsync()); } - public static async Task HasColumnChanged( + public static async Task HasColumnChanged( SqlConnection connection, string captureInstance, string columnName, string updateMask) { - var sql = "sys.fn_cdc_has_column_changed(@capture_instance, @column_name, @update_mask) AS has_column_changed"; - + var sql = "sys.fn_cdc_has_column_changed(@capture_instance, @column_name, @update_mask)"; using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@capture_instance", captureInstance); command.Parameters.AddWithValue("@column_name", columnName); command.Parameters.AddWithValue("@update_mask", updateMask); - using var reader = await command.ExecuteReaderAsync(); - - var hasColumnChanged = false; - while (await reader.ReadAsync()) - { - hasColumnChanged = (bool)reader["has_column_changed"]; - } - - return hasColumnChanged; + return (bool?)(await command.ExecuteScalarAsync()); } - public static async Task GetColumnOrdinal( + public static async Task GetColumnOrdinal( SqlConnection connection, string captureInstance, string columnName) { - var sql = "SELECT sys.fn_cdc_get_column_ordinal(@capture_instance, @column_name) AS column_ordinal"; - + var sql = "SELECT sys.fn_cdc_get_column_ordinal(@capture_instance, @column_name)"; using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@capture_instance", captureInstance); command.Parameters.AddWithValue("@column_name", columnName); - using var reader = await command.ExecuteReaderAsync(); - - var columnOrdinal = -1; - while (await reader.ReadAsync()) - { - columnOrdinal = (int)reader["column_ordinal"]; - } - - return columnOrdinal; + return (int?)(await command.ExecuteScalarAsync()); } - public static async Task MapLsnToTime(SqlConnection connection, long lsn) + public static async Task MapLsnToTime(SqlConnection connection, long lsn) { - var sql = "SELECT sys.fn_cdc_map_lsn_to_time(@lsn) AS lsn_time"; + var sql = "SELECT sys.fn_cdc_map_lsn_to_time(@lsn)"; using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@lsn", lsn); - using var reader = await command.ExecuteReaderAsync(); - - var lsnTime = default(DateTime); - while (await reader.ReadAsync()) - { - lsnTime = (DateTime)reader["lsn_time"]; - } - - if (lsnTime == default(DateTime)) - throw new Exception($"Could not convert LSN to time with LSN being '{lsn}'"); - - return lsnTime; + return (DateTime?)(await command.ExecuteScalarAsync()); } - public static async Task MapTimeToLsn( + public static async Task MapTimeToLsn( SqlConnection connection, DateTime trackingTime, string relationOperator) { - var sql = "SELECT sys.fn_cdc_map_time_to_lsn(@relational_operator, @tracking_time) AS lsn"; - + var sql = "SELECT sys.fn_cdc_map_time_to_lsn(@relational_operator, @tracking_time)"; using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@relational_operator", relationOperator); command.Parameters.AddWithValue("@tracking_time", trackingTime); - using var reader = await command.ExecuteReaderAsync(); - - var lsn = new byte[10]; - while (await reader.ReadAsync()) - { - lsn = (byte[])reader["lsn"]; - } - - return lsn; + return (byte[]?)(await command.ExecuteScalarAsync()); } - public static async Task GetMinLsn(SqlConnection connection, string captureInstance) + public static async Task GetMinLsn(SqlConnection connection, string captureInstance) { - var sql = "SELECT sys.fn_cdc_get_min_lsn(@capture_instance) AS min_lsn"; - + var sql = "SELECT sys.fn_cdc_get_min_lsn(@capture_instance)"; using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@capture_instance", captureInstance); - using var reader = await command.ExecuteReaderAsync(); - - var minLsn = new byte[10]; - while (await reader.ReadAsync()) - { - minLsn = (byte[])reader["min_lsn"]; - } - - return minLsn; + return (byte[]?)(await command.ExecuteScalarAsync()); } - public static async Task GetMaxLsn(SqlConnection connection) + public static async Task GetMaxLsn(SqlConnection connection) { var sql = "SELECT sys.fn_cdc_get_max_lsn() AS max_lsn"; - using var command = new SqlCommand(sql, connection); - using SqlDataReader reader = await command.ExecuteReaderAsync(); - var maxLsn = new byte[10]; - while (await reader.ReadAsync()) - { - maxLsn = (byte[])reader["max_lsn"]; - } - - return maxLsn; + return (byte[]?)(await command.ExecuteScalarAsync()); } - public static async Task DecrementLsn(SqlConnection connection, long lsn) + public static async Task DecrementLsn(SqlConnection connection, long lsn) { var sql = "SELECT sys.fn_cdc_decrement_lsn(@lsn) AS previous_lsn"; - using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@lsn", lsn); - using var reader = await command.ExecuteReaderAsync(); - - var nextLsn = new byte[10]; - while (await reader.ReadAsync()) - { - nextLsn = (byte[])reader["previous_lsn"]; - } - - return nextLsn; + return (byte[]?)(await command.ExecuteScalarAsync()); } - public static async Task IncrementLsn(SqlConnection connection, long lsn) + public static async Task IncrementLsn(SqlConnection connection, long lsn) { var sql = "SELECT sys.fn_cdc_increment_lsn(@lsn) AS next_lsn"; - using var command = new SqlCommand(sql, connection); command.Parameters.AddWithValue("@lsn", lsn); - using var reader = await command.ExecuteReaderAsync(); - - var nextLsn = new byte[10]; - while (await reader.ReadAsync()) - { - nextLsn = (byte[])reader["next_lsn"]; - } - - return nextLsn; + return (byte[]?)(await command.ExecuteScalarAsync()); } public static async Task>> GetAllChanges(