Skip to content

Commit

Permalink
feat: add TestResultsFlow for measuring time to notification (#439)
Browse files Browse the repository at this point in the history
Signed-off-by: joseph-sentry <[email protected]>
  • Loading branch information
joseph-sentry authored Jun 19, 2024
1 parent 708f3db commit 0705c36
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 13 deletions.
20 changes: 20 additions & 0 deletions helpers/checkpoint_logger/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,23 @@ class UploadFlow(BaseFlow):
NOTIF_TOO_MANY_RETRIES = auto()
NOTIF_STALE_HEAD = auto()
NOTIF_ERROR_NO_REPORT = auto()


@failure_events("TEST_RESULTS_ERROR")
@success_events("TEST_RESULTS_BEGIN")
@subflows(
("test_results_notification_latency", "TEST_RESULTS_BEGIN", "TEST_RESULTS_NOTIFY"),
("flake_notification_latency", "TEST_RESULTS_BEGIN", "FLAKE_DETECTION_NOTIFY"),
(
"test_results_processing_time",
"TEST_RESULTS_BEGIN",
"TEST_RESULTS_FINISHER_BEGIN",
),
)
@reliability_counters
class TestResultsFlow(BaseFlow):
TEST_RESULTS_BEGIN = auto()
TEST_RESULTS_NOTIFY = auto()
FLAKE_DETECTION_NOTIFY = auto()
TEST_RESULTS_ERROR = auto()
TEST_RESULTS_FINISHER_BEGIN = auto()
12 changes: 11 additions & 1 deletion tasks/test_results_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from app import celery_app
from database.enums import FlakeSymptomType, ReportType, TestResultsProcessingError
from database.models import Commit, TestResultReportTotals
from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
from helpers.checkpoint_logger.flows import TestResultsFlow
from helpers.string import EscapeEnum, Replacement, StringEscaper, shorten_file_paths
from rollouts import FLAKY_TEST_DETECTION
from services.failure_normalizer import FailureNormalizer
Expand Down Expand Up @@ -135,6 +137,10 @@ def process_impl_within_lock(
),
)

checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs)

checkpoints.log(TestResultsFlow.TEST_RESULTS_FINISHER_BEGIN)

commit: Commit = (
db_session.query(Commit).filter_by(repoid=repoid, commitid=commitid).first()
)
Expand Down Expand Up @@ -270,6 +276,7 @@ def process_impl_within_lock(
)

with metrics.timing("test_results.finisher.notification"):
checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY)
success, reason = async_to_sync(notifier.notify)(payload)

log.info(
Expand Down Expand Up @@ -301,7 +308,7 @@ def process_impl_within_lock(
)
with metrics.timing("test_results.finisher.run_flaky_test_detection"):
success, reason = self.run_flaky_test_detection(
db_session, repoid, notifier, payload
db_session, repoid, notifier, payload, checkpoints=checkpoints
)

metrics.incr(
Expand All @@ -321,6 +328,7 @@ def run_flaky_test_detection(
repoid,
notifier: TestResultsNotifier,
payload: TestResultsNotificationPayload,
checkpoints=None,
):
ignore_predefined = read_yaml_field(
"test_analytics", "ignore_predefined", _else=False
Expand Down Expand Up @@ -375,6 +383,8 @@ def run_flaky_test_detection(
)
db_session.flush()

if checkpoints:
checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY)
success, reason = async_to_sync(notifier.notify)(payload)
log.info(
"Added flaky test information to the PR comment",
Expand Down
1 change: 1 addition & 0 deletions tasks/tests/unit/test_upload_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ def test_upload_task_call_test_results(
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml={"codecov": {"max_report_age": "1y ago"}},
checkpoints_TestResultsFlow=None,
)
)

Expand Down
29 changes: 17 additions & 12 deletions tasks/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME
from helpers.checkpoint_logger import _kwargs_key
from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
from helpers.checkpoint_logger.flows import UploadFlow
from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
from helpers.exceptions import RepositoryWithoutValidBotError
from helpers.github_installation import get_installation_name_for_owner_for_task
from helpers.parallel_upload_processing import get_parallel_session_ids
Expand Down Expand Up @@ -258,6 +258,10 @@ def run_impl(
checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs).log(
UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True
)
elif report_type == ReportType.TEST_RESULTS.value:
checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs).log(
TestResultsFlow.TEST_RESULTS_BEGIN, kwargs=kwargs, ignore_repeat=True
)

repoid = int(repoid)
log.info(
Expand Down Expand Up @@ -628,7 +632,7 @@ def schedule_task(
)
elif commit_report.report_type == ReportType.TEST_RESULTS.value:
res = self._schedule_test_results_processing_task(
commit, commit_yaml, argument_list, commit_report
commit, commit_yaml, argument_list, commit_report, checkpoints
)

if res:
Expand Down Expand Up @@ -850,11 +854,7 @@ def _schedule_bundle_analysis_processing_task(
return res

def _schedule_test_results_processing_task(
self,
commit,
commit_yaml,
argument_list,
commit_report,
self, commit, commit_yaml, argument_list, commit_report, checkpoints=None
):
processor_task_group = []
for i in range(0, len(argument_list), CHUNK_SIZE):
Expand All @@ -872,15 +872,20 @@ def _schedule_test_results_processing_task(
)
processor_task_group.append(sig)
if processor_task_group:
checkpoint_data = None
if checkpoints:
checkpoint_data = checkpoints.data
kwargs = {
"repoid": commit.repoid,
"commitid": commit.commitid,
"commit_yaml": commit_yaml,
_kwargs_key(TestResultsFlow): checkpoint_data,
}
res = chord(
processor_task_group,
test_results_finisher_task.signature(
args=(),
kwargs=dict(
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml=commit_yaml,
),
kwargs=kwargs,
),
).apply_async()

Expand Down

0 comments on commit 0705c36

Please sign in to comment.