add FlareCleanupTask #947

Open · wants to merge 2 commits into base: main
11 changes: 9 additions & 2 deletions celery_config.py
@@ -10,6 +10,7 @@
from shared.celery_config import (
    BaseCeleryConfig,
    brolly_stats_rollup_task_name,
+    # flare_cleanup_task_name,
    gh_app_webhook_check_task_name,
    health_check_task_name,
    profiling_finding_task_name,
@@ -88,12 +89,18 @@ def _beat_schedule():
        },
        "trial_expiration_cron": {
            "task": trial_expiration_cron_task_name,
-            # 4 UTC is 12am EDT
-            "schedule": crontab(minute="0", hour="4"),
+            "schedule": crontab(minute="0", hour="4"),  # 4 UTC is 12am EDT
            "kwargs": {
                "cron_task_generation_time_iso": BeatLazyFunc(get_utc_now_as_iso_format)
            },
        },
+        # "flare_cleanup": {
+        #     "task": flare_cleanup_task_name,
+        #     "schedule": crontab(minute="0", hour="5"),  # every day, 5am UTC (10pm PDT)
+        #     "kwargs": {
+        #         "cron_task_generation_time_iso": BeatLazyFunc(get_utc_now_as_iso_format)
+        #     },
+        # },
    }

    if get_config("setup", "find_uncollected_profilings", "enabled", default=True):
2 changes: 1 addition & 1 deletion requirements.in
@@ -1,5 +1,5 @@
https://github.com/codecov/test-results-parser/archive/c840502d1b4dd7d05b2efc2c1328affaf2acd27c.tar.gz#egg=test-results-parser
-https://github.com/codecov/shared/archive/2674ae99811767e63151590906691aed4c5ce1f9.tar.gz#egg=shared
+https://github.com/codecov/shared/archive/741151b2ebf8a71485ddb5a4b3f60bdc5207f7b4.tar.gz#egg=shared
https://github.com/codecov/timestring/archive/d37ceacc5954dff3b5bd2f887936a98a668dda42.tar.gz#egg=timestring
asgiref>=3.7.2
analytics-python==1.3.0b1
2 changes: 1 addition & 1 deletion requirements.txt
@@ -336,7 +336,7 @@ sentry-sdk==2.13.0
    # shared
setuptools==75.6.0
    # via nodeenv
-shared @ https://github.com/codecov/shared/archive/2674ae99811767e63151590906691aed4c5ce1f9.tar.gz#egg=shared
+shared @ https://github.com/codecov/shared/archive/741151b2ebf8a71485ddb5a4b3f60bdc5207f7b4.tar.gz#egg=shared
    # via -r requirements.in
six==1.16.0
    # via
98 changes: 98 additions & 0 deletions tasks/flare_cleanup.py
@@ -0,0 +1,98 @@
import logging

from shared.api_archive.archive import ArchiveService
from shared.celery_config import flare_cleanup_task_name
from shared.django_apps.core.models import Pull, PullStates

from app import celery_app
from tasks.crontasks import CodecovCronTask

log = logging.getLogger(__name__)


class FlareCleanupTask(CodecovCronTask, name=flare_cleanup_task_name):
    """
    Flare is a field on a Pull object.
    Flare is used to draw static graphs (see GraphHandler view in api) and can be large.
    The majority of flare graphs are used in pr comments, so we keep the (maybe large) flare "available"
    in either the db or Archive storage while the pull is OPEN.
    If the pull is not OPEN, we dump the flare to save space.
    If we need to generate a flare graph for a non-OPEN pull, we build_report_from_commit
    and generate fresh flare from that report (see GraphHandler view in api).
    """

    @classmethod
    def get_min_seconds_interval_between_executions(cls):
        return 72000  # 20h
    def run_cron_task(self, db_session, batch_size=1000, limit=10000, *args, **kwargs):
        # for any Pull that is not OPEN, clear the flare field(s), targeting older data
        non_open_pulls = Pull.objects.exclude(state=PullStates.OPEN.value).order_by(
            "updatestamp"
        )

        log.info("Starting FlareCleanupTask")

        # clear in db
        non_open_pulls_with_flare_in_db = non_open_pulls.filter(
            _flare__isnull=False
        ).exclude(_flare={})

        # Process in batches. Updated rows no longer match the filter, so the
        # queryset shrinks as we go: always slice from the start rather than
        # advancing an offset, which would skip rows.
        total_updated = 0
        while total_updated < limit:
            batch = list(
                non_open_pulls_with_flare_in_db.values_list("id", flat=True)[
                    :batch_size
                ]
            )
            if not batch:
                break
            n_updated = non_open_pulls_with_flare_in_db.filter(id__in=batch).update(
                _flare=None
            )
            total_updated += n_updated

        log.info(f"FlareCleanupTask cleared {total_updated} database flares")

        # clear in Archive
        non_open_pulls_with_flare_in_archive = non_open_pulls.filter(
            _flare_storage_path__isnull=False
        )

        # Process archive deletions in batches. Successfully cleared rows drop
        # out of the queryset, so the offset only advances past batches whose
        # archive deletion failed; otherwise a failing batch would be retried
        # forever within a single run.
        total_updated = 0
        offset = 0
        while total_updated + offset < limit:
            batch = list(
                non_open_pulls_with_flare_in_archive.values_list("id", flat=True)[
                    offset : offset + batch_size
                ]
            )
            if not batch:
                break
            flare_paths_from_batch = list(
                Pull.objects.filter(id__in=batch).values_list(
                    "_flare_storage_path", flat=True
                )
            )
            try:
                archive_service = ArchiveService(repository=None)
                archive_service.delete_files(flare_paths_from_batch)
            except Exception as e:
                # If deleting from the archive fails, leave _flare_storage_path
                # on the pull objects (it is only cleared after a successful
                # archive deletion) and skip past this batch.
                log.error(f"FlareCleanupTask failed to delete archive files: {e}")
                offset += batch_size
                continue

            # Clear the _flare_storage_path field for the successfully deleted batch
            n_updated = Pull.objects.filter(id__in=batch).update(
                _flare_storage_path=None
            )
            total_updated += n_updated

        log.info(f"FlareCleanupTask cleared {total_updated} Archive flares")

    def manual_run(self, db_session=None, limit=1000, *args, **kwargs):
        self.run_cron_task(db_session, *args, limit=limit, **kwargs)


RegisteredFlareCleanupTask = celery_app.register_task(FlareCleanupTask())
flare_cleanup_task = celery_app.tasks[RegisteredFlareCleanupTask.name]
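
For reference, a minimal sketch of how the registered task might be exercised by hand, assuming a configured worker shell; `manual_run` and `flare_cleanup_task` come from the module above, and `apply_async` is the standard Celery entry point rather than anything specific to this PR:

```python
# Hypothetical manual invocation from a configured worker shell.
from tasks.flare_cleanup import FlareCleanupTask, flare_cleanup_task

# Run synchronously, clearing at most 500 rows in this pass.
FlareCleanupTask().manual_run(limit=500)

# Or enqueue through Celery (this is how beat would dispatch it once the
# commented-out schedule entry in celery_config.py is enabled).
flare_cleanup_task.apply_async()
```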
125 changes: 125 additions & 0 deletions tasks/tests/unit/test_flare_cleanup.py
@@ -0,0 +1,125 @@
import json
from unittest.mock import call

from shared.django_apps.core.models import Pull, PullStates
from shared.django_apps.core.tests.factories import PullFactory, RepositoryFactory

from tasks.flare_cleanup import FlareCleanupTask


class TestFlareCleanupTask(object):
    def test_get_min_seconds_interval_between_executions(self):
        assert isinstance(
            FlareCleanupTask.get_min_seconds_interval_between_executions(),
            int,
        )
        assert FlareCleanupTask.get_min_seconds_interval_between_executions() > 17000

    def test_successful_run(self, transactional_db, mocker):
        mock_logs = mocker.patch("logging.Logger.info")
        mock_archive_service = mocker.patch(
            "shared.django_apps.utils.model_utils.ArchiveService"
        )
Comment on lines +20 to +22
Instead of mocking the ArchiveService, you could use the mock_storage fixture (I believe that should be hooked up to the shared archive service).

That way, you can actually store files (in memory), and assert that those files are truly being deleted from the storage.
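
A rough sketch of what that suggestion could look like; the `mock_storage` fixture name, its `write_file`/`read_file` signatures, the `archive` bucket, and the `FileNotInStorageError` import are assumptions here, not verified against this repo's fixtures:

```python
import pytest
from shared.storage.exceptions import FileNotInStorageError

def test_archive_flare_really_deleted(self, transactional_db, mock_storage):
    # Seed the (assumed) in-memory storage with a flare payload.
    path = "v4/repos/abc/pulls/1/flare.json"  # hypothetical storage path
    mock_storage.write_file("archive", path, json.dumps({"some": "data"}))

    pull = PullFactory(
        state=PullStates.MERGED.value,
        _flare=None,
        _flare_storage_path=path,
        repository=RepositoryFactory(),
    )

    FlareCleanupTask().run_cron_task(transactional_db)

    # The file should be gone from storage and the path cleared on the pull.
    pull.refresh_from_db()
    assert pull._flare_storage_path is None
    with pytest.raises(FileNotInStorageError):
        mock_storage.read_file("archive", path)
```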

        archive_value_for_flare = {"some": "data"}
        mock_archive_service.return_value.read_file.return_value = json.dumps(
            archive_value_for_flare
        )
        mock_path = "path/to/written/object"
        mock_archive_service.return_value.write_json_data_to_storage.return_value = (
            mock_path
        )
        mock_archive_service_in_task = mocker.patch(
            "tasks.flare_cleanup.ArchiveService"
        )
        mock_archive_service_in_task.return_value.delete_files.return_value = None

        local_value_for_flare = {"test": "test"}
        open_pull_with_local_flare = PullFactory(
            state=PullStates.OPEN.value,
            _flare=local_value_for_flare,
            repository=RepositoryFactory(),
        )
        assert open_pull_with_local_flare.flare == local_value_for_flare
        assert open_pull_with_local_flare._flare == local_value_for_flare
        assert open_pull_with_local_flare._flare_storage_path is None

        closed_pull_with_local_flare = PullFactory(
            state=PullStates.CLOSED.value,
            _flare=local_value_for_flare,
            repository=RepositoryFactory(),
        )
        assert closed_pull_with_local_flare.flare == local_value_for_flare
        assert closed_pull_with_local_flare._flare == local_value_for_flare
        assert closed_pull_with_local_flare._flare_storage_path is None

        open_pull_with_archive_flare = PullFactory(
            state=PullStates.OPEN.value,
            _flare=None,
            _flare_storage_path=mock_path,
            repository=RepositoryFactory(),
        )
        assert open_pull_with_archive_flare.flare == archive_value_for_flare
        assert open_pull_with_archive_flare._flare is None
        assert open_pull_with_archive_flare._flare_storage_path == mock_path

        merged_pull_with_archive_flare = PullFactory(
            state=PullStates.MERGED.value,
            _flare=None,
            _flare_storage_path=mock_path,
            repository=RepositoryFactory(),
        )
        assert merged_pull_with_archive_flare.flare == archive_value_for_flare
        assert merged_pull_with_archive_flare._flare is None
        assert merged_pull_with_archive_flare._flare_storage_path == mock_path

        task = FlareCleanupTask()
        task.run_cron_task(transactional_db)

        mock_logs.assert_has_calls(
            [
                call("Starting FlareCleanupTask"),
                call("FlareCleanupTask cleared 1 database flares"),
                call("FlareCleanupTask cleared 1 Archive flares"),
            ]
        )

        # there is a cache for flare on the object (all ArchiveFields have this),
        # so get a fresh copy of each object without the cached value
        open_pull_with_local_flare = Pull.objects.get(id=open_pull_with_local_flare.id)
        assert open_pull_with_local_flare.flare == local_value_for_flare
        assert open_pull_with_local_flare._flare == local_value_for_flare
        assert open_pull_with_local_flare._flare_storage_path is None

        closed_pull_with_local_flare = Pull.objects.get(
            id=closed_pull_with_local_flare.id
        )
        assert closed_pull_with_local_flare.flare == {}
        assert closed_pull_with_local_flare._flare is None
        assert closed_pull_with_local_flare._flare_storage_path is None

        open_pull_with_archive_flare = Pull.objects.get(
            id=open_pull_with_archive_flare.id
        )
        assert open_pull_with_archive_flare.flare == archive_value_for_flare
        assert open_pull_with_archive_flare._flare is None
        assert open_pull_with_archive_flare._flare_storage_path == mock_path

        merged_pull_with_archive_flare = Pull.objects.get(
            id=merged_pull_with_archive_flare.id
        )
        assert merged_pull_with_archive_flare.flare == {}
        assert merged_pull_with_archive_flare._flare is None
        assert merged_pull_with_archive_flare._flare_storage_path is None

        mock_logs.reset_mock()
        # check that once these pulls are corrected they are not corrected again
        task = FlareCleanupTask()
        task.run_cron_task(transactional_db)

        mock_logs.assert_has_calls(
            [
                call("Starting FlareCleanupTask"),
                call("FlareCleanupTask cleared 0 database flares"),
                call("FlareCleanupTask cleared 0 Archive flares"),
            ]
        )