Skip to content

Commit

Permalink
Improved Postgres Checkpoint store, to make it fully idempotent
Browse files Browse the repository at this point in the history
Added tests for that
  • Loading branch information
oskardudycz committed May 19, 2024
1 parent 7f452fa commit 345e841
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 26 deletions.
3 changes: 3 additions & 0 deletions Core.EventStoreDB.Tests/Core.EventStoreDB.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<!-- For PostgreSQL checkpointing tests -->
<PackageReference Include="Testcontainers.PostgreSql" Version="3.8.0" />
<PackageReference Include="Npgsql" Version="8.0.3" />
</ItemGroup>


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Npgsql;
using Testcontainers.PostgreSql;
using Xunit;

namespace Core.EventStoreDB.Tests.Subscriptions.Checkpoints;

public class PostgresContainerFixture: IAsyncLifetime
{
private readonly PostgreSqlContainer container = new PostgreSqlBuilder()
.WithReuse(true)
.Build();

public NpgsqlDataSource DataSource { get; private set; } = default!;

public async Task InitializeAsync()
{
await container.StartAsync();
DataSource = new NpgsqlDataSourceBuilder(container.GetConnectionString()).Build();
}

public async Task DisposeAsync()
{
await DataSource.DisposeAsync();
await container.StopAsync();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using Core.EventStoreDB.Subscriptions.Checkpoints;
using Npgsql;
using Xunit;

namespace Core.EventStoreDB.Tests.Subscriptions.Checkpoints;
using static ISubscriptionCheckpointRepository;

public class PostgresSubscriptionCheckpointRepositoryTests(PostgresContainerFixture fixture)
: IClassFixture<PostgresContainerFixture>
{
private readonly string subscriptionId = Guid.NewGuid().ToString("N");

private readonly Func<CancellationToken, ValueTask<NpgsqlConnection>> connectionFactory =
_ =>
{
var connection = fixture.DataSource.CreateConnection();
connection.Open();
return ValueTask.FromResult(connection);
};

[Fact]
public async Task Store_InitialInsert_Success()
{
var checkpointTableCreator = new PostgresSubscriptionCheckpointSetup(fixture.DataSource);
var repository = new PostgresSubscriptionCheckpointRepository(connectionFactory, checkpointTableCreator);

var result = await repository.Store(subscriptionId, 1, Checkpoint.None, CancellationToken.None);

Assert.IsType<StoreResult.Success>(result);
}

[Fact]
public async Task Store_UpdatePosition_Success()
{
var checkpointTableCreator = new PostgresSubscriptionCheckpointSetup(fixture.DataSource);
var repository = new PostgresSubscriptionCheckpointRepository(connectionFactory, checkpointTableCreator);

await repository.Store(subscriptionId, 1, Checkpoint.None, CancellationToken.None);
var result = await repository.Store(subscriptionId, 2, Checkpoint.From(1), CancellationToken.None);

Assert.IsType<StoreResult.Success>(result);
}

[Fact]
public async Task Store_IdempotentCheck_ReturnsZero()
{
var checkpointTableCreator = new PostgresSubscriptionCheckpointSetup(fixture.DataSource);
var repository = new PostgresSubscriptionCheckpointRepository(connectionFactory, checkpointTableCreator);

await repository.Store(subscriptionId, 1, Checkpoint.None, CancellationToken.None);
await repository.Store(subscriptionId, 2, Checkpoint.From(1), CancellationToken.None);
var result = await repository.Store(subscriptionId, 2, Checkpoint.From(1), CancellationToken.None);

Assert.IsType<StoreResult.Ignored>(result);
}

[Fact]
public async Task Store_InvalidUpdate_Failure()
{
var checkpointTableCreator = new PostgresSubscriptionCheckpointSetup(fixture.DataSource);
var repository = new PostgresSubscriptionCheckpointRepository(connectionFactory, checkpointTableCreator);

await repository.Store(subscriptionId, 1, Checkpoint.None, CancellationToken.None);
await repository.Store(subscriptionId, 2, Checkpoint.From(1), CancellationToken.None);
var result = await repository.Store(subscriptionId, 1, Checkpoint.From(3), CancellationToken.None);

Assert.IsType<StoreResult.Mismatch>(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface ISubscriptionCheckpointRepository
public record StoreResult
{
public record Success(Checkpoint Checkpoint): StoreResult;

public record Ignored: StoreResult;
public record Mismatch: StoreResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ public class PostgresSubscriptionCheckpointRepository(
// to update projection in the same transaction in the same transaction as checkpointing
// to help handling idempotency
Func<CancellationToken, ValueTask<NpgsqlConnection>> connectionFactory,
PostgresSubscriptionCheckpointTableCreator checkpointTableCreator
PostgresSubscriptionCheckpointSetup checkpointSetup
): ISubscriptionCheckpointRepository
{
public async ValueTask<Checkpoint> Load(string subscriptionId, CancellationToken ct)
{
var connection = await connectionFactory(ct).ConfigureAwait(false);
await checkpointTableCreator.EnsureCheckpointsTableExist(ct).ConfigureAwait(false);
await checkpointSetup.EnsureCheckpointsTableExist(ct).ConfigureAwait(false);

await using var command = new NpgsqlCommand(SelectCheckpointSql, connection);
command.Parameters.AddWithValue(subscriptionId);
Expand All @@ -32,24 +32,35 @@ public async ValueTask<Checkpoint> Load(string subscriptionId, CancellationToken
public async ValueTask<StoreResult> Store(
string subscriptionId,
ulong position,
Checkpoint previousCheckpoint, CancellationToken ct)
Checkpoint previousCheckpoint,
CancellationToken ct
)
{
var connection = await connectionFactory(ct).ConfigureAwait(false);
await checkpointTableCreator.EnsureCheckpointsTableExist(ct).ConfigureAwait(false);
await checkpointSetup.EnsureCheckpointsTableExist(ct).ConfigureAwait(false);

await using var command = new NpgsqlCommand(StoreCheckpointSql, connection);
await using var command = new NpgsqlCommand("SELECT store_subscription_checkpoint($1, $2, $3)", connection);
command.Parameters.AddWithValue(subscriptionId);
command.Parameters.AddWithValue(position);
command.Parameters.AddWithValue((long)position);
command.Parameters.AddWithValue(previousCheckpoint != Checkpoint.None
? (long)previousCheckpoint.Position
: DBNull.Value);

return await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false) == 1
? new StoreResult.Success(Checkpoint.From(position))
: new StoreResult.Mismatch();
var result = await command.ExecuteScalarAsync(ct).ConfigureAwait(false);

return result switch
{
1 => new StoreResult.Success(Checkpoint.From(position)),
0 => new StoreResult.Ignored(),
2 => new StoreResult.Mismatch(),
_ => new StoreResult.Mismatch(),
};
}

public async ValueTask<Checkpoint> Reset(string subscriptionId, CancellationToken ct)
{
var connection = await connectionFactory(ct).ConfigureAwait(false);
await checkpointTableCreator.EnsureCheckpointsTableExist(ct).ConfigureAwait(false);
await checkpointSetup.EnsureCheckpointsTableExist(ct).ConfigureAwait(false);

await using var command = new NpgsqlCommand(ResetCheckpointSql, connection);
command.Parameters.AddWithValue(subscriptionId);
Expand Down Expand Up @@ -82,10 +93,10 @@ public async ValueTask<Checkpoint> Reset(string subscriptionId, CancellationToke
""";
}

public abstract class PostgresSubscriptionCheckpointTableCreator(NpgsqlDataSource dataSource)
public class PostgresSubscriptionCheckpointSetup(NpgsqlDataSource dataSource)
{
private bool wasCreated;
private readonly SemaphoreSlim tableLock = new(0, 1);
private readonly SemaphoreSlim tableLock = new(1, 1);

public async ValueTask EnsureCheckpointsTableExist(CancellationToken ct)
{
Expand All @@ -99,7 +110,7 @@ public async ValueTask EnsureCheckpointsTableExist(CancellationToken ct)

try
{
await using var cmd = dataSource.CreateCommand(CreateCheckpointsTableSql);
await using var cmd = dataSource.CreateCommand(SetupSql);
await cmd.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
wasCreated = false;
}
Expand All @@ -109,11 +120,71 @@ public async ValueTask EnsureCheckpointsTableExist(CancellationToken ct)
}
}

private const string SetupSql = $"{CreateCheckpointsTableSql}\n{CreateStoreCheckpointsProcedureSql}";

private const string CreateCheckpointsTableSql =
"""
CREATE TABLE "subscription_checkpoints" (
CREATE TABLE IF NOT EXISTS "subscription_checkpoints" (
"id" VARCHAR(100) PRIMARY KEY,
"position" BIGINT NULL
"position" BIGINT NULL,
"revision" BIGINT
);
""";

private const string CreateStoreCheckpointsProcedureSql =
"""
CREATE OR REPLACE FUNCTION store_subscription_checkpoint(
p_id VARCHAR(100),
p_position BIGINT,
check_position BIGINT DEFAULT NULL
) RETURNS INT AS $$
DECLARE
current_position BIGINT;
BEGIN
-- Handle the case when check_position is provided
IF check_position IS NOT NULL THEN
-- Try to update if the position matches check_position
UPDATE "subscription_checkpoints"
SET "position" = p_position
WHERE "id" = p_id AND "position" = check_position;

IF FOUND THEN
RETURN 1; -- Successfully updated
END IF;

-- Retrieve the current position
SELECT "position" INTO current_position
FROM "subscription_checkpoints"
WHERE "id" = p_id;

-- Return appropriate codes based on current position
IF current_position = p_position THEN
RETURN 0; -- Idempotent check: position already set
ELSIF current_position > check_position THEN
RETURN 2; -- Failure: current position is greater
ELSE
RETURN 2; -- Default failure case for mismatched positions
END IF;
END IF;

-- Handle the case when check_position is NULL: Insert if not exists
BEGIN
INSERT INTO "subscription_checkpoints"("id", "position")
VALUES (p_id, p_position);
RETURN 1; -- Successfully inserted
EXCEPTION WHEN unique_violation THEN
-- If insertion failed, it means the row already exists
SELECT "position" INTO current_position
FROM "subscription_checkpoints"
WHERE "id" = p_id;

IF current_position = p_position THEN
RETURN 0; -- Idempotent check: position already set
ELSE
RETURN 2; -- Insertion failed, row already exists with different position
END IF;
END;
END;
$$ LANGUAGE plpgsql;
""";
}
13 changes: 3 additions & 10 deletions EventSourcing.NetCore.sln
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Core.Tests", "Core.Tests\Core.Tests.csproj", "{E1B97A7B-97C3-4C14-9BE6-ACE0AF45CE61}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventSourcing.Integration.Tests", "EventSourcing.Integration.Tests\EventSourcing.Integration.Tests.csproj", "{215691EC-51FB-4BD2-A46C-396094727054}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docker", "docker", "{821D8306-C110-45AB-BDF7-AA80FDA942CE}"
ProjectSection(SolutionItems) = preProject
docker-compose.yml = docker-compose.yml
Expand Down Expand Up @@ -498,10 +496,6 @@ Global
{E1B97A7B-97C3-4C14-9BE6-ACE0AF45CE61}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E1B97A7B-97C3-4C14-9BE6-ACE0AF45CE61}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E1B97A7B-97C3-4C14-9BE6-ACE0AF45CE61}.Release|Any CPU.Build.0 = Release|Any CPU
{215691EC-51FB-4BD2-A46C-396094727054}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{215691EC-51FB-4BD2-A46C-396094727054}.Debug|Any CPU.Build.0 = Debug|Any CPU
{215691EC-51FB-4BD2-A46C-396094727054}.Release|Any CPU.ActiveCfg = Release|Any CPU
{215691EC-51FB-4BD2-A46C-396094727054}.Release|Any CPU.Build.0 = Release|Any CPU
{1ED20C13-1748-4D53-8D62-C66405F1FCE0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1ED20C13-1748-4D53-8D62-C66405F1FCE0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1ED20C13-1748-4D53-8D62-C66405F1FCE0}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -1107,8 +1101,6 @@ Global
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{E1B97A7B-97C3-4C14-9BE6-ACE0AF45CE61} = {107B18E3-65C8-4A6C-976D-7EEED6A6D318}
{215691EC-51FB-4BD2-A46C-396094727054} = {107B18E3-65C8-4A6C-976D-7EEED6A6D318}
{F755E316-140C-4A73-9586-06860521C1B6} = {A8E25331-55E9-4D1A-87A4-136EC4D2A4B5}
{3C628C0A-71AD-4C41-9977-A33F89C0751B} = {A7186B6B-D56D-4AEF-B6B7-FAA827764C34}
{1ED20C13-1748-4D53-8D62-C66405F1FCE0} = {3C628C0A-71AD-4C41-9977-A33F89C0751B}
Expand Down Expand Up @@ -1194,8 +1186,6 @@ Global
{0C3B70E2-26A6-4707-A537-74D649EDC2B8} = {A7186B6B-D56D-4AEF-B6B7-FAA827764C34}
{7B7E8DF0-8D21-4BC0-8A61-966A34E77DD8} = {0C3B70E2-26A6-4707-A537-74D649EDC2B8}
{B8C099AE-3585-4228-9C79-81BB340A5215} = {0C3B70E2-26A6-4707-A537-74D649EDC2B8}
{1738AF23-FD36-4457-B9F9-2593207CDEB5} = {107B18E3-65C8-4A6C-976D-7EEED6A6D318}
{C6383AC1-2D24-4E7A-810F-642920C857EA} = {107B18E3-65C8-4A6C-976D-7EEED6A6D318}
{25D0A470-4A59-49F3-9148-942F88E401B1} = {58104871-E205-45F1-9E24-BED4967E602B}
{14C7B928-9D6C-441A-8A1F-0C49173E73EB} = {CEEE5F74-121E-437F-B3B4-4E7C65482644}
{65F6E2BE-B2D4-4E56-B0CB-3062C4882B9E} = {14C7B928-9D6C-441A-8A1F-0C49173E73EB}
Expand Down Expand Up @@ -1306,6 +1296,9 @@ Global
{1C310D3A-BFC5-4984-BB11-4579067CFFB5} = {006643C6-E0B6-48E6-ABC6-9BE3DCB293D8}
{C3D0553F-5786-4417-97FD-B65440620274} = {006643C6-E0B6-48E6-ABC6-9BE3DCB293D8}
{16B7A55B-E2E6-4CE2-896A-EF0F02259482} = {006643C6-E0B6-48E6-ABC6-9BE3DCB293D8}
{1738AF23-FD36-4457-B9F9-2593207CDEB5} = {0570E45A-2EB6-4C4C-84E4-2C80E1FECEB5}
{C6383AC1-2D24-4E7A-810F-642920C857EA} = {0570E45A-2EB6-4C4C-84E4-2C80E1FECEB5}
{E1B97A7B-97C3-4C14-9BE6-ACE0AF45CE61} = {0570E45A-2EB6-4C4C-84E4-2C80E1FECEB5}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A5F55604-2FF3-43B7-B657-4F18E6E95D3B}
Expand Down

0 comments on commit 345e841

Please sign in to comment.