Skip to content

Commit

Permalink
updates example with reading changes (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
runeanielsen authored Dec 13, 2021
1 parent 4079f5d commit 4fdb8b6
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 16 deletions.
4 changes: 4 additions & 0 deletions examples/Example/Example.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
<ProjectReference Include="..\..\src\MsSqlCdc\MsSqlCdc.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
</ItemGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
Expand Down
107 changes: 91 additions & 16 deletions examples/Example/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using MsSqlCdc;
Expand All @@ -9,23 +13,68 @@ namespace Example;

public class Program
{
public static async Task Main(string[] args)
public static void Main(string[] args)
{
Console.WriteLine("Starting to listen");
var connectionString = CreateConnectionString();
var pollingIntervalMs = 100;
var tables = new List<string> { "dbo_Employee" };
var cdcCancellation = new CancellationTokenSource();
var cdcCancellationToken = cdcCancellation.Token;

SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();
builder.DataSource = "localhost";
builder.UserID = "sa";
builder.Password = "myAwesomePassword1";
builder.InitialCatalog = "TestDb";
builder.Encrypt = false;
var myChannel = Channel.CreateUnbounded<IReadOnlyCollection<ChangeData<dynamic>>>();
_ = Task.Factory.StartNew(async () =>
{
long lowBoundLsn = await GetStartLsn(connectionString);
while (true)
{
if (cdcCancellationToken.IsCancellationRequested)
{
// We mark the channel as completed to notify that all consumers should
// read the last elements and stop.
myChannel.Writer.Complete();
break;
}
var connectionString = builder.ConnectionString;
using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
using var connection = new SqlConnection(connectionString);
try
{
await connection.OpenAsync();
var highBoundLsn = await Cdc.GetMaxLsn(connection);
if (lowBoundLsn <= highBoundLsn)
{
Console.WriteLine($"Polling with from '{lowBoundLsn}' to '{highBoundLsn}");
var minLsn = await Cdc.GetMinLsn(connection, "dbo_Employee");
var maxLsn = await Cdc.GetMaxLsn(connection);
var changes = new List<ChangeData<dynamic>>();
foreach (var table in tables)
{
var changeSets = await Cdc.GetAllChanges(connection, table, lowBoundLsn, highBoundLsn);
changes.AddRange(changeSets);
}
var orderedChanges = changes.OrderBy(x => x.SequenceValue).ToList();
await myChannel.Writer.WriteAsync(orderedChanges);
lowBoundLsn = await Cdc.GetNextLsn(connection, highBoundLsn);
}
else
{
// No changes
Console.WriteLine($"No changes since last poll '{lowBoundLsn}'");
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
finally
{
await connection.CloseAsync();
await Task.Delay(pollingIntervalMs);
}
}
}, cdcCancellationToken);

var options = new JsonSerializerOptions
{
Expand All @@ -36,9 +85,35 @@ public static async Task Main(string[] args)
}
};

var changeSets = await Cdc.GetAllChanges(connection, "dbo_Employee", minLsn, maxLsn);
_ = Task.Factory.StartNew(async () =>
{
await foreach (var changes in myChannel.Reader.ReadAllAsync())
{
var changeDataJson = JsonSerializer.Serialize(changes, options);
Console.WriteLine(changeDataJson + "\n");
}
});

Console.ReadKey();
cdcCancellation.Cancel();
}

private static async Task<long> GetStartLsn(string connectionString)
{
using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
var currentMaxLsn = await Cdc.GetMaxLsn(connection);
return await Cdc.GetNextLsn(connection, currentMaxLsn);
}

var result = JsonSerializer.Serialize(changeSets, options);
Console.WriteLine(result);
private static string CreateConnectionString()
{
SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();
builder.DataSource = "localhost";
builder.UserID = "sa";
builder.Password = "myAwesomePassword1";
builder.InitialCatalog = "TestDb";
builder.Encrypt = false;
return builder.ConnectionString;
}
}

0 comments on commit 4fdb8b6

Please sign in to comment.