Skip to content

Commit

Permalink
Merge pull request #282 from Johnabell/support-created-at-override
Browse files Browse the repository at this point in the history
Add support for overriding created_at timestamps for copy transform workflows
  • Loading branch information
drteeth authored Nov 27, 2024
2 parents 2882ce6 + 705a8e2 commit 253ae6e
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 12 deletions.
4 changes: 4 additions & 0 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,17 @@ defmodule EventStore do
Supervisor.stop(supervisor, :normal, timeout)
end

@accepted_overrides_append_to_stream [:created_at_override]

def append_to_stream(stream_uuid, expected_version, events, opts \\ [])

def append_to_stream(@all_stream, _expected_version, _events, _opts),
do: {:error, :cannot_append_to_all_stream}

def append_to_stream(stream_uuid, expected_version, events, opts) do
overrides = Keyword.take(opts, @accepted_overrides_append_to_stream)
{conn, opts} = parse_opts(opts)
opts = Keyword.merge(opts, overrides)

Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/event_store/sql/statements.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ defmodule EventStore.Sql.Statements do
for {fun, args} <- [
{:count_streams, [:schema]},
{:create_stream, [:schema]},
{:insert_events, [:schema, :stream_id, :number_of_events]},
{:insert_events_any_version, [:schema, :stream_id, :number_of_events]},
{:insert_events, [:schema, :stream_id, :number_of_events, :created_at]},
{:insert_events_any_version, [:schema, :stream_id, :number_of_events, :created_at]},
{:insert_link_events, [:schema, :number_of_events]},
{:soft_delete_stream, [:schema]},
{:hard_delete_stream, [:schema]},
Expand Down
9 changes: 7 additions & 2 deletions lib/event_store/sql/statements/insert_events.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ WITH
<% end %>
),
stream AS (
<%= if stream_id do %>
<%= cond do %>
<% stream_id -> %>
UPDATE "<%= schema %>".streams
SET stream_version = stream_version + $2::bigint
WHERE stream_id = $1::bigint
returning stream_id
<% else %>
<% created_at -> %>
INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version, created_at)
VALUES ($1, $2::bigint, $<%= number_of_events*9 + 3 %>)
returning stream_id
<% true -> %>
INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version)
VALUES ($1, $2::bigint)
returning stream_id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
WITH
stream AS (
<%= if stream_id do %>
<%= cond do %>
<% stream_id -> %>
UPDATE "<%= schema %>".streams
SET stream_version = stream_version + $2::bigint
WHERE stream_id = $1::bigint
returning stream_id, stream_version - $2::bigint as initial_stream_version
<% else %>
<% created_at -> %>
INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version, created_at)
VALUES ($1, $2::bigint, $<%= number_of_events*9 + 3 %>)
returning stream_id, stream_version - $2::bigint as initial_stream_version
<% true -> %>
INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version)
VALUES ($1, $2::bigint)
returning stream_id, stream_version - $2::bigint as initial_stream_version
Expand Down
17 changes: 14 additions & 3 deletions lib/event_store/storage/appender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,23 @@ defmodule EventStore.Storage.Appender do
defp insert_event_batch(conn, stream_id, stream_uuid, events, event_count, opts) do
{schema, opts} = Keyword.pop(opts, :schema)
{expected_version, opts} = Keyword.pop(opts, :expected_version)
{created_at, opts} = Keyword.pop(opts, :created_at_override)

statement =
case expected_version do
:any_version -> Statements.insert_events_any_version(schema, stream_id, event_count)
_expected_version -> Statements.insert_events(schema, stream_id, event_count)
:any_version ->
Statements.insert_events_any_version(schema, stream_id, event_count, created_at)

_expected_version ->
Statements.insert_events(schema, stream_id, event_count, created_at)
end

stream_id_or_uuid = stream_id || stream_uuid
params = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events)

params =
[stream_id_or_uuid, event_count]
|> Enum.concat(build_insert_parameters(events))
|> append_if(!stream_id, created_at)

case Postgrex.query(conn, statement, params, opts) do
{:ok, %Postgrex.Result{num_rows: 0}} ->
Expand All @@ -123,6 +131,9 @@ defmodule EventStore.Storage.Appender do
end
end

defp append_if(params, true, value) when not is_nil(value), do: params ++ [value]
defp append_if(params, _, _), do: params

defp build_insert_parameters(events) do
events
|> Enum.with_index(1)
Expand Down
6 changes: 3 additions & 3 deletions lib/event_store/streams/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,16 @@ defmodule EventStore.Streams.Stream do
serializer,
opts
) do
prepared_events = prepare_events(events, stream, serializer)
prepared_events = prepare_events(events, stream, serializer, opts)

write_to_stream(conn, prepared_events, stream, expected_version, opts)
end

defp prepare_events(events, %StreamInfo{} = stream, serializer) do
defp prepare_events(events, %StreamInfo{} = stream, serializer, opts) do
%StreamInfo{stream_uuid: stream_uuid, stream_version: stream_version} = stream

events
|> Enum.map(&map_to_recorded_event(&1, utc_now(), serializer))
|> Enum.map(&map_to_recorded_event(&1, opts[:created_at_override] || utc_now(), serializer))
|> Enum.with_index(1)
|> Enum.map(fn {recorded_event, index} ->
%RecordedEvent{
Expand Down
78 changes: 78 additions & 0 deletions test/event_store_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,84 @@ defmodule EventStore.EventStoreTest do
assert recorded_event.data.event == unicode_text
end

test "override created_at" do

Check failure on line 206 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

test override created_at (EventStore.EventStoreTest)

Check failure on line 206 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

test override created_at (EventStore.EventStoreTest)
created_at = DateTime.utc_now() |> DateTime.add(-1, :day)
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(1)

:ok = EventStore.append_to_stream(stream_uuid, 0, events, created_at_override: created_at)

[recorded_event] = EventStore.stream_all_forward() |> Enum.to_list()
{:ok, stream_info} = EventStore.stream_info(stream_uuid)

assert recorded_event.created_at == created_at
assert stream_info.created_at == created_at
end

test "override created_at existing stream" do

Check failure on line 220 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

test override created_at existing stream (EventStore.EventStoreTest)

Check failure on line 220 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

test override created_at existing stream (EventStore.EventStoreTest)

Check failure on line 220 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

test override created_at existing stream (EventStore.EventStoreTest)
created_at = DateTime.utc_now() |> DateTime.add(-1, :day)
created_at2 = DateTime.utc_now() |> DateTime.add(-1, :hour)
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(1)
events2 = EventFactory.create_events(1)

:ok = EventStore.append_to_stream(stream_uuid, 0, events, created_at_override: created_at)

:ok =
EventStore.append_to_stream(stream_uuid, :any_version, events2,
created_at_override: created_at2
)

[event1, event2] = EventStore.stream_all_forward() |> Enum.to_list()
{:ok, stream_info} = EventStore.stream_info(stream_uuid)

assert event1.created_at == created_at
assert stream_info.created_at == created_at
assert event2.created_at == created_at2
end

test "override created_at any_version" do

Check failure on line 242 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

test override created_at any_version (EventStore.EventStoreTest)

Check failure on line 242 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

test override created_at any_version (EventStore.EventStoreTest)
created_at = DateTime.utc_now() |> DateTime.add(-1, :day)
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(1)

:ok =
EventStore.append_to_stream(stream_uuid, :any_version, events,
created_at_override: created_at
)

[recorded_event] = EventStore.stream_all_forward() |> Enum.to_list()
{:ok, stream_info} = EventStore.stream_info(stream_uuid)

assert recorded_event.created_at == created_at
assert stream_info.created_at == created_at
end

test "override created_at any_version existing stream" do

Check failure on line 259 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

test override created_at any_version existing stream (EventStore.EventStoreTest)

Check failure on line 259 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

test override created_at any_version existing stream (EventStore.EventStoreTest)

Check failure on line 259 in test/event_store_test.exs

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

test override created_at any_version existing stream (EventStore.EventStoreTest)
created_at = DateTime.utc_now() |> DateTime.add(-1, :day)
created_at2 = DateTime.utc_now() |> DateTime.add(-1, :hour)
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(1)
events2 = EventFactory.create_events(1)

:ok =
EventStore.append_to_stream(stream_uuid, :any_version, events,
created_at_override: created_at
)

:ok =
EventStore.append_to_stream(stream_uuid, :any_version, events2,
created_at_override: created_at2
)

[event1, event2] = EventStore.stream_all_forward() |> Enum.to_list()
{:ok, stream_info} = EventStore.stream_info(stream_uuid)

assert event1.created_at == created_at
assert stream_info.created_at == created_at
assert event2.created_at == created_at2
end

describe "transient subscription" do
test "should notify subscribers after event persisted to stream" do
stream_uuid = UUID.uuid4()
Expand Down

0 comments on commit 253ae6e

Please sign in to comment.