diff --git a/lib/event_store/sql/statements/insert_events.sql.eex b/lib/event_store/sql/statements/insert_events.sql.eex index 49bcae15..3d444fca 100644 --- a/lib/event_store/sql/statements/insert_events.sql.eex +++ b/lib/event_store/sql/statements/insert_events.sql.eex @@ -57,20 +57,24 @@ WITH SET stream_version = stream_version + $2::bigint WHERE stream_id = 0 RETURNING stream_version - $2::bigint as initial_stream_version + ), + linked_stream_events AS ( + INSERT INTO "<%= schema %>".stream_events + ( + event_id, + stream_id, + stream_version, + original_stream_id, + original_stream_version + ) + SELECT + new_events_indexes.event_id, + 0, + linked_stream.initial_stream_version + new_events_indexes.index, + stream.stream_id, + new_events_indexes.stream_version + FROM + new_events_indexes, linked_stream, stream ) -INSERT INTO "<%= schema %>".stream_events -( - event_id, - stream_id, - stream_version, - original_stream_id, - original_stream_version -) -SELECT - new_events_indexes.event_id, - 0, - linked_stream.initial_stream_version + new_events_indexes.index, - stream.stream_id, - new_events_indexes.stream_version -FROM - new_events_indexes, linked_stream, stream; + +SELECT stream_id from stream; diff --git a/lib/event_store/sql/statements/insert_events_any_version.sql.eex b/lib/event_store/sql/statements/insert_events_any_version.sql.eex index 7be918e9..cfc250a0 100644 --- a/lib/event_store/sql/statements/insert_events_any_version.sql.eex +++ b/lib/event_store/sql/statements/insert_events_any_version.sql.eex @@ -57,20 +57,24 @@ WITH SET stream_version = stream_version + $2::bigint WHERE stream_id = 0 RETURNING stream_version - $2::bigint as initial_stream_version + ), + linked_stream_events AS ( + INSERT INTO "<%= schema %>".stream_events + ( + event_id, + stream_id, + stream_version, + original_stream_id, + original_stream_version + ) + SELECT + new_events_indexes.event_id, + 0, + linked_stream.initial_stream_version + new_events_indexes.index, + stream.stream_id, + stream.initial_stream_version + new_events_indexes.index + FROM + new_events_indexes, linked_stream, stream ) -INSERT INTO "<%= schema %>".stream_events -( - event_id, - stream_id, - stream_version, - original_stream_id, - original_stream_version -) -SELECT - new_events_indexes.event_id, - 0, - linked_stream.initial_stream_version + new_events_indexes.index, - stream.stream_id, - stream.initial_stream_version + new_events_indexes.index -FROM - new_events_indexes, linked_stream, stream; + +SELECT stream_id from stream; diff --git a/lib/event_store/storage/appender.ex b/lib/event_store/storage/appender.ex index ffd6b5c2..c88c740b 100644 --- a/lib/event_store/storage/appender.ex +++ b/lib/event_store/storage/appender.ex @@ -20,17 +20,19 @@ defmodule EventStore.Storage.Appender do events |> Stream.map(&encode_uuids/1) |> Stream.chunk_every(1_000) - |> Enum.each(fn batch -> + |> Enum.reduce(stream_id, fn batch, stream_id -> event_count = length(batch) - with :ok <- insert_event_batch(conn, stream_id, stream_uuid, batch, event_count, opts) do + with {:ok, new_stream_id} <- + insert_event_batch(conn, stream_id, stream_uuid, batch, event_count, opts) do Logger.debug("Appended #{event_count} event(s) to stream #{inspect(stream_uuid)}") - - :ok + new_stream_id else {:error, error} -> throw({:error, error}) end end) + + :ok catch {:error, error} = reply -> Logger.warning( @@ -110,9 +112,14 @@ defmodule EventStore.Storage.Appender do params = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events) case Postgrex.query(conn, statement, params, opts) do - {:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found} - {:ok, %Postgrex.Result{}} -> :ok - {:error, error} -> handle_error(error) + {:ok, %Postgrex.Result{num_rows: 0}} -> + {:error, :not_found} + + {:ok, %Postgrex.Result{rows: [[stream_id]]}} -> + {:ok, stream_id} + + {:error, error} -> + handle_error(error) end end