From f7c08a74a5e27515076ec5fd34ee277c320efa02 Mon Sep 17 00:00:00 2001 From: Rune Nielsen Date: Sat, 15 Jan 2022 20:52:11 +0100 Subject: [PATCH] adds integration test suite (#49) * now does integration tests for get min and max lsn * adds integration tests for get previous and next lsn * adds integration tests for map time to lsn and map lsn to time * adds integration tests for get all changes * adds mssql external service to enable integration tests in circleci --- .circleci/config.yml | 29 ++- README.md | 68 +++---- src/MsSqlCdc/Cdc.cs | 2 +- test/MsSqlCdc.Tests/CdcTests.cs | 217 ++++++++++++++++++++++ test/MsSqlCdc.Tests/DatabaseFixture.cs | 72 +++++++ test/MsSqlCdc.Tests/MsSqlCdc.Tests.csproj | 6 + test/MsSqlCdc.Tests/Scripts/SetupDB.sql | 33 ++++ 7 files changed, 391 insertions(+), 36 deletions(-) create mode 100644 test/MsSqlCdc.Tests/CdcTests.cs create mode 100644 test/MsSqlCdc.Tests/DatabaseFixture.cs create mode 100644 test/MsSqlCdc.Tests/Scripts/SetupDB.sql diff --git a/.circleci/config.yml b/.circleci/config.yml index 899772b..424da8f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -7,6 +7,22 @@ executors: auth: username: $DOCKER_LOGIN password: $DOCKER_ACCESSTOKEN + + dotnet-core-sdk-and-mssql-service: + docker: + - image: mcr.microsoft.com/dotnet/sdk:6.0 + auth: + username: $DOCKER_LOGIN + password: $DOCKER_ACCESSTOKEN + - image: mcr.microsoft.com/mssql/server:2019-CU13-ubuntu-20.04 # For integration testing + auth: + username: $DOCKER_LOGIN + password: $DOCKER_ACCESSTOKEN + environment: + SA_PASSWORD: myAwesomePassword1 + MSSQL_AGENT_ENABLED: True + ACCEPT_EULA: Y + jobs: build-app: executor: dotnet-core-sdk @@ -17,9 +33,20 @@ jobs: command: dotnet build test-app: - executor: dotnet-core-sdk + executor: dotnet-core-sdk-and-mssql-service steps: - checkout + - run: + name: install dockerize + command: | + wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz + tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz + rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz + environment: + DOCKERIZE_VERSION: v0.3.0 + - run: + name: Wait for db + command: dockerize -wait tcp://localhost:1433 -timeout 1m - run: name: Test command: dotnet test diff --git a/README.md b/README.md index 73c2923..33b68f2 100644 --- a/README.md +++ b/README.md @@ -14,115 +14,115 @@ Usage examples can be found under the [example folder](https://github.com/DAXGRI ## API -### Get is bit set +### Get min LSN -Indicates whether a captured column has been updated by checking whether its ordinal position is set within a provided bitmask. +Get the start_lsn column value for the specified capture instance from the cdc.change_tables system table. This value represents the low endpoint of the validity interval for the capture instance. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -var columnOrdinal = await Cdc.GetColumnOrdinal(connection, "dbo_Employee", "Salary"); -var isBitSet = await Cdc.IsBitSet(connection, columnOrdinal, "my_update_mask"); +var minLsn = await Cdc.GetMinLsn(connection, "dbo_Employee"); ``` -### Has column changed +### Get max LSN -Identifies whether the update mask on the specified column has been updated in the associated change row. +Get the maximum log sequence number (LSN) from the start_lsn column in the cdc.lsn_time_mapping system table. You can use this function to return the high endpoint of the change data capture timeline for any capture instance. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -await Cdc.HasColumnChanged(connection, "dbo_Employee", "Salary", "my_update_mask"); +var maxLsn = await Cdc.GetMaxLsn(connection); ``` -### Get column ordinal +### Get previous LSN -Get the column ordinal of the specified column as it appears in the change table associated with the specified capture instance. +Get the previous log sequence number (LSN) in the sequence based upon the specified LSN. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -var columnOrdinal = await Cdc.GetColumnOrdinal(connection, "dbo_Employee", "Salary"); +var previousLsn = await Cdc.GetPreviousLsn(connection, 120000); ``` -### Map time to LSN +### Get next LSN -Map the log sequence number (LSN) value from the start_lsn column in the cdc.lsn_time_mapping system table for the specified time. +Get the next log sequence number (LSN) in the sequence based upon the specified LSN. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -var lsn = await Cdc.MapTimeToLsn(connection, DateTime.UtcNow, RelationalOperator.LargestLessThan); +var nextLsn = await Cdc.GetNextLsn(connection, 120000); ``` -### Map LSN to time +### Map time to LSN -Map date and time value from the tran_end_time column in the cdc.lsn_time_mapping system table for the specified log sequence number (LSN). You can use this function to systematically map LSN ranges to date ranges in a change table. +Map the log sequence number (LSN) value from the start_lsn column in the cdc.lsn_time_mapping system table for the specified time. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -var time = await Cdc.MapLsnToTime(connection, 120000); +var lsn = await Cdc.MapTimeToLsn(connection, DateTime.UtcNow, RelationalOperator.LargestLessThan); ``` -### Get min LSN +### Map LSN to time -Get the start_lsn column value for the specified capture instance from the cdc.change_tables system table. This value represents the low endpoint of the validity interval for the capture instance. +Map date and time value from the tran_end_time column in the cdc.lsn_time_mapping system table for the specified log sequence number (LSN). You can use this function to systematically map LSN ranges to date ranges in a change table. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -var minLsn = await Cdc.GetMinLsn(connection, "dbo_Employee"); +var time = await Cdc.MapLsnToTime(connection, 120000); ``` -### Get max LSN +### Get all changes -Get the maximum log sequence number (LSN) from the start_lsn column in the cdc.lsn_time_mapping system table. You can use this function to return the high endpoint of the change data capture timeline for any capture instance. +Get one row for each change applied to the source table within the specified log sequence number (LSN) range. If a source row had multiple changes during the interval, each change is represented in the returned result set. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -var maxLsn = await Cdc.GetMaxLsn(connection); +var allChanges = await Cdc.GetAllChanges(connection, "dbo_Employee", 120000, 120020); ``` -### Get previous LSN +### Get net changes -Get the previous log sequence number (LSN) in the sequence based upon the specified LSN. +Get one net change row for each source row changed within the specified Log Sequence Numbers (LSN) range. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -var previousLsn = await Cdc.GetPreviousLsn(connection, 120000); +var netChanges = await Cdc.GetNetChanges(connection, "dbo_Employee", 120000, 120020); ``` -### Get next LSN +### Get is bit set -Get the next log sequence number (LSN) in the sequence based upon the specified LSN. +Indicates whether a captured column has been updated by checking whether its ordinal position is set within a provided bitmask. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -var nextLsn = await Cdc.GetNextLsn(connection, 120000); +var columnOrdinal = await Cdc.GetColumnOrdinal(connection, "dbo_Employee", "Salary"); +var isBitSet = await Cdc.IsBitSet(connection, columnOrdinal, "my_update_mask"); ``` -### Get net changes +### Has column changed -Get one net change row for each source row changed within the specified Log Sequence Numbers (LSN) range. +Identifies whether the update mask on the specified column has been updated in the associated change row. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -var netChanges = await Cdc.GetNetChanges(connection, "dbo_Employee", 120000, 120020); +await Cdc.HasColumnChanged(connection, "dbo_Employee", "Salary", "my_update_mask"); ``` -### Get all changes +### Get column ordinal -Get one row for each change applied to the source table within the specified log sequence number (LSN) range. If a source row had multiple changes during the interval, each change is represented in the returned result set. +Get the column ordinal of the specified column as it appears in the change table associated with the specified capture instance. ```c# using var connection = new SqlConnection("myConnectionString"); await connection.OpenAsync(); -var allChanges = await Cdc.GetAllChanges(connection, "dbo_Employee", 120000, 120020); +var columnOrdinal = await Cdc.GetColumnOrdinal(connection, "dbo_Employee", "Salary"); ``` ## Setup CDC on MS-SQL Server diff --git a/src/MsSqlCdc/Cdc.cs b/src/MsSqlCdc/Cdc.cs index 8ebdce5..34d24cf 100644 --- a/src/MsSqlCdc/Cdc.cs +++ b/src/MsSqlCdc/Cdc.cs @@ -167,7 +167,7 @@ public static async Task MapTimeToLsn( var lsnBytes = await CdcDatabase.MapTimeToLsn(connection, trackingTime, convertedRelationOperator); if (lsnBytes is null) throw new Exception(@$"Could not map time to lsn using values {nameof(trackingTime)}: '${trackingTime}' - and {nameof(relationalOperator)}: '${convertedRelationOperator}. + and {nameof(relationalOperator)}: '{convertedRelationOperator}. Response was empty."); return DataConvert.ConvertBinaryLsn(lsnBytes); } diff --git a/test/MsSqlCdc.Tests/CdcTests.cs b/test/MsSqlCdc.Tests/CdcTests.cs new file mode 100644 index 0000000..958b948 --- /dev/null +++ b/test/MsSqlCdc.Tests/CdcTests.cs @@ -0,0 +1,217 @@ +using System; +using System.Linq; +using System.Numerics; +using System.Threading.Tasks; +using FluentAssertions; +using FluentAssertions.Execution; +using Microsoft.Data.SqlClient; +using Xunit; + +namespace MsSqlCdc.Tests; + +public class CdcTests : IClassFixture +{ + private readonly DatabaseFixture _databaseFixture; + + public CdcTests(DatabaseFixture databaseFixture) + { + _databaseFixture = databaseFixture; + } + + [Fact] + [Trait("Category", "Integration")] + public async Task Get_min_lsn() + { + var captureInstance = "dbo_Employee"; + using var connection = await CreateOpenSqlConnection(); + + var minLsn = await Cdc.GetMinLsn(connection, captureInstance); + + minLsn.Should().NotBe(default(BigInteger)); + } + + [Fact] + [Trait("Category", "Integration")] + public async Task Get_max_lsn() + { + using var connection = await CreateOpenSqlConnection(); + + var maxLsn = await Cdc.GetMaxLsn(connection); + + maxLsn.Should().NotBe(default(BigInteger)); + } + + [Fact] + [Trait("Category", "Integration")] + public async Task Get_previous_lsn() + { + using var connection = await CreateOpenSqlConnection(); + // We use the max LSN to get an realistic LSN number for testing. + var maxLsn = await Cdc.GetMaxLsn(connection); + + var previousLsn = await Cdc.GetPreviousLsn(connection, maxLsn); + + previousLsn.Should() + .BeLessThan(maxLsn).And + .NotBe(default(BigInteger)); + } + + [Fact] + [Trait("Category", "Integration")] + public async Task Get_next_lsn() + { + using var connection = await CreateOpenSqlConnection(); + // We use the max LSN to get an realistic LSN number for testing. + var maxLsn = await Cdc.GetMaxLsn(connection); + + var previousLsn = await Cdc.GetNextLsn(connection, maxLsn); + + previousLsn.Should() + .BeGreaterThan(maxLsn).And + .BeGreaterThan(default(BigInteger)); + } + + [Theory] + [InlineData(RelationalOperator.LargestLessThan, 0)] + [InlineData(RelationalOperator.LargestLessThanOrEqual, 0)] + [InlineData(RelationalOperator.SmallestGreaterThan, -100)] + [InlineData(RelationalOperator.SmallestGreaterThanOrEqual, -100)] + [Trait("Category", "Integration")] + public async Task Map_time_to_lsn(RelationalOperator relationalOperator, int secondsFromNow) + { + using var connection = await CreateOpenSqlConnection(); + var now = DateTime.UtcNow.AddSeconds(secondsFromNow); + + var lsn = await Cdc.MapTimeToLsn(connection, now, relationalOperator); + + lsn.Should().NotBe(default(BigInteger)); + } + + [Fact] + [Trait("Category", "Integration")] + public async Task Map_lsn_to_time() + { + using var connection = await CreateOpenSqlConnection(); + // We use the max LSN to get an realistic LSN number for testing. + var maxLsn = await Cdc.GetMaxLsn(connection); + + var time = await Cdc.MapLsnToTime(connection, maxLsn); + + time.ToUniversalTime().Should() + .NotBe(default(DateTime)).And + .BeBefore(DateTime.UtcNow); + } + + [Fact] + [Trait("Category", "Integration")] + public async Task Get_all_changes_rowfilter_all() + { + var captureInstance = "dbo_Employee"; + using var connection = await CreateOpenSqlConnection(); + + var minLsn = await Cdc.GetMinLsn(connection, captureInstance); + var maxLsn = await Cdc.GetMaxLsn(connection); + + var netChanges = (await Cdc.GetAllChanges( + connection, + captureInstance, + minLsn, + maxLsn, + AllChangesRowFilterOption.All)).ToArray(); + + var insert = netChanges[0]; + var afterUpdate = netChanges[1]; + + using (var scope = new AssertionScope()) + { + netChanges.Length.Should().Be(2); + insert.CaptureInstance.Should().Be(captureInstance); + insert.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); + insert.SequenceValue.Should().BeGreaterThan(default(BigInteger)); + insert.Operation.Should().Be(Operation.Insert); + insert.Fields["id"].Should().NotBeNull(); + insert.Fields["first_name"].Should().Be("Rune"); + insert.Fields["last_name"].Should().Be("Nielsen"); + + afterUpdate.CaptureInstance.Should().Be(captureInstance); + afterUpdate.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); + afterUpdate.SequenceValue.Should().BeGreaterThan(default(BigInteger)); + afterUpdate.Operation.Should().Be(Operation.AfterUpdate); + afterUpdate.Fields["id"].Should().NotBeNull(); + afterUpdate.Fields["first_name"].Should().Be("Rune"); + afterUpdate.Fields["last_name"].Should().Be("Jensen"); + } + } + + [Fact] + [Trait("Category", "Integration")] + public async Task Get_all_changes_rowfilter_all_update_old() + { + var captureInstance = "dbo_Employee"; + using var connection = await CreateOpenSqlConnection(); + + var minLsn = await Cdc.GetMinLsn(connection, captureInstance); + var maxLsn = await Cdc.GetMaxLsn(connection); + + var netChanges = (await Cdc.GetAllChanges( + connection, + captureInstance, + minLsn, + maxLsn, + AllChangesRowFilterOption.AllUpdateOld)).ToArray(); + + var insert = netChanges[0]; + var beforeUpdate = netChanges[1]; + var afterUpdate = netChanges[2]; + + using (var scope = new AssertionScope()) + { + netChanges.Length.Should().Be(3); + insert.CaptureInstance.Should().Be(captureInstance); + insert.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); + insert.SequenceValue.Should().BeGreaterThan(default(BigInteger)); + insert.Operation.Should().Be(Operation.Insert); + insert.Fields["id"].Should().NotBeNull(); + insert.Fields["first_name"].Should().Be("Rune"); + insert.Fields["last_name"].Should().Be("Nielsen"); + + beforeUpdate.CaptureInstance.Should().Be(captureInstance); + beforeUpdate.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); + beforeUpdate.SequenceValue.Should().BeGreaterThan(default(BigInteger)); + beforeUpdate.Operation.Should().Be(Operation.BeforeUpdate); + beforeUpdate.Fields["id"].Should().NotBeNull(); + beforeUpdate.Fields["first_name"].Should().Be("Rune"); + beforeUpdate.Fields["last_name"].Should().Be("Nielsen"); + + afterUpdate.CaptureInstance.Should().Be(captureInstance); + afterUpdate.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); + afterUpdate.SequenceValue.Should().BeGreaterThan(default(BigInteger)); + afterUpdate.Operation.Should().Be(Operation.AfterUpdate); + afterUpdate.Fields["id"].Should().NotBeNull(); + afterUpdate.Fields["first_name"].Should().Be("Rune"); + afterUpdate.Fields["last_name"].Should().Be("Jensen"); + } + } + + private async Task CreateOpenSqlConnection() + { + var connection = new SqlConnection(_databaseFixture.ConnectionString); + await connection.OpenAsync(); + return connection; + } + + private async Task InsertEmployee(Guid id, string firstName, string lastName) + { + using var connection = await CreateOpenSqlConnection(); + var sql = @" + INSERT INTO [dbo].[employee] ([id], [first_name], [last_name]) + VALUES(@id, @first_name, @last_name)"; + + using var cmd = new SqlCommand(sql, connection); + cmd.Parameters.AddWithValue("@id", id); + cmd.Parameters.AddWithValue("@first_name", firstName); + cmd.Parameters.AddWithValue("@last_name", lastName); + + await cmd.ExecuteNonQueryAsync(); + } +} diff --git a/test/MsSqlCdc.Tests/DatabaseFixture.cs b/test/MsSqlCdc.Tests/DatabaseFixture.cs new file mode 100644 index 0000000..42263f0 --- /dev/null +++ b/test/MsSqlCdc.Tests/DatabaseFixture.cs @@ -0,0 +1,72 @@ +using System; +using System.IO; +using System.Threading; +using Microsoft.Data.SqlClient; +using Microsoft.SqlServer.Management.Common; +using Microsoft.SqlServer.Management.Smo; + +namespace MsSqlCdc.Tests; + +public class DatabaseFixture +{ + private const string MasterDatabaseName = "master"; + private const string TestDatabaseName = "mssql_cdc_test"; + public string ConnectionString => CreateConnectionString(TestDatabaseName); + + public DatabaseFixture() + { + DeleteDatabase(); + SetupDatabase(); + + // We do this because the setup process is quite intensive for the SQL database. + // So before it can be used in tests, we want to make sure that the CDC tables are ready to be consumed. + Thread.Sleep(3000); + } + + private void SetupDatabase() + { + using var connection = new SqlConnection(CreateConnectionString(MasterDatabaseName)); + connection.Open(); + var setupSql = File.ReadAllText(GetRootPath("Scripts/SetupDB.sql")); + var server = new Server(new ServerConnection(connection)); + server.ConnectionContext.ExecuteNonQuery(setupSql); + } + + private void DeleteDatabase() + { + var deleteDatabaseSql = $@" + IF DB_ID('{TestDatabaseName}') IS NOT NULL + BEGIN + ALTER DATABASE {TestDatabaseName} SET SINGLE_USER WITH ROLLBACK IMMEDIATE; + DROP DATABASE {TestDatabaseName}; + END;"; + + using var connection = new SqlConnection(CreateConnectionString(MasterDatabaseName)); + connection.Open(); + using var cmd = new SqlCommand(deleteDatabaseSql, connection); + cmd.ExecuteNonQuery(); + } + + private static string GetRootPath(string filePath) + { + var absolutePath = Path.IsPathRooted(filePath) + ? filePath + : Path.GetRelativePath(Directory.GetCurrentDirectory(), filePath); + + if (!File.Exists(absolutePath)) + throw new ArgumentException($"Could not find file at path: {absolutePath}"); + + return absolutePath; + } + + private static string CreateConnectionString(string initialCatalog) + { + var builder = new SqlConnectionStringBuilder(); + builder.DataSource = "localhost"; + builder.UserID = "sa"; + builder.Password = "myAwesomePassword1"; + builder.InitialCatalog = initialCatalog; + builder.Encrypt = false; + return builder.ConnectionString; + } +} diff --git a/test/MsSqlCdc.Tests/MsSqlCdc.Tests.csproj b/test/MsSqlCdc.Tests/MsSqlCdc.Tests.csproj index 86171f8..f0a5c90 100644 --- a/test/MsSqlCdc.Tests/MsSqlCdc.Tests.csproj +++ b/test/MsSqlCdc.Tests/MsSqlCdc.Tests.csproj @@ -11,6 +11,8 @@ + + runtime; build; native; contentfiles; analyzers; buildtransitive all @@ -24,5 +26,9 @@ + + + + diff --git a/test/MsSqlCdc.Tests/Scripts/SetupDB.sql b/test/MsSqlCdc.Tests/Scripts/SetupDB.sql new file mode 100644 index 0000000..87f915a --- /dev/null +++ b/test/MsSqlCdc.Tests/Scripts/SetupDB.sql @@ -0,0 +1,33 @@ +CREATE DATABASE mssql_cdc_test +GO + +USE mssql_cdc_test +GO + +-- Employee TABLE +CREATE TABLE [dbo].[employee]( + [id] [uniqueidentifier] NOT NULL, + [first_name] [varchar](50) NULL, + [last_name] [varchar](50) NULL, + PRIMARY KEY (id)); +GO + +-- Enable CDC on Database +EXEC sys.sp_cdc_enable_db; +GO + +-- Enable CDC on employee table +EXECUTE sys.sp_cdc_enable_table + @source_schema = N'dbo', + @source_name = N'employee', + @role_name = N'null'; +GO + +INSERT INTO [dbo].[employee] ([id], [first_name], [last_name]) +VALUES('653f11df-ee89-4e17-ac01-d6542f007ea1', 'Rune', 'Nielsen'); +GO + +UPDATE [dbo].[employee] +SET [last_Name] = 'Jensen' +WHERE [id] = '653f11df-ee89-4e17-ac01-d6542f007ea1'; +GO