From 12969517f2868e0e91d98af00d52611e68d1f3fe Mon Sep 17 00:00:00 2001
From: joseph-sentry <joseph.sawaya@sentry.io>
Date: Fri, 20 Dec 2024 12:42:12 -0500
Subject: [PATCH] feat: introduce v2_{processed,finished} states

I want to introduce the new finished state in the test results upload
pipeline.

To do so safely, I'm adding the new v2_processed and v2_finished states.

The reason for this is that the meanings of processed and v2_processed
are different.

processed means that test instances are in the DB and persisted,
but in the new pipeline v2_finished has that meaning, and v2_processed
just means that the intermediate results are in Redis for now.
---
 services/processing/flake_processing.py    | 3 ++-
 tasks/ta_finisher.py                       | 8 ++------
 tasks/ta_processor.py                      | 7 ++++---
 tasks/tests/unit/test_process_flakes.py    | 8 +++++---
 tasks/tests/unit/test_ta_processor_task.py | 8 ++++----
 5 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/services/processing/flake_processing.py b/services/processing/flake_processing.py
index 88765b8ca..7d231b744 100644
--- a/services/processing/flake_processing.py
+++ b/services/processing/flake_processing.py
@@ -22,8 +22,9 @@ def process_flake_for_repo_commit(
 ):
     uploads = ReportSession.objects.filter(
         report__report_type=CommitReport.ReportType.TEST_RESULTS.value,
+        report__commit__repository__repoid=repo_id,
         report__commit__commitid=commit_id,
-        state="processed",
+        state__in=["processed", "v2_finished"],
     ).all()
 
     curr_flakes = fetch_curr_flakes(repo_id)
diff --git a/tasks/ta_finisher.py b/tasks/ta_finisher.py
index 76f17c3dc..2380d3a58 100644
--- a/tasks/ta_finisher.py
+++ b/tasks/ta_finisher.py
@@ -232,17 +232,13 @@ def process_impl_within_lock(
             .filter(
                 CommitReport.commit_id == commit.id,
                 CommitReport.report_type == ReportType.TEST_RESULTS.value,
+                Upload.state == "v2_processed",
             )
             .all()
         )
 
         redis_client = get_redis_connection()
 
-        tests_to_write: dict[str, dict[str, Any]] = {}
-        test_instances_to_write: list[dict[str, Any]] = []
-        daily_totals: dict[str, DailyTotals] = dict()
-        test_flag_bridge_data: list[dict] = []
-
         repo_flakes: list[Flake] = (
             db_session.query(Flake.testid)
             .filter(Flake.repoid == repoid, Flake.end_date.is_(None))
@@ -364,7 +360,7 @@ def process_impl_within_lock(
                     extra=dict(upload_id=upload.id),
                 )
 
-            upload.state = "finished"
+            upload.state = "v2_finished"
             db_session.commit()
 
             redis_client.delete(intermediate_key)
diff --git a/tasks/ta_processor.py b/tasks/ta_processor.py
index bd3c9d560..8c15ba4f6 100644
--- a/tasks/ta_processor.py
+++ b/tasks/ta_processor.py
@@ -84,7 +84,7 @@ def process_individual_upload(
         upload_id = upload.id
 
         log.info("Processing individual upload", extra=dict(upload_id=upload_id))
-        if upload.state == "processed" or upload.state == "has_failed":
+        if upload.state == "v2_processed" or upload.state == "v2_failed":
             return False
 
         payload_bytes = archive_service.read_file(upload.storage_path)
@@ -102,16 +102,17 @@ def process_individual_upload(
                 ),
             )
             sentry_sdk.capture_exception(exc, tags={"upload_state": upload.state})
-            upload.state = "has_failed"
+            upload.state = "v2_failed"
             db_session.commit()
             return False
         else:
             redis_client.set(
                 f"ta/intermediate/{repository.repoid}/{commitid}/{upload_id}",
                 bytes(msgpacked),
+                ex=60 * 60,
             )
 
-            upload.state = "processed"
+            upload.state = "v2_processed"
             db_session.commit()
 
             log.info(
diff --git a/tasks/tests/unit/test_process_flakes.py b/tasks/tests/unit/test_process_flakes.py
index b77bb2e57..4b30d42ef 100644
--- a/tasks/tests/unit/test_process_flakes.py
+++ b/tasks/tests/unit/test_process_flakes.py
@@ -311,8 +311,10 @@ def test_it_handles_only_passes(transactional_db):
 def test_it_creates_flakes_from_processed_uploads(transactional_db):
     rs = RepoSimulator()
     c1 = rs.create_commit()
-    rs.add_test_instance(c1)
-    rs.add_test_instance(c1, outcome=TestInstance.Outcome.FAILURE.value)
+    rs.add_test_instance(c1, state="v2_finished")
+    rs.add_test_instance(
+        c1, outcome=TestInstance.Outcome.FAILURE.value, state="processed"
+    )
 
     rs.run_task(rs.repo.repoid, c1.commitid)
 
@@ -329,7 +331,7 @@ def test_it_creates_flakes_from_processed_uploads(transactional_db):
 def test_it_does_not_create_flakes_from_flake_processed_uploads(transactional_db):
     rs = RepoSimulator()
     c1 = rs.create_commit()
-    rs.add_test_instance(c1)
+    rs.add_test_instance(c1, state="v2_processed")
     rs.add_test_instance(
         c1, outcome=TestInstance.Outcome.FAILURE.value, state="flake_processed"
     )
diff --git a/tasks/tests/unit/test_ta_processor_task.py b/tasks/tests/unit/test_ta_processor_task.py
index 4c8df6fea..a46b24954 100644
--- a/tasks/tests/unit/test_ta_processor_task.py
+++ b/tasks/tests/unit/test_ta_processor_task.py
@@ -65,7 +65,7 @@ def test_ta_processor_task_call(
         )
 
         assert result is True
-        assert upload.state == "processed"
+        assert upload.state == "v2_processed"
 
         redis = get_redis_connection()
         a = redis.get(
@@ -195,7 +195,7 @@ def test_ta_processor_task_error_parsing_file(
         )
 
         assert result is False
-        assert upload.state == "has_failed"
+        assert upload.state == "v2_failed"
 
     @pytest.mark.integration
     def test_ta_processor_task_delete_archive(
@@ -325,7 +325,7 @@ def test_ta_processor_task_call_already_processed(
         with open(here.parent.parent / "samples" / "sample_test.json") as f:
             content = f.read()
             mock_storage.write_file("archive", url, content)
-        upload = UploadFactory.create(storage_path=url, state="processed")
+        upload = UploadFactory.create(storage_path=url, state="v2_processed")
         dbsession.add(upload)
         dbsession.flush()
         argument = {"url": url, "upload_id": upload.id_}
@@ -415,4 +415,4 @@ def test_ta_processor_task_call_already_processed_with_junit(
         )
 
         assert result is True
-        assert upload.state == "processed"
+        assert upload.state == "v2_processed"