From d1005aa61ade346309d916283539a05b9ace2b90 Mon Sep 17 00:00:00 2001 From: Klaus Wuestefeld Date: Tue, 21 May 2024 16:03:49 -0300 Subject: [PATCH] WIP --- src/prevayler_clj_aws/core.clj | 73 +++++++++++++++------------- test/prevayler_clj_aws/core_test.clj | 4 +- 2 files changed, 42 insertions(+), 35 deletions(-) diff --git a/src/prevayler_clj_aws/core.clj b/src/prevayler_clj_aws/core.clj index 256ae95..cf2dc34 100644 --- a/src/prevayler_clj_aws/core.clj +++ b/src/prevayler_clj_aws/core.clj @@ -110,41 +110,48 @@ snapshot-path "snapshot" page-size 1000}} aws-opts _ (println "Reading snapshot bucket...") - {state :state previous-snapshot-index :partkey} (read-snapshot s3-client s3-bucket snapshot-path) + {state :state snapshot-index :partkey} (read-snapshot s3-client s3-bucket snapshot-path) _ (println "Reading snapshot bucket done.") state-atom (atom (or state initial-state)) - snapshot-index-atom (atom (inc previous-snapshot-index)) - order-atom (atom 0)] - + snapshot-index-atom (atom snapshot-index)] + (println "Restoring events...") - (restore-events! dynamodb-client business-fn state-atom dynamodb-table previous-snapshot-index page-size) + (restore-events! dynamodb-client business-fn state-atom dynamodb-table @snapshot-index-atom page-size) (println "Restoring events done.") - ; since s3 update is atomic, if saving snapshot fails next prevayler will pick the previous state - ; and restore events from the previous partkey - (save-snapshot! s3-client s3-bucket snapshot-path {:state @state-atom :partkey @snapshot-index-atom}) - (println "Saving snapshot done.") - - (reify - Prevayler - (handle! [this event] - (locking this ; (I)solation: strict serializability. - (let [current-state @state-atom - timestamp (timestamp-fn) - new-state (business-fn current-state event timestamp)] ; (C)onsistency: must be guaranteed by the handler. The event won't be journalled when the handler throws an exception.) - (when-not (identical? new-state current-state) - (write-event! dynamodb-client dynamodb-table @snapshot-index-atom - (swap! order-atom inc) ; Skips an order number if there is an exception, but that's OK. - [timestamp event (hash new-state)]) ; (D)urability - (reset! state-atom new-state)) ; (A)tomicity - new-state))) - - #_(snapshot! [this] - (locking this - (start-new-journal! journal-file data-out-atom @state-atom backup))) - - (timestamp [_] (timestamp-fn)) - - IDeref (deref [_] @state-atom) - - Closeable (close [_] (aws/stop dynamodb-client))))) + (let [order-atom (atom 0) + snapshot-fn! (fn [] + (println "Saving snapshot to bucket...") + ; Since s3 update is atomic, if saving snapshot fails next prevayler will pick the previous state + ; and restore events from the previous partkey + (save-snapshot! s3-client s3-bucket snapshot-path {:state @state-atom + :partkey (inc @snapshot-index-atom)}) + (println "Snapshot done.") + (swap! snapshot-index-atom inc) + (reset! order-atom 0))] + + (snapshot-fn!) + + (reify + Prevayler + (handle! [this event] + (locking this ; (I)solation: strict serializability. + (let [current-state @state-atom + timestamp (timestamp-fn) + new-state (business-fn current-state event timestamp)] ; (C)onsistency: must be guaranteed by the handler. The event won't be journalled when the handler throws an exception.) + (when-not (identical? new-state current-state) + (write-event! dynamodb-client dynamodb-table @snapshot-index-atom + (swap! order-atom inc) ; Skips an order number if there is an exception, but that's OK. + [timestamp event (hash new-state)]) ; (D)urability + (reset! state-atom new-state)) ; (A)tomicity + new-state))) + + (snapshot! [this] + (locking this + (snapshot-fn!))) + + (timestamp [_] (timestamp-fn)) + + IDeref (deref [_] @state-atom) + + Closeable (close [_] (aws/stop dynamodb-client)))))) diff --git a/test/prevayler_clj_aws/core_test.clj b/test/prevayler_clj_aws/core_test.clj index cf8346d..20ed38d 100644 --- a/test/prevayler_clj_aws/core_test.clj +++ b/test/prevayler_clj_aws/core_test.clj @@ -27,8 +27,8 @@ dynamodb-table (gen-name) hostname (or (System/getenv "LOCALSTACK_HOST") "localhost") endpoint-override {:protocol "http" :hostname hostname :port (localstack-port)} - s3-cli (aws/client {:api :s3 :endpoint-override endpoint-override #_#_:region "us-east-1"}) - dynamodb-cli (aws/client {:api :dynamodb :endpoint-override endpoint-override #_#_:region "us-east-1"})] + s3-cli (aws/client {:api :s3 :endpoint-override endpoint-override}) + dynamodb-cli (aws/client {:api :dynamodb :endpoint-override endpoint-override})] (util/aws-invoke s3-cli {:op :CreateBucket :request {:Bucket s3-bucket}}) (util/aws-invoke dynamodb-cli {:op :CreateTable :request {:TableName dynamodb-table :AttributeDefinitions [{:AttributeName "partkey"