From 8c8d351def528b678cdefbe79a05ecd746245b9c Mon Sep 17 00:00:00 2001 From: Adrian Date: Fri, 14 Nov 2025 16:10:12 -0800 Subject: [PATCH 01/13] feat: add post/get endpoints to create/fetch ta export data --- .../api/sentry/tests/test_views.py | 433 +++++++++++------- apps/codecov-api/api/sentry/urls.py | 14 +- apps/codecov-api/api/sentry/views.py | 228 +++++++-- libs/shared/shared/celery_config.py | 14 + libs/shared/shared/utils/enums.py | 1 + 5 files changed, 474 insertions(+), 216 deletions(-) diff --git a/apps/codecov-api/api/sentry/tests/test_views.py b/apps/codecov-api/api/sentry/tests/test_views.py index 13b93759fe..9998e3f192 100644 --- a/apps/codecov-api/api/sentry/tests/test_views.py +++ b/apps/codecov-api/api/sentry/tests/test_views.py @@ -11,7 +11,6 @@ from shared.django_apps.codecov_auth.models import ( GithubAppInstallation, Owner, - Service, ) from shared.django_apps.codecov_auth.tests.factories import ( AccountFactory, @@ -19,8 +18,6 @@ PlanFactory, TierFactory, ) -from shared.django_apps.core.tests.factories import RepositoryFactory -from shared.django_apps.ta_timeseries.tests.factories import TestrunFactory from shared.plan.constants import PlanName, TierName @@ -831,15 +828,24 @@ def test_account_unlink_authentication_failure(self): self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) -class TestAnalyticsEuViewTests(TestCase): - databases = ["default", "ta_timeseries"] +class CreateTestAnalyticsExportTests(TestCase): + """Tests for the create_ta_export endpoint""" def setUp(self): self.client = APIClient() - self.url = reverse("test-analytics-eu") + self.url = reverse("create-ta-export") + self.valid_data = { + "integration_names": ["test-integration-1", "test-integration-2"], + "gcp_project_id": "test-project-123", + "destination_bucket": "test-bucket", + "destination_prefix": "test/prefix/path", + } - def _make_authenticated_request(self, data, jwt_payload=None): + def _make_authenticated_request(self, data=None, jwt_payload=None): """Helper method to make an authenticated request with JWT payload""" + if data is None: + data = self.valid_data + with patch( "codecov_auth.permissions.get_sentry_jwt_payload" ) as mock_get_payload: @@ -851,207 +857,292 @@ def _make_authenticated_request(self, data, jwt_payload=None): self.url, data=json.dumps(data), content_type="application/json" ) - def test_test_analytics_eu_empty_integration_names(self): - """Test that empty integration_names list fails validation""" - data = {"integration_names": []} - - response = self._make_authenticated_request(data=data) + def test_create_ta_export_authentication_failure(self): + """Test create_ta_export fails without proper authentication""" + response = self.client.post( + self.url, data=json.dumps(self.valid_data), content_type="application/json" + ) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + def test_create_ta_export_invalid_inputs_missing_required_fields(self): + response = self._make_authenticated_request(data={}) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("integration_names", response.data) + self.assertIn("gcp_project_id", response.data) + self.assertIn("destination_bucket", response.data) + self.assertIn("destination_prefix", response.data) - def test_test_analytics_eu_missing_integration_names(self): - """Test that missing integration_names fails validation""" - data = {} - + def test_create_ta_export_no_integration_names_provided(self): + """Test create_ta_export with empty integration_names list""" + data = 
{ + "integration_names": [], + "gcp_project_id": "test-project-123", + "destination_bucket": "test-bucket", + "destination_prefix": "test/prefix/path", + } response = self._make_authenticated_request(data=data) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("integration_names", response.data) - @patch("api.sentry.views.log") - def test_test_analytics_eu_owner_not_found(self, mock_log): - """Test that non-existent owner is skipped with warning log""" - data = {"integration_names": ["non-existent-org"]} + @patch("api.sentry.views.TaskService") + def test_create_ta_export_successful_scheduling(self, mock_task_service_class): + mock_task_service = mock_task_service_class.return_value + mock_result_1 = patch("celery.result.AsyncResult").start() + mock_result_1.id = "task-id-1" + mock_result_1.status = "PENDING" - response = self._make_authenticated_request(data=data) + mock_result_2 = patch("celery.result.AsyncResult").start() + mock_result_2.id = "task-id-2" + mock_result_2.status = "PENDING" - self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(response.data["test_runs_per_integration"], {}) + mock_task_service.schedule_task.side_effect = [mock_result_1, mock_result_2] - mock_log.warning.assert_called_once() - warning_call = mock_log.warning.call_args[0][0] - self.assertIn("non-existent-org", warning_call) - self.assertIn("not found", warning_call) + response = self._make_authenticated_request(data=self.valid_data) - def test_test_analytics_eu_owner_without_repositories(self): - """Test that owner without repositories returns empty dict""" - OwnerFactory(name="org-no-repos", service=Service.GITHUB) + self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) + self.assertEqual(response.data["total_tasks"], 2) + self.assertEqual(response.data["successfully_scheduled"], 2) + self.assertEqual(len(response.data["tasks"]), 2) - data = {"integration_names": ["org-no-repos"]} + # Verify first task + task1 = response.data["tasks"][0] + self.assertEqual(task1["integration_name"], "test-integration-1") + self.assertEqual(task1["task_id"], "task-id-1") + self.assertEqual(task1["status"], "PENDING") - response = self._make_authenticated_request(data=data) + # Verify second task + task2 = response.data["tasks"][1] + self.assertEqual(task2["integration_name"], "test-integration-2") + self.assertEqual(task2["task_id"], "task-id-2") + self.assertEqual(task2["status"], "PENDING") - self.assertEqual(response.status_code, status.HTTP_200_OK) + # Verify schedule_task was called correctly + self.assertEqual(mock_task_service.schedule_task.call_count, 2) + + # Verify first call + call_kwargs_1 = mock_task_service.schedule_task.call_args_list[0][1] self.assertEqual( - response.data["test_runs_per_integration"], {"org-no-repos": {}} + call_kwargs_1["kwargs"]["integration_name"], "test-integration-1" ) - - @patch("api.sentry.views.log") - def test_test_analytics_eu_mixed_owners_found_and_not_found(self, mock_log): - """Test mix of existing and non-existing owners""" - owner = OwnerFactory(name="org-exists", service=Service.GITHUB) - repo = RepositoryFactory( - author=owner, name="test-repo", test_analytics_enabled=True + self.assertEqual(call_kwargs_1["kwargs"]["gcp_project_id"], "test-project-123") + self.assertEqual(call_kwargs_1["kwargs"]["destination_bucket"], "test-bucket") + self.assertEqual( + call_kwargs_1["kwargs"]["destination_prefix"], "test/prefix/path" ) - TestrunFactory( - repo_id=repo.repoid, - commit_sha="abc123", - outcome="pass", - 
name="test_example", + + @patch("api.sentry.views.TaskService") + def test_create_ta_export_failed_schedule_task(self, mock_task_service_class): + mock_task_service = mock_task_service_class.return_value + mock_task_service.schedule_task.side_effect = Exception( + "Failed to schedule task" ) - data = {"integration_names": ["org-exists", "org-not-exists"]} + data = { + "integration_names": ["failing-integration"], + "gcp_project_id": "test-project-123", + "destination_bucket": "test-bucket", + "destination_prefix": "test/prefix/path", + } response = self._make_authenticated_request(data=data) - self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertIn("org-exists", response.data["test_runs_per_integration"]) - self.assertNotIn("org-not-exists", response.data["test_runs_per_integration"]) - - # Verify warning log was called for non-existent owner - mock_log.warning.assert_called_once() - warning_call = mock_log.warning.call_args[0][0] - self.assertIn("org-not-exists", warning_call) - - def test_test_analytics_eu_filters_by_test_analytics_enabled(self): - """Test that only repositories with test_analytics_enabled=True are included""" - owner = OwnerFactory(name="org-with-repos", service=Service.GITHUB) - - repo_enabled = RepositoryFactory( - author=owner, - name="repo-enabled", - test_analytics_enabled=True, - ) - TestrunFactory( - repo_id=repo_enabled.repoid, - commit_sha="abc123", - outcome="pass", - name="test_enabled", - ) + self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) + self.assertEqual(response.data["total_tasks"], 1) + self.assertEqual(response.data["successfully_scheduled"], 0) + self.assertEqual(len(response.data["tasks"]), 1) + + # Verify failed task + task = response.data["tasks"][0] + self.assertEqual(task["integration_name"], "failing-integration") + self.assertEqual(task["error"], "Failed to schedule task") + self.assertEqual(task["status"], "FAILED_TO_SCHEDULE") + self.assertNotIn("task_id", task) + + @patch("api.sentry.views.TaskService") + def test_create_ta_export_mixed_success_and_failure(self, mock_task_service_class): + mock_task_service = mock_task_service_class.return_value + mock_result = patch("celery.result.AsyncResult").start() + mock_result.id = "task-id-success" + mock_result.status = "PENDING" + + mock_task_service.schedule_task.side_effect = [ + mock_result, + Exception("Failed to schedule second task"), + ] - repo_disabled = RepositoryFactory( - author=owner, - name="repo-disabled", - test_analytics_enabled=False, - ) + response = self._make_authenticated_request(data=self.valid_data) - data = {"integration_names": ["org-with-repos"]} + self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) + self.assertEqual(response.data["total_tasks"], 2) + self.assertEqual(response.data["successfully_scheduled"], 1) + self.assertEqual(len(response.data["tasks"]), 2) - response = self._make_authenticated_request(data=data) + # Verify successful task + task1 = response.data["tasks"][0] + self.assertEqual(task1["integration_name"], "test-integration-1") + self.assertEqual(task1["task_id"], "task-id-success") + self.assertEqual(task1["status"], "PENDING") + + # Verify failed task + task2 = response.data["tasks"][1] + self.assertEqual(task2["integration_name"], "test-integration-2") + self.assertEqual(task2["error"], "Failed to schedule second task") + self.assertEqual(task2["status"], "FAILED_TO_SCHEDULE") + self.assertNotIn("task_id", task2) + + +class GetTestAnalyticsExportTests(TestCase): + """Tests for the get_ta_export 
endpoint""" + + def setUp(self): + self.client = APIClient() + self.task_id = "test-task-id-123" + self.url = reverse("get-ta-export", kwargs={"task_id": self.task_id}) + + def _make_authenticated_request(self, task_id=None, jwt_payload=None): + """Helper method to make an authenticated request with JWT payload""" + if task_id: + url = reverse("get-ta-export", kwargs={"task_id": task_id}) + else: + url = self.url + + with patch( + "codecov_auth.permissions.get_sentry_jwt_payload" + ) as mock_get_payload: + mock_get_payload.return_value = jwt_payload or { + "g_p": "github", + "g_o": "test-org", + } + return self.client.get(url) + + @patch("api.sentry.views.AsyncResult") + def test_get_ta_export_successful_with_success_result(self, mock_async_result): + mock_result = mock_async_result.return_value + mock_result.status = "SUCCESS" + mock_result.successful.return_value = True + mock_result.result = { + "successful": True, + "integration_name": "test-integration", + "exported_files": ["file1.json", "file2.json"], + "total_records": 1000, + } + + response = self._make_authenticated_request() self.assertEqual(response.status_code, status.HTTP_200_OK) - test_runs_data = response.data["test_runs_per_integration"]["org-with-repos"] - - # Only repo-enabled should be in the response - self.assertIn("repo-enabled", test_runs_data) - self.assertNotIn("repo-disabled", test_runs_data) - - test_runs_list = test_runs_data["repo-enabled"] - self.assertEqual(len(test_runs_list), 1) - self.assertEqual(test_runs_list[0]["name"], "test_enabled") - - def test_test_analytics_eu_multiple_owners_with_multiple_repos_and_testruns(self): - """Test complex scenario with 2 owners, different repositories and test runs""" - owner1 = OwnerFactory(name="org-one", service=Service.GITHUB) - repo1 = RepositoryFactory( - author=owner1, - name="repo-one", - test_analytics_enabled=True, - ) - TestrunFactory( - repo_id=repo1.repoid, - commit_sha="commit1", - outcome="pass", - name="test_one_first", - classname="TestClass1", - ) - TestrunFactory( - repo_id=repo1.repoid, - commit_sha="commit1", - outcome="failure", - name="test_one_second", - classname="TestClass2", + self.assertEqual(response.data["task_id"], self.task_id) + self.assertEqual(response.data["status"], "SUCCESS") + self.assertIn("result", response.data) + self.assertTrue(response.data["result"]["successful"]) + self.assertEqual( + response.data["result"]["integration_name"], "test-integration" ) + self.assertEqual(response.data["result"]["total_records"], 1000) + + @patch("api.sentry.views.AsyncResult") + def test_get_ta_export_successful_but_reported_failure(self, mock_async_result): + mock_result = mock_async_result.return_value + mock_result.status = "SUCCESS" + mock_result.successful.return_value = True + mock_result.result = { + "successful": False, + "integration_name": "test-integration", + "error": "Failed to export data: Connection timeout", + } + + response = self._make_authenticated_request() - owner2 = OwnerFactory(name="org-two", service=Service.GITHUB) - repo2_1 = RepositoryFactory( - author=owner2, - name="repo-two-first", - test_analytics_enabled=True, + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["task_id"], self.task_id) + self.assertEqual(response.data["status"], "SUCCESS") + self.assertIn("result", response.data) + self.assertFalse(response.data["result"]["successful"]) + self.assertEqual( + response.data["result"]["integration_name"], "test-integration" ) - TestrunFactory( - repo_id=repo2_1.repoid, - 
commit_sha="commit2", - outcome="pass", - name="test_two_first", - classname="TestClassA", + self.assertEqual( + response.data["result"]["error"], + "Failed to export data: Connection timeout", ) - repo2_2 = RepositoryFactory( - author=owner2, - name="repo-two-second", - test_analytics_enabled=True, - ) - TestrunFactory( - repo_id=repo2_2.repoid, - commit_sha="commit3", - outcome="skip", - name="test_two_second", - classname="TestClassB", + @patch("api.sentry.views.AsyncResult") + def test_get_ta_export_failed_with_error(self, mock_async_result): + mock_result = mock_async_result.return_value + mock_result.status = "FAILURE" + mock_result.successful.return_value = False + mock_result.failed.return_value = True + + test_exception = ValueError("Database connection failed") + mock_result.info = test_exception + + response = self._make_authenticated_request() + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["task_id"], self.task_id) + self.assertEqual(response.data["status"], "FAILURE") + self.assertIn("error", response.data) + self.assertEqual( + response.data["error"]["message"], "Database connection failed" ) + self.assertEqual(response.data["error"]["type"], "ValueError") - data = {"integration_names": ["org-one", "org-two"]} + @patch("api.sentry.views.AsyncResult") + def test_get_ta_export_in_progress_pending(self, mock_async_result): + mock_result = mock_async_result.return_value + mock_result.status = "PENDING" + mock_result.successful.return_value = False + mock_result.failed.return_value = False - response = self._make_authenticated_request(data=data) + response = self._make_authenticated_request() self.assertEqual(response.status_code, status.HTTP_200_OK) - test_runs_per_integration = response.data["test_runs_per_integration"] - - # Verify org-one data - self.assertIn("org-one", test_runs_per_integration) - org_one_data = test_runs_per_integration["org-one"] - self.assertIn("repo-one", org_one_data) - self.assertEqual(len(org_one_data), 1) - - repo_one_testruns = org_one_data["repo-one"] - self.assertEqual(len(repo_one_testruns), 2) - testrun_names = [tr["name"] for tr in repo_one_testruns] - self.assertIn("test_one_first", testrun_names) - self.assertIn("test_one_second", testrun_names) - - self.assertIn("org-two", test_runs_per_integration) - org_two_data = test_runs_per_integration["org-two"] - self.assertIn("repo-two-first", org_two_data) - self.assertIn("repo-two-second", org_two_data) - self.assertEqual(len(org_two_data), 2) - - repo_two_first_testruns = org_two_data["repo-two-first"] - self.assertEqual(len(repo_two_first_testruns), 1) - self.assertEqual(repo_two_first_testruns[0]["name"], "test_two_first") - self.assertEqual(repo_two_first_testruns[0]["outcome"], "pass") - - repo_two_second_testruns = org_two_data["repo-two-second"] - self.assertEqual(len(repo_two_second_testruns), 1) - self.assertEqual(repo_two_second_testruns[0]["name"], "test_two_second") - self.assertEqual(repo_two_second_testruns[0]["outcome"], "skip") - - def test_test_analytics_eu_authentication_failure(self): - """Test that the endpoint requires authentication""" - data = {"integration_names": ["test-org"]} + self.assertEqual(response.data["task_id"], self.task_id) + self.assertEqual(response.data["status"], "PENDING") + self.assertNotIn("result", response.data) + self.assertNotIn("error", response.data) - response = self.client.post( - self.url, data=json.dumps(data), content_type="application/json" - ) + @patch("api.sentry.views.AsyncResult") + def 
test_get_ta_export_in_progress_retry(self, mock_async_result): + mock_result = mock_async_result.return_value + mock_result.status = "RETRY" + mock_result.successful.return_value = False + mock_result.failed.return_value = False - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + response = self._make_authenticated_request() + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["task_id"], self.task_id) + self.assertEqual(response.data["status"], "RETRY") + self.assertNotIn("result", response.data) + self.assertNotIn("error", response.data) + + @patch("api.sentry.views.AsyncResult") + def test_get_ta_export_successful_with_non_dict_result(self, mock_async_result): + mock_result = mock_async_result.return_value + mock_result.status = "SUCCESS" + mock_result.successful.return_value = True + mock_result.result = "Simple string result" + + response = self._make_authenticated_request() + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["task_id"], self.task_id) + self.assertEqual(response.data["status"], "SUCCESS") + self.assertIn("result", response.data) + self.assertEqual(response.data["result"], "Simple string result") + + @patch("api.sentry.views.AsyncResult") + def test_get_ta_export_failed_with_no_error_info(self, mock_async_result): + mock_result = mock_async_result.return_value + mock_result.status = "FAILURE" + mock_result.successful.return_value = False + mock_result.failed.return_value = True + mock_result.info = None + + response = self._make_authenticated_request() + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["task_id"], self.task_id) + self.assertEqual(response.data["status"], "FAILURE") + self.assertIn("error", response.data) + self.assertEqual(response.data["error"]["type"], "Unknown") diff --git a/apps/codecov-api/api/sentry/urls.py b/apps/codecov-api/api/sentry/urls.py index 471a85db67..1ece3921a3 100644 --- a/apps/codecov-api/api/sentry/urls.py +++ b/apps/codecov-api/api/sentry/urls.py @@ -1,9 +1,19 @@ from django.urls import path -from .views import account_link, account_unlink, test_analytics_eu +from .views import ( + account_link, + account_unlink, + create_ta_export, + get_ta_export, +) urlpatterns = [ path("internal/account/link/", account_link, name="account-link"), path("internal/account/unlink/", account_unlink, name="account-unlink"), - path("internal/test-analytics/eu/", test_analytics_eu, name="test-analytics-eu"), + path("internal/test-analytics/exports/", create_ta_export, name="create-ta-export"), + path( + "internal/test-analytics/exports//", + get_ta_export, + name="get-ta-export", + ), ] diff --git a/apps/codecov-api/api/sentry/views.py b/apps/codecov-api/api/sentry/views.py index b07445ac14..39f9903699 100644 --- a/apps/codecov-api/api/sentry/views.py +++ b/apps/codecov-api/api/sentry/views.py @@ -1,9 +1,8 @@ import logging -from itertools import groupby import sentry_sdk +from celery.result import AsyncResult from django.conf import settings -from django.views.decorators.gzip import gzip_page from rest_framework import serializers, status from rest_framework.decorators import ( api_view, @@ -12,17 +11,16 @@ ) from rest_framework.response import Response -from api.public.v2.test_results.serializers import TestrunSerializer from codecov_auth.models import Account from codecov_auth.permissions import JWTAuthenticationPermission +from services.task import TaskService, celery_app +from shared.celery_config 
import export_test_analytics_data_task_name from shared.django_apps.codecov_auth.models import ( GithubAppInstallation, Owner, Plan, Service, ) -from shared.django_apps.core.models import Repository -from shared.django_apps.ta_timeseries.models import Testrun from shared.plan.constants import PlanName log = logging.getLogger(__name__) @@ -250,67 +248,211 @@ def account_unlink(request, *args, **kwargs): ) -class SentryTestAnalyticsEuSerializer(serializers.Serializer): - """Serializer for test analytics EU endpoint""" +class CreateTestAnalyticsExportSerializer(serializers.Serializer): + """Serializer for create test analytics export endpoint""" integration_names = serializers.ListField( child=serializers.CharField(), help_text="The Sentry integration names", min_length=1, ) + gcp_project_id = serializers.CharField( + help_text="The GCP project ID", + required=True, + ) + destination_bucket = serializers.CharField( + help_text="The destination bucket", + required=True, + ) + destination_prefix = serializers.CharField( + help_text="The destination prefix", + required=True, + ) -@gzip_page @api_view(["POST"]) @authentication_classes([]) @permission_classes([JWTAuthenticationPermission]) -def test_analytics_eu(request, *args, **kwargs): - serializer = SentryTestAnalyticsEuSerializer(data=request.data) +def create_ta_export(request, *args, **kwargs): + """ + Starts a Celery task to export test analytics data. + + POST /sentry/internal/test-analytics/exports/ + + Returns: + { + "tasks": [ + { + "integration_name": "sample-integration-name", + "task_id": "sample-celery-task-id", + "status": "PENDING" + } + ], + "total_tasks": 1, + "successfully_scheduled": 1, + "status": "PENDING" + } + """ + serializer = CreateTestAnalyticsExportSerializer(data=request.data) serializer.is_valid(raise_exception=True) integration_names = serializer.validated_data["integration_names"] + gcp_project_id = serializer.validated_data["gcp_project_id"] + destination_bucket = serializer.validated_data["destination_bucket"] + destination_prefix = serializer.validated_data["destination_prefix"] + + log.info( + "Starting data export for the following integrations", + extra={ + "integrations": integration_names, + "integration_count": len(integration_names), + }, + ) + + task_service = TaskService() + task_results = [] - # For every integration name, determine if an Owner record exist by filtering by name and service=github - test_runs_per_integration = {} - for name in integration_names: + for integration_name in integration_names: try: - owner = Owner.objects.get(name=name, service=Service.GITHUB) - except Owner.DoesNotExist: - log.warning( - f"Owner with name {name} and service {Service.GITHUB} not found" + result = task_service.schedule_task( + task_name=export_test_analytics_data_task_name, + kwargs={ + "integration_name": integration_name, + "gcp_project_id": gcp_project_id, + "destination_bucket": destination_bucket, + "destination_prefix": destination_prefix, + }, + apply_async_kwargs={}, + ) + task_id = result.id + task_status = result.status + + log.info( + "Scheduled TA export task", + extra={ + "integration_name": integration_name, + "task_id": task_id, + "task_status": task_status, + }, ) - continue - # Only fetch name and repoid fields - repo_id_to_name = dict( - Repository.objects.filter( - author=owner, test_analytics_enabled=True - ).values_list("repoid", "name") - ) + task_results.append( + { + "integration_name": integration_name, + "task_id": task_id, + "status": task_status, + } + ) + except Exception as e: 
+ log.error( + "Failed to schedule TA export task", + extra={ + "integration_name": integration_name, + "error": str(e), + }, + exc_info=True, + ) + task_results.append( + { + "integration_name": integration_name, + "error": str(e), + "status": "FAILED_TO_SCHEDULE", + } + ) - if not repo_id_to_name: - test_runs_per_integration[name] = {} - continue + successful_tasks = [task for task in task_results if "task_id" in task] + log.info( + "Completed data export scheduling for test analytics", + extra={ + "total_requested": len(integration_names), + "successfully_scheduled": len(successful_tasks), + "failed_to_schedule": len(integration_names) - len(successful_tasks), + "task_ids": [task.get("task_id") for task in successful_tasks], + }, + ) - # Fetch all test runs for all repositories in a single query - test_runs = Testrun.objects.filter(repo_id__in=repo_id_to_name.keys()).order_by( - "repo_id", "-timestamp" - ) + return Response( + { + "tasks": task_results, + "total_tasks": len(task_results), + "successfully_scheduled": len(successful_tasks), + }, + status=status.HTTP_202_ACCEPTED, + ) - # Group by repo_id (data is already ordered by repo_id) and serialize each group - test_runs_per_repository = {} - for repo_id, group in groupby(test_runs, key=lambda tr: tr.repo_id): - repo_name = repo_id_to_name[repo_id] # Safe: we only fetch these repo_ids - test_runs_list = list(group) - test_runs_per_repository[repo_name] = TestrunSerializer( - test_runs_list, many=True - ).data - # Store each test_runs_per_repository in a dictionary - test_runs_per_integration[name] = test_runs_per_repository +@api_view(["GET"]) +@authentication_classes([]) +@permission_classes([JWTAuthenticationPermission]) +def get_ta_export(request, task_id, *args, **kwargs): + """ + Check the status of a test analytics export task. 
- return Response( + GET /sentry/internal/test-analytics/exports/ + + Returns: { - "test_runs_per_integration": test_runs_per_integration, + "task_id": "celery-task-id-here", + "status": "SUCCESS|PENDING|FAILURE|RETRY|STARTED", + "result": {...} # Only present if status is SUCCESS } + """ + log.info( + "Checking status for test analytics export task", extra={"task_id": task_id} ) + + result = AsyncResult(task_id, app=celery_app) + + response_data = { + "task_id": task_id, + "status": result.status, + } + + if result.successful(): + task_result = result.result + response_data["result"] = task_result + + if isinstance(task_result, dict) and not task_result.get("successful", True): + log.warning( + "Test analytics export task completed but reported failure", + extra={ + "task_id": task_id, + "integration_name": task_result.get("integration_name"), + "error": task_result.get("error"), + }, + ) + else: + log.info( + "Test analytics export task successful", + extra={ + "task_id": task_id, + "integration_name": task_result.get("integration_name") + if isinstance(task_result, dict) + else None, + }, + ) + elif result.failed(): + error_info = result.info + response_data["error"] = { + "message": str(error_info), + "type": type(error_info).__name__ if error_info else "Unknown", + } + + log.error( + "Test analytics export task failed with exception", + extra={ + "task_id": task_id, + "error_type": type(error_info).__name__ if error_info else "Unknown", + "error_message": str(error_info), + }, + ) + else: + log.info( + "TA export task in progress", + extra={ + "task_id": task_id, + "status": result.status, + }, + ) + + return Response(response_data, status=status.HTTP_200_OK) diff --git a/libs/shared/shared/celery_config.py b/libs/shared/shared/celery_config.py index d0b009d885..1733d4f6d7 100644 --- a/libs/shared/shared/celery_config.py +++ b/libs/shared/shared/celery_config.py @@ -41,6 +41,11 @@ process_owners_to_be_deleted_cron_task_name = ( f"app.tasks.{TaskConfigGroup.delete_owner.value}.ProcessOwnersToBeDeletedCron" ) + +# Export tasks +export_test_analytics_data_task_name = ( + f"app.tasks.{TaskConfigGroup.export_test_analytics.value}.ExportTestAnalyticsData" +) mark_owner_for_deletion_task_name = ( f"app.tasks.{TaskConfigGroup.mark_owner_for_deletion.value}.MarkOwnerForDeletion" ) @@ -539,6 +544,15 @@ class BaseCeleryConfig: default=task_default_queue, ) }, + export_test_analytics_data_task_name: { + "queue": get_config( + "setup", + "tasks", + TaskConfigGroup.export_test_analytics.value, + "queue", + default=task_default_queue, + ) + }, ghm_sync_plans_task_name: { "queue": get_config( "setup", diff --git a/libs/shared/shared/utils/enums.py b/libs/shared/shared/utils/enums.py index 3a11eba335..caa0cb3d71 100644 --- a/libs/shared/shared/utils/enums.py +++ b/libs/shared/shared/utils/enums.py @@ -29,6 +29,7 @@ class TaskConfigGroup(Enum): compute_comparison = "compute_comparison" daily = "daily" delete_owner = "delete_owner" + export_test_analytics = "export_test_analytics" flakes = "flakes" flush_repo = "flush_repo" healthcheck = "healthcheck" From 80e18ecbae3cc87e1abdae5ce161393c5478f3a5 Mon Sep 17 00:00:00 2001 From: Adrian Date: Mon, 17 Nov 2025 13:33:51 -0800 Subject: [PATCH 02/13] adjust log lines --- apps/codecov-api/api/sentry/views.py | 38 +++++++++++++--------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/apps/codecov-api/api/sentry/views.py b/apps/codecov-api/api/sentry/views.py index 39f9903699..6af820448a 100644 --- a/apps/codecov-api/api/sentry/views.py +++ 
b/apps/codecov-api/api/sentry/views.py @@ -327,15 +327,6 @@ def create_ta_export(request, *args, **kwargs): task_id = result.id task_status = result.status - log.info( - "Scheduled TA export task", - extra={ - "integration_name": integration_name, - "task_id": task_id, - "task_status": task_status, - }, - ) - task_results.append( { "integration_name": integration_name, @@ -344,14 +335,6 @@ def create_ta_export(request, *args, **kwargs): } ) except Exception as e: - log.error( - "Failed to schedule TA export task", - extra={ - "integration_name": integration_name, - "error": str(e), - }, - exc_info=True, - ) task_results.append( { "integration_name": integration_name, @@ -361,13 +344,26 @@ def create_ta_export(request, *args, **kwargs): ) successful_tasks = [task for task in task_results if "task_id" in task] + failed_tasks = [task for task in task_results if "error" in task] + log.info( "Completed data export scheduling for test analytics", extra={ - "total_requested": len(integration_names), - "successfully_scheduled": len(successful_tasks), - "failed_to_schedule": len(integration_names) - len(successful_tasks), - "task_ids": [task.get("task_id") for task in successful_tasks], + "successful_tasks": [ + { + "integration_name": task["integration_name"], + "task_id": task["task_id"], + } + for task in successful_tasks + ], + "failed_tasks": [ + { + "integration_name": task["integration_name"], + "error": task["error"], + "status": task.get("status", "UNKNOWN"), + } + for task in failed_tasks + ], }, ) From 4d6415269a237152df4751921afdd2ae1b01161f Mon Sep 17 00:00:00 2001 From: Adrian Date: Mon, 17 Nov 2025 17:30:25 -0800 Subject: [PATCH 03/13] address feedback + tests --- .../api/sentry/tests/test_views.py | 51 ++++++++++--------- apps/codecov-api/api/sentry/views.py | 50 +++++------------- 2 files changed, 39 insertions(+), 62 deletions(-) diff --git a/apps/codecov-api/api/sentry/tests/test_views.py b/apps/codecov-api/api/sentry/tests/test_views.py index 9998e3f192..5be426ca2e 100644 --- a/apps/codecov-api/api/sentry/tests/test_views.py +++ b/apps/codecov-api/api/sentry/tests/test_views.py @@ -1,5 +1,5 @@ import json -from unittest.mock import patch +from unittest.mock import Mock, patch import jwt from django.test import TestCase @@ -887,11 +887,12 @@ def test_create_ta_export_no_integration_names_provided(self): @patch("api.sentry.views.TaskService") def test_create_ta_export_successful_scheduling(self, mock_task_service_class): mock_task_service = mock_task_service_class.return_value - mock_result_1 = patch("celery.result.AsyncResult").start() + + mock_result_1 = Mock() mock_result_1.id = "task-id-1" mock_result_1.status = "PENDING" - mock_result_2 = patch("celery.result.AsyncResult").start() + mock_result_2 = Mock() mock_result_2.id = "task-id-2" mock_result_2.status = "PENDING" @@ -961,7 +962,8 @@ def test_create_ta_export_failed_schedule_task(self, mock_task_service_class): @patch("api.sentry.views.TaskService") def test_create_ta_export_mixed_success_and_failure(self, mock_task_service_class): mock_task_service = mock_task_service_class.return_value - mock_result = patch("celery.result.AsyncResult").start() + + mock_result = Mock() mock_result.id = "task-id-success" mock_result.status = "PENDING" @@ -1015,8 +1017,11 @@ def _make_authenticated_request(self, task_id=None, jwt_payload=None): } return self.client.get(url) + @patch("services.task.celery_app") @patch("api.sentry.views.AsyncResult") - def test_get_ta_export_successful_with_success_result(self, mock_async_result): + def 
test_get_ta_export_successful_with_success_result( + self, mock_async_result, mock_celery_app + ): mock_result = mock_async_result.return_value mock_result.status = "SUCCESS" mock_result.successful.return_value = True @@ -1039,8 +1044,11 @@ def test_get_ta_export_successful_with_success_result(self, mock_async_result): ) self.assertEqual(response.data["result"]["total_records"], 1000) + @patch("services.task.celery_app") @patch("api.sentry.views.AsyncResult") - def test_get_ta_export_successful_but_reported_failure(self, mock_async_result): + def test_get_ta_export_successful_but_reported_failure( + self, mock_async_result, mock_celery_app + ): mock_result = mock_async_result.return_value mock_result.status = "SUCCESS" mock_result.successful.return_value = True @@ -1065,8 +1073,9 @@ def test_get_ta_export_successful_but_reported_failure(self, mock_async_result): "Failed to export data: Connection timeout", ) + @patch("services.task.celery_app") @patch("api.sentry.views.AsyncResult") - def test_get_ta_export_failed_with_error(self, mock_async_result): + def test_get_ta_export_failed_with_error(self, mock_async_result, mock_celery_app): mock_result = mock_async_result.return_value mock_result.status = "FAILURE" mock_result.successful.return_value = False @@ -1086,8 +1095,11 @@ def test_get_ta_export_failed_with_error(self, mock_async_result): ) self.assertEqual(response.data["error"]["type"], "ValueError") + @patch("services.task.celery_app") @patch("api.sentry.views.AsyncResult") - def test_get_ta_export_in_progress_pending(self, mock_async_result): + def test_get_ta_export_in_progress_pending( + self, mock_async_result, mock_celery_app + ): mock_result = mock_async_result.return_value mock_result.status = "PENDING" mock_result.successful.return_value = False @@ -1101,8 +1113,9 @@ def test_get_ta_export_in_progress_pending(self, mock_async_result): self.assertNotIn("result", response.data) self.assertNotIn("error", response.data) + @patch("services.task.celery_app") @patch("api.sentry.views.AsyncResult") - def test_get_ta_export_in_progress_retry(self, mock_async_result): + def test_get_ta_export_in_progress_retry(self, mock_async_result, mock_celery_app): mock_result = mock_async_result.return_value mock_result.status = "RETRY" mock_result.successful.return_value = False @@ -1116,23 +1129,11 @@ def test_get_ta_export_in_progress_retry(self, mock_async_result): self.assertNotIn("result", response.data) self.assertNotIn("error", response.data) + @patch("services.task.celery_app") @patch("api.sentry.views.AsyncResult") - def test_get_ta_export_successful_with_non_dict_result(self, mock_async_result): - mock_result = mock_async_result.return_value - mock_result.status = "SUCCESS" - mock_result.successful.return_value = True - mock_result.result = "Simple string result" - - response = self._make_authenticated_request() - - self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(response.data["task_id"], self.task_id) - self.assertEqual(response.data["status"], "SUCCESS") - self.assertIn("result", response.data) - self.assertEqual(response.data["result"], "Simple string result") - - @patch("api.sentry.views.AsyncResult") - def test_get_ta_export_failed_with_no_error_info(self, mock_async_result): + def test_get_ta_export_failed_with_no_error_info( + self, mock_async_result, mock_celery_app + ): mock_result = mock_async_result.return_value mock_result.status = "FAILURE" mock_result.successful.return_value = False diff --git a/apps/codecov-api/api/sentry/views.py 
b/apps/codecov-api/api/sentry/views.py index 6af820448a..2b790c8ab9 100644 --- a/apps/codecov-api/api/sentry/views.py +++ b/apps/codecov-api/api/sentry/views.py @@ -293,6 +293,7 @@ def create_ta_export(request, *args, **kwargs): "status": "PENDING" } """ + serializer = CreateTestAnalyticsExportSerializer(data=request.data) serializer.is_valid(raise_exception=True) @@ -326,7 +327,6 @@ def create_ta_export(request, *args, **kwargs): ) task_id = result.id task_status = result.status - task_results.append( { "integration_name": integration_name, @@ -335,6 +335,10 @@ def create_ta_export(request, *args, **kwargs): } ) except Exception as e: + sentry_sdk.capture_message( + f"Integration {integration_name} has an exception", + level="error", + ) task_results.append( { "integration_name": integration_name, @@ -407,26 +411,10 @@ def get_ta_export(request, task_id, *args, **kwargs): if result.successful(): task_result = result.result response_data["result"] = task_result - - if isinstance(task_result, dict) and not task_result.get("successful", True): - log.warning( - "Test analytics export task completed but reported failure", - extra={ - "task_id": task_id, - "integration_name": task_result.get("integration_name"), - "error": task_result.get("error"), - }, - ) - else: - log.info( - "Test analytics export task successful", - extra={ - "task_id": task_id, - "integration_name": task_result.get("integration_name") - if isinstance(task_result, dict) - else None, - }, - ) + response_data["integration_name"] = task_result.get("integration_name") + if not task_result.get("successful", True): + response_data["task_reported_failure"] = True + response_data["error"] = task_result.get("error") elif result.failed(): error_info = result.info response_data["error"] = { @@ -434,21 +422,9 @@ def get_ta_export(request, task_id, *args, **kwargs): "type": type(error_info).__name__ if error_info else "Unknown", } - log.error( - "Test analytics export task failed with exception", - extra={ - "task_id": task_id, - "error_type": type(error_info).__name__ if error_info else "Unknown", - "error_message": str(error_info), - }, - ) - else: - log.info( - "TA export task in progress", - extra={ - "task_id": task_id, - "status": result.status, - }, - ) + log.info( + "Completed status check for test analytics export task", + extra=response_data, + ) return Response(response_data, status=status.HTTP_200_OK) From 0d49f2fbb230cadc8b4c788905e057da221337fc Mon Sep 17 00:00:00 2001 From: Adrian Date: Tue, 18 Nov 2025 13:17:24 -0800 Subject: [PATCH 04/13] adjust exceptions --- apps/codecov-api/api/sentry/views.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/apps/codecov-api/api/sentry/views.py b/apps/codecov-api/api/sentry/views.py index 2b790c8ab9..504f653978 100644 --- a/apps/codecov-api/api/sentry/views.py +++ b/apps/codecov-api/api/sentry/views.py @@ -335,9 +335,12 @@ def create_ta_export(request, *args, **kwargs): } ) except Exception as e: - sentry_sdk.capture_message( - f"Integration {integration_name} has an exception", - level="error", + sentry_sdk.capture_exception( + e, + tags={ + "event": "test_analytics_export_scheduling_failed", + "integration_name": integration_name, + }, ) task_results.append( { @@ -411,10 +414,11 @@ def get_ta_export(request, task_id, *args, **kwargs): if result.successful(): task_result = result.result response_data["result"] = task_result - response_data["integration_name"] = task_result.get("integration_name") - if not task_result.get("successful", True): - 
response_data["task_reported_failure"] = True - response_data["error"] = task_result.get("error") + if isinstance(task_result, dict): + response_data["integration_name"] = task_result.get("integration_name") + if not task_result.get("successful", True): + response_data["task_reported_failure"] = True + response_data["error"] = task_result.get("error") elif result.failed(): error_info = result.info response_data["error"] = { From 4c43d6116970204b81a56255a372be1219c653fe Mon Sep 17 00:00:00 2001 From: Adrian Date: Tue, 18 Nov 2025 15:33:02 -0800 Subject: [PATCH 05/13] add new task to test_celery_config test --- libs/shared/tests/unit/test_celery_config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libs/shared/tests/unit/test_celery_config.py b/libs/shared/tests/unit/test_celery_config.py index b79ac0946a..14ebbb01e7 100644 --- a/libs/shared/tests/unit/test_celery_config.py +++ b/libs/shared/tests/unit/test_celery_config.py @@ -33,6 +33,7 @@ def test_celery_config(): "app.tasks.commit_update.CommitUpdate", "app.tasks.compute_comparison.ComputeComparison", "app.tasks.delete_owner.DeleteOwner", + "app.tasks.export_test_analytics.ExportTestAnalyticsData", "app.tasks.flakes.*", "app.tasks.flush_repo.FlushRepo", "app.tasks.mark_owner_for_deletion.MarkOwnerForDeletion", From 98441a46715f7388d093b46e336c1fb063ee129a Mon Sep 17 00:00:00 2001 From: Adrian Date: Fri, 21 Nov 2025 16:36:54 -0800 Subject: [PATCH 06/13] feat: add worker task to export TA data --- apps/worker/tasks/__init__.py | 1 + .../tasks/export_test_analytics_data.py | 180 ++++++++++ .../unit/test_export_test_analytics_data.py | 333 ++++++++++++++++++ libs/shared/pyproject.toml | 1 + libs/shared/shared/storage/data_exporter.py | 76 ++++ pyproject.toml | 1 + 6 files changed, 592 insertions(+) create mode 100644 apps/worker/tasks/export_test_analytics_data.py create mode 100644 apps/worker/tasks/tests/unit/test_export_test_analytics_data.py create mode 100644 libs/shared/shared/storage/data_exporter.py diff --git a/apps/worker/tasks/__init__.py b/apps/worker/tasks/__init__.py index 293b48fd21..41be012dba 100644 --- a/apps/worker/tasks/__init__.py +++ b/apps/worker/tasks/__init__.py @@ -21,6 +21,7 @@ from tasks.compute_component_comparison import compute_component_comparison_task from tasks.delete_owner import delete_owner_task from tasks.detect_flakes import detect_flakes_task +from tasks.export_test_analytics_data import export_test_analytics_data_task from tasks.flush_repo import flush_repo from tasks.github_app_webhooks_check import gh_webhook_check_task from tasks.github_marketplace import ghm_sync_plans_task diff --git a/apps/worker/tasks/export_test_analytics_data.py b/apps/worker/tasks/export_test_analytics_data.py new file mode 100644 index 0000000000..7966b6b10a --- /dev/null +++ b/apps/worker/tasks/export_test_analytics_data.py @@ -0,0 +1,180 @@ +import logging +import pathlib +import tempfile +from datetime import datetime +from typing import Any + +import sentry_sdk +from google.cloud import storage +from sqlalchemy.orm import Session + +from app import celery_app +from shared.celery_config import export_test_analytics_data_task_name +from shared.django_apps.codecov_auth.models import Owner, Service +from shared.django_apps.core.models import Repository +from shared.django_apps.ta_timeseries.models import Testrun +from shared.storage.data_exporter import _Archiver +from tasks.base import BaseCodecovTask + +log = logging.getLogger(__name__) + + +def serialize_test_run(test_run: dict) -> list: + """ + Convert a test run 
dict to compact list format. This is done + instead of django serializers/dictionaries because it saves + space, and we're likely dealing with a lot of data. + + Args: + test_run: Dictionary containing test run data + + Returns: + List of values in a consistent field order + """ + return [ + test_run.get("filename"), + test_run["timestamp"].isoformat() if test_run.get("timestamp") else None, + test_run.get("testsuite"), + test_run.get("outcome"), + test_run.get("duration_seconds"), + test_run.get("failure_message"), + test_run.get("framework"), + test_run.get("commit_sha"), + test_run.get("branch"), + test_run.get("flags"), + ] + + +class ExportTestAnalyticsDataTask( + BaseCodecovTask, name=export_test_analytics_data_task_name +): + """ + This task exports test analytics data to a tarfile and uploads it to GCP. + """ + + # Override the global task_ignore_result=True setting + # so we can track this task's status and result + ignore_result = False + + def run_impl( + self, + _db_session: Session, + integration_name: str, + gcp_project_id: str, + destination_bucket: str, + destination_prefix: str, + **kwargs: Any, + ): + log.info( + "Received export test analytics data task", + extra={ + "integration_name": integration_name, + }, + ) + + try: + owner = Owner.objects.get(name=integration_name, service=Service.GITHUB) + except Owner.DoesNotExist: + log.warning( + f"Owner with name {integration_name} and service {Service.GITHUB} not found" + ) + return { + "successful": False, + "error": f"Owner with name {integration_name} and service {Service.GITHUB} not found", + } + + repo_id_to_name = dict( + Repository.objects.filter( + author=owner, test_analytics_enabled=True + ).values_list("repoid", "name") + ) + + if not repo_id_to_name: + log.warning(f"No repositories found for owner {integration_name}") + return { + "successful": False, + "error": f"No repositories found for owner {integration_name}", + } + + # Initialize GCS client and bucket + gcs_client = storage.Client(project=gcp_project_id) + bucket = gcs_client.bucket(destination_bucket) + + fields = [ + "filename", + "timestamp", + "testsuite", + "outcome", + "duration_seconds", + "failure_message", + "framework", + "commit_sha", + "branch", + "flags", + ] + + repositories_succeeded = [] + repositories_failed = [] + + # Process each repository and upload the data as tar.gz files using the archiver + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = pathlib.Path(temp_dir) + prefix = pathlib.PurePosixPath(destination_prefix) + + with _Archiver(temp_path, bucket, prefix) as archiver: + for repo_id, repo_name in repo_id_to_name.items(): + try: + start_time = datetime.now() + log.info(f"Processing repository: {repo_name} (ID: {repo_id})") + + test_runs_qs = ( + Testrun.objects.filter(repo_id=repo_id) + .order_by("-timestamp") + .values(*fields) + ) + test_runs_data = [ + serialize_test_run(tr) for tr in list(test_runs_qs) + ] + + repo_data = {"fields": fields, "data": test_runs_data} + blob_name = f"{integration_name}/{repo_name}.json" + archiver.upload_json(blob_name, repo_data) + + repositories_succeeded.append({"name": repo_name}) + + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + log.info( + f"Successfully processed repository {repo_name}: {len(test_runs_data)} test runs in {duration:.2f}s" + ) + except Exception as e: + log.error( + f"Failed to process repository {repo_name} (ID: {repo_id}): {str(e)}", + exc_info=True, + ) + sentry_sdk.capture_exception(e) + repositories_failed.append({"name": 
repo_name, "error": str(e)}) + + log.info( + "Export test analytics data task completed", + extra={ + "integration_name": integration_name, + "repositories_succeeded": repositories_succeeded, + "repositories_failed": repositories_failed, + }, + ) + + return { + "message": "Export test analytics data task completed", + "integration_name": integration_name, + "repositories_succeeded": repositories_succeeded, + "repositories_failed": repositories_failed, + } + + +RegisteredExportTestAnalyticsDataTask = celery_app.register_task( + ExportTestAnalyticsDataTask() +) +export_test_analytics_data_task = celery_app.tasks[ + RegisteredExportTestAnalyticsDataTask.name +] diff --git a/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py b/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py new file mode 100644 index 0000000000..dfd5b6dd99 --- /dev/null +++ b/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py @@ -0,0 +1,333 @@ +from datetime import datetime +from unittest.mock import MagicMock, patch + +import pytest + +from shared.django_apps.codecov_auth.models import Service +from shared.django_apps.core.tests.factories import OwnerFactory, RepositoryFactory +from tasks.export_test_analytics_data import ( + ExportTestAnalyticsDataTask, + serialize_test_run, +) + + +@pytest.fixture +def test_owner(): + owner = OwnerFactory.create( + name="test_owner", service=Service.GITHUB.value, username="test_owner" + ) + return owner + + +@pytest.fixture +def test_repo(test_owner): + """Create a Django Repository model using factory""" + repo = RepositoryFactory.create( + author=test_owner, name="test_repo", test_analytics_enabled=True + ) + return repo + + +@pytest.fixture +def mock_gcs_and_archiver(): + with ( + patch("tasks.export_test_analytics_data.storage.Client") as mock_storage_client, + patch("tasks.export_test_analytics_data._Archiver") as mock_archiver, + ): + mock_bucket = MagicMock() + mock_storage_client.return_value.bucket.return_value = mock_bucket + + mock_archiver_instance = MagicMock() + mock_archiver.return_value.__enter__.return_value = mock_archiver_instance + mock_archiver.return_value.__exit__.return_value = False + + yield { + "storage_client": mock_storage_client, + "archiver": mock_archiver, + "archiver_instance": mock_archiver_instance, + "bucket": mock_bucket, + } + + +@pytest.fixture +def sample_test_run_data(): + return [ + { + "filename": "test_file.py", + "timestamp": datetime(2024, 1, 15, 10, 30, 0), + "testsuite": "TestSuite", + "outcome": "passed", + "duration_seconds": 1.5, + "failure_message": None, + "framework": "pytest", + "commit_sha": "abc123", + "branch": "main", + "flags": ["unit"], + } + ] + + +@pytest.fixture +def multiple_test_runs_data(): + return [ + { + "filename": "test_file1.py", + "timestamp": datetime(2024, 1, 15, 10, 30, 0), + "testsuite": "TestSuite1", + "outcome": "passed", + "duration_seconds": 1.5, + "failure_message": None, + "framework": "pytest", + "commit_sha": "abc123", + "branch": "main", + "flags": ["unit"], + }, + { + "filename": "test_file2.py", + "timestamp": datetime(2024, 1, 15, 10, 31, 0), + "testsuite": "TestSuite2", + "outcome": "failed", + "duration_seconds": 2.0, + "failure_message": "Assertion error", + "framework": "pytest", + "commit_sha": "abc123", + "branch": "main", + "flags": ["integration"], + }, + { + "filename": "test_file3.py", + "timestamp": datetime(2024, 1, 15, 10, 32, 0), + "testsuite": "TestSuite3", + "outcome": "passed", + "duration_seconds": 0.8, + "failure_message": None, + 
"framework": "pytest", + "commit_sha": "abc123", + "branch": "main", + "flags": ["unit"], + }, + ] + + +class TestSerializeTestRun: + def test_serialize_test_run_all_fields(self): + timestamp = datetime(2024, 1, 15, 10, 30, 0) + test_run = { + "filename": "test_file.py", + "timestamp": timestamp, + "testsuite": "TestSuite", + "outcome": "passed", + "duration_seconds": 1.5, + "failure_message": None, + "framework": "pytest", + "commit_sha": "abc123", + "branch": "main", + "flags": ["unit"], + } + + result = serialize_test_run(test_run) + + assert result == [ + "test_file.py", + "2024-01-15T10:30:00", + "TestSuite", + "passed", + 1.5, + None, + "pytest", + "abc123", + "main", + ["unit"], + ] + + +@pytest.mark.django_db +class TestExportTestAnalyticsDataTask: + def test_owner_does_not_exist(self, dbsession): + integration_name = "nonexistent_owner" + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name=integration_name, + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["successful"] is False + assert ( + "Owner with name nonexistent_owner and service github not found" + in result["error"] + ) + + def test_owner_has_no_repos(self, dbsession, test_owner): + # Create a repo without test_analytics_enabled + repo = RepositoryFactory.create( + author=test_owner, name="test_repo", test_analytics_enabled=False + ) + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name="test_owner", + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["successful"] is False + assert "No repositories found for owner test_owner" in result["error"] + + @patch("tasks.export_test_analytics_data.Testrun") + def test_successful_export_test_run_data( + self, + mock_testrun, + dbsession, + test_repo, + mock_gcs_and_archiver, + sample_test_run_data, + ): + mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = sample_test_run_data + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name="test_owner", + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["integration_name"] == "test_owner" + assert len(result["repositories_succeeded"]) == 1 + assert result["repositories_succeeded"][0]["name"] == "test_repo" + assert len(result["repositories_failed"]) == 0 + + archiver_instance = mock_gcs_and_archiver["archiver_instance"] + archiver_instance.upload_json.assert_called_once() + call_args = archiver_instance.upload_json.call_args + assert call_args[0][0] == "test_owner/test_repo.json" + assert "fields" in call_args[0][1] + assert "data" in call_args[0][1] + + @patch("tasks.export_test_analytics_data.Testrun") + @patch("tasks.export_test_analytics_data.sentry_sdk") + def test_failed_export_test_run_data( + self, mock_sentry, mock_testrun, dbsession, test_repo, mock_gcs_and_archiver + ): + mock_testrun.objects.filter.side_effect = Exception("Database error") + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name="test_owner", + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["integration_name"] == "test_owner" + assert len(result["repositories_succeeded"]) == 0 + assert len(result["repositories_failed"]) == 1 + assert result["repositories_failed"][0]["name"] == "test_repo" + assert "Database 
error" in result["repositories_failed"][0]["error"] + mock_sentry.capture_exception.assert_called_once() + + @patch("tasks.export_test_analytics_data.Testrun") + @patch("tasks.export_test_analytics_data.sentry_sdk") + def test_mixed_success_and_failure( + self, + mock_sentry, + mock_testrun, + dbsession, + test_owner, + mock_gcs_and_archiver, + sample_test_run_data, + ): + repo1 = RepositoryFactory.create( + author=test_owner, name="successful_repo", test_analytics_enabled=True + ) + repo2 = RepositoryFactory.create( + author=test_owner, name="failing_repo", test_analytics_enabled=True + ) + repo3 = RepositoryFactory.create( + author=test_owner, + name="another_successful_repo", + test_analytics_enabled=True, + ) + + # Mock test run data - make it fail for repo2 only + def filter_side_effect(*args, **kwargs): + repo_id = kwargs.get("repo_id") + if repo_id == repo2.repoid: + raise Exception("Processing error for failing_repo") + else: + mock_result = MagicMock() + mock_result.order_by.return_value.values.return_value = ( + sample_test_run_data + ) + return mock_result + + mock_testrun.objects.filter.side_effect = filter_side_effect + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name="test_owner", + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["integration_name"] == "test_owner" + assert len(result["repositories_succeeded"]) == 2 + assert len(result["repositories_failed"]) == 1 + + successful_names = [repo["name"] for repo in result["repositories_succeeded"]] + assert "successful_repo" in successful_names + assert "another_successful_repo" in successful_names + + assert result["repositories_failed"][0]["name"] == "failing_repo" + assert "Processing error" in result["repositories_failed"][0]["error"] + assert mock_sentry.capture_exception.call_count == 1 + + archiver_instance = mock_gcs_and_archiver["archiver_instance"] + assert archiver_instance.upload_json.call_count == 2 + + @patch("tasks.export_test_analytics_data.Testrun") + def test_export_with_multiple_test_runs( + self, + mock_testrun, + dbsession, + test_repo, + mock_gcs_and_archiver, + multiple_test_runs_data, + ): + mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = multiple_test_runs_data + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name="test_owner", + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["integration_name"] == "test_owner" + assert len(result["repositories_succeeded"]) == 1 + assert len(result["repositories_failed"]) == 0 + + archiver_instance = mock_gcs_and_archiver["archiver_instance"] + archiver_instance.upload_json.assert_called_once() + call_args = archiver_instance.upload_json.call_args + uploaded_data = call_args[0][1] + + assert len(uploaded_data["data"]) == 3 + assert uploaded_data["fields"] == [ + "filename", + "timestamp", + "testsuite", + "outcome", + "duration_seconds", + "failure_message", + "framework", + "commit_sha", + "branch", + "flags", + ] diff --git a/libs/shared/pyproject.toml b/libs/shared/pyproject.toml index dedad2638a..37babb520c 100644 --- a/libs/shared/pyproject.toml +++ b/libs/shared/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ "django>=4.2.25,<5.0.0", "google-auth>=2.21.0", "google-cloud-pubsub>=2.18.4", + "google-cloud-storage>=2.18.0", "httpx>=0.23.0", "ijson>=3.2.3", "minio>=7.1.13", diff --git 
a/libs/shared/shared/storage/data_exporter.py b/libs/shared/shared/storage/data_exporter.py new file mode 100644 index 0000000000..fd3ea7f18e --- /dev/null +++ b/libs/shared/shared/storage/data_exporter.py @@ -0,0 +1,76 @@ +import io +import json +import pathlib +import tarfile +import time +from types import TracebackType +from typing import Literal, Self + +from google.cloud import storage + +ARCHIVE_SIZE_THRESHOLD = 4 * 1024 * 1024 * 1024 # 4GB + + +class _Archiver: + """Upload files to GCS as chunked tar.gz archives.""" + + def __init__( + self, + local_dir: pathlib.Path, + bucket: storage.Bucket, + prefix: pathlib.PurePosixPath, + ) -> None: + self.current_archive = local_dir / "current_archive.tar.gz" + self.bucket = bucket + self.prefix = prefix + self.time = time.time() + self.counter = -1 + self._next_chunk() + + def _next_chunk(self) -> None: + self.counter += 1 + self.entries = 0 + self.archive = tarfile.open(name=self.current_archive, mode="w:gz") + + def _upload(self) -> None: + self.archive.close() + blob_name = str(self.prefix / f"{self.counter}.tar.gz") + blob = self.bucket.blob(blob_name) + blob.upload_from_filename(self.current_archive, content_type="application/gzip") + + def _add_file( + self, name: str, file_obj: io.BufferedIOBase, size: int | None = None + ) -> None: + info = tarfile.TarInfo(name) + if size is None: + file_obj.seek(0, 2) + info.size = file_obj.tell() + file_obj.seek(0, 0) + else: + info.size = size + info.mode = 0o600 + info.mtime = self.time + self.archive.addfile(info, file_obj) + + self.entries += 1 + if self.current_archive.stat().st_size >= ARCHIVE_SIZE_THRESHOLD: + self._upload() + self._next_chunk() + + def __enter__(self) -> Self: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + exc_tb: TracebackType | None, + ) -> Literal[False]: + if exc_type is None and self.entries > 0: + self._upload() + return False + + def upload_json(self, blob_name: str, data: dict) -> None: + json_str = json.dumps(data, indent=2) + json_bytes = json_str.encode("utf-8") + self._add_file(blob_name, io.BytesIO(json_bytes), size=len(json_bytes)) diff --git a/pyproject.toml b/pyproject.toml index 148d365b11..2b620fd86b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ shared = [ "django>=4.2.25,<5.0.0", "google-auth>=2.21.1", "google-cloud-pubsub>=2.27.1", + "google-cloud-storage>=2.18.0", "httpx>=0.23.1", "ijson>=3.2.3", "minio>=7.2.15", From c9f62f280c61c59d70cf163f1f30bf66fcec33e6 Mon Sep 17 00:00:00 2001 From: Adrian Date: Fri, 21 Nov 2025 17:04:53 -0800 Subject: [PATCH 07/13] revert changes meant for other branch --- apps/worker/tasks/__init__.py | 1 - .../tasks/export_test_analytics_data.py | 180 ---------- .../unit/test_export_test_analytics_data.py | 333 ------------------ libs/shared/pyproject.toml | 1 - libs/shared/shared/storage/data_exporter.py | 76 ---- pyproject.toml | 1 - 6 files changed, 592 deletions(-) delete mode 100644 apps/worker/tasks/export_test_analytics_data.py delete mode 100644 apps/worker/tasks/tests/unit/test_export_test_analytics_data.py delete mode 100644 libs/shared/shared/storage/data_exporter.py diff --git a/apps/worker/tasks/__init__.py b/apps/worker/tasks/__init__.py index 41be012dba..293b48fd21 100644 --- a/apps/worker/tasks/__init__.py +++ b/apps/worker/tasks/__init__.py @@ -21,7 +21,6 @@ from tasks.compute_component_comparison import compute_component_comparison_task from tasks.delete_owner import delete_owner_task from tasks.detect_flakes import 
detect_flakes_task -from tasks.export_test_analytics_data import export_test_analytics_data_task from tasks.flush_repo import flush_repo from tasks.github_app_webhooks_check import gh_webhook_check_task from tasks.github_marketplace import ghm_sync_plans_task diff --git a/apps/worker/tasks/export_test_analytics_data.py b/apps/worker/tasks/export_test_analytics_data.py deleted file mode 100644 index 7966b6b10a..0000000000 --- a/apps/worker/tasks/export_test_analytics_data.py +++ /dev/null @@ -1,180 +0,0 @@ -import logging -import pathlib -import tempfile -from datetime import datetime -from typing import Any - -import sentry_sdk -from google.cloud import storage -from sqlalchemy.orm import Session - -from app import celery_app -from shared.celery_config import export_test_analytics_data_task_name -from shared.django_apps.codecov_auth.models import Owner, Service -from shared.django_apps.core.models import Repository -from shared.django_apps.ta_timeseries.models import Testrun -from shared.storage.data_exporter import _Archiver -from tasks.base import BaseCodecovTask - -log = logging.getLogger(__name__) - - -def serialize_test_run(test_run: dict) -> list: - """ - Convert a test run dict to compact list format. This is done - instead of django serializers/dictionaries because it saves - space, and we're likely dealing with a lot of data. - - Args: - test_run: Dictionary containing test run data - - Returns: - List of values in a consistent field order - """ - return [ - test_run.get("filename"), - test_run["timestamp"].isoformat() if test_run.get("timestamp") else None, - test_run.get("testsuite"), - test_run.get("outcome"), - test_run.get("duration_seconds"), - test_run.get("failure_message"), - test_run.get("framework"), - test_run.get("commit_sha"), - test_run.get("branch"), - test_run.get("flags"), - ] - - -class ExportTestAnalyticsDataTask( - BaseCodecovTask, name=export_test_analytics_data_task_name -): - """ - This task exports test analytics data to a tarfile and uploads it to GCP. 
- """ - - # Override the global task_ignore_result=True setting - # so we can track this task's status and result - ignore_result = False - - def run_impl( - self, - _db_session: Session, - integration_name: str, - gcp_project_id: str, - destination_bucket: str, - destination_prefix: str, - **kwargs: Any, - ): - log.info( - "Received export test analytics data task", - extra={ - "integration_name": integration_name, - }, - ) - - try: - owner = Owner.objects.get(name=integration_name, service=Service.GITHUB) - except Owner.DoesNotExist: - log.warning( - f"Owner with name {integration_name} and service {Service.GITHUB} not found" - ) - return { - "successful": False, - "error": f"Owner with name {integration_name} and service {Service.GITHUB} not found", - } - - repo_id_to_name = dict( - Repository.objects.filter( - author=owner, test_analytics_enabled=True - ).values_list("repoid", "name") - ) - - if not repo_id_to_name: - log.warning(f"No repositories found for owner {integration_name}") - return { - "successful": False, - "error": f"No repositories found for owner {integration_name}", - } - - # Initialize GCS client and bucket - gcs_client = storage.Client(project=gcp_project_id) - bucket = gcs_client.bucket(destination_bucket) - - fields = [ - "filename", - "timestamp", - "testsuite", - "outcome", - "duration_seconds", - "failure_message", - "framework", - "commit_sha", - "branch", - "flags", - ] - - repositories_succeeded = [] - repositories_failed = [] - - # Process each repository and upload the data as tar.gz files using the archiver - with tempfile.TemporaryDirectory() as temp_dir: - temp_path = pathlib.Path(temp_dir) - prefix = pathlib.PurePosixPath(destination_prefix) - - with _Archiver(temp_path, bucket, prefix) as archiver: - for repo_id, repo_name in repo_id_to_name.items(): - try: - start_time = datetime.now() - log.info(f"Processing repository: {repo_name} (ID: {repo_id})") - - test_runs_qs = ( - Testrun.objects.filter(repo_id=repo_id) - .order_by("-timestamp") - .values(*fields) - ) - test_runs_data = [ - serialize_test_run(tr) for tr in list(test_runs_qs) - ] - - repo_data = {"fields": fields, "data": test_runs_data} - blob_name = f"{integration_name}/{repo_name}.json" - archiver.upload_json(blob_name, repo_data) - - repositories_succeeded.append({"name": repo_name}) - - end_time = datetime.now() - duration = (end_time - start_time).total_seconds() - log.info( - f"Successfully processed repository {repo_name}: {len(test_runs_data)} test runs in {duration:.2f}s" - ) - except Exception as e: - log.error( - f"Failed to process repository {repo_name} (ID: {repo_id}): {str(e)}", - exc_info=True, - ) - sentry_sdk.capture_exception(e) - repositories_failed.append({"name": repo_name, "error": str(e)}) - - log.info( - "Export test analytics data task completed", - extra={ - "integration_name": integration_name, - "repositories_succeeded": repositories_succeeded, - "repositories_failed": repositories_failed, - }, - ) - - return { - "message": "Export test analytics data task completed", - "integration_name": integration_name, - "repositories_succeeded": repositories_succeeded, - "repositories_failed": repositories_failed, - } - - -RegisteredExportTestAnalyticsDataTask = celery_app.register_task( - ExportTestAnalyticsDataTask() -) -export_test_analytics_data_task = celery_app.tasks[ - RegisteredExportTestAnalyticsDataTask.name -] diff --git a/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py b/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py deleted file 
mode 100644 index dfd5b6dd99..0000000000 --- a/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py +++ /dev/null @@ -1,333 +0,0 @@ -from datetime import datetime -from unittest.mock import MagicMock, patch - -import pytest - -from shared.django_apps.codecov_auth.models import Service -from shared.django_apps.core.tests.factories import OwnerFactory, RepositoryFactory -from tasks.export_test_analytics_data import ( - ExportTestAnalyticsDataTask, - serialize_test_run, -) - - -@pytest.fixture -def test_owner(): - owner = OwnerFactory.create( - name="test_owner", service=Service.GITHUB.value, username="test_owner" - ) - return owner - - -@pytest.fixture -def test_repo(test_owner): - """Create a Django Repository model using factory""" - repo = RepositoryFactory.create( - author=test_owner, name="test_repo", test_analytics_enabled=True - ) - return repo - - -@pytest.fixture -def mock_gcs_and_archiver(): - with ( - patch("tasks.export_test_analytics_data.storage.Client") as mock_storage_client, - patch("tasks.export_test_analytics_data._Archiver") as mock_archiver, - ): - mock_bucket = MagicMock() - mock_storage_client.return_value.bucket.return_value = mock_bucket - - mock_archiver_instance = MagicMock() - mock_archiver.return_value.__enter__.return_value = mock_archiver_instance - mock_archiver.return_value.__exit__.return_value = False - - yield { - "storage_client": mock_storage_client, - "archiver": mock_archiver, - "archiver_instance": mock_archiver_instance, - "bucket": mock_bucket, - } - - -@pytest.fixture -def sample_test_run_data(): - return [ - { - "filename": "test_file.py", - "timestamp": datetime(2024, 1, 15, 10, 30, 0), - "testsuite": "TestSuite", - "outcome": "passed", - "duration_seconds": 1.5, - "failure_message": None, - "framework": "pytest", - "commit_sha": "abc123", - "branch": "main", - "flags": ["unit"], - } - ] - - -@pytest.fixture -def multiple_test_runs_data(): - return [ - { - "filename": "test_file1.py", - "timestamp": datetime(2024, 1, 15, 10, 30, 0), - "testsuite": "TestSuite1", - "outcome": "passed", - "duration_seconds": 1.5, - "failure_message": None, - "framework": "pytest", - "commit_sha": "abc123", - "branch": "main", - "flags": ["unit"], - }, - { - "filename": "test_file2.py", - "timestamp": datetime(2024, 1, 15, 10, 31, 0), - "testsuite": "TestSuite2", - "outcome": "failed", - "duration_seconds": 2.0, - "failure_message": "Assertion error", - "framework": "pytest", - "commit_sha": "abc123", - "branch": "main", - "flags": ["integration"], - }, - { - "filename": "test_file3.py", - "timestamp": datetime(2024, 1, 15, 10, 32, 0), - "testsuite": "TestSuite3", - "outcome": "passed", - "duration_seconds": 0.8, - "failure_message": None, - "framework": "pytest", - "commit_sha": "abc123", - "branch": "main", - "flags": ["unit"], - }, - ] - - -class TestSerializeTestRun: - def test_serialize_test_run_all_fields(self): - timestamp = datetime(2024, 1, 15, 10, 30, 0) - test_run = { - "filename": "test_file.py", - "timestamp": timestamp, - "testsuite": "TestSuite", - "outcome": "passed", - "duration_seconds": 1.5, - "failure_message": None, - "framework": "pytest", - "commit_sha": "abc123", - "branch": "main", - "flags": ["unit"], - } - - result = serialize_test_run(test_run) - - assert result == [ - "test_file.py", - "2024-01-15T10:30:00", - "TestSuite", - "passed", - 1.5, - None, - "pytest", - "abc123", - "main", - ["unit"], - ] - - -@pytest.mark.django_db -class TestExportTestAnalyticsDataTask: - def test_owner_does_not_exist(self, dbsession): - 
integration_name = "nonexistent_owner" - - result = ExportTestAnalyticsDataTask().run_impl( - dbsession, - integration_name=integration_name, - gcp_project_id="test-project", - destination_bucket="test-bucket", - destination_prefix="test-prefix", - ) - - assert result["successful"] is False - assert ( - "Owner with name nonexistent_owner and service github not found" - in result["error"] - ) - - def test_owner_has_no_repos(self, dbsession, test_owner): - # Create a repo without test_analytics_enabled - repo = RepositoryFactory.create( - author=test_owner, name="test_repo", test_analytics_enabled=False - ) - - result = ExportTestAnalyticsDataTask().run_impl( - dbsession, - integration_name="test_owner", - gcp_project_id="test-project", - destination_bucket="test-bucket", - destination_prefix="test-prefix", - ) - - assert result["successful"] is False - assert "No repositories found for owner test_owner" in result["error"] - - @patch("tasks.export_test_analytics_data.Testrun") - def test_successful_export_test_run_data( - self, - mock_testrun, - dbsession, - test_repo, - mock_gcs_and_archiver, - sample_test_run_data, - ): - mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = sample_test_run_data - - result = ExportTestAnalyticsDataTask().run_impl( - dbsession, - integration_name="test_owner", - gcp_project_id="test-project", - destination_bucket="test-bucket", - destination_prefix="test-prefix", - ) - - assert result["integration_name"] == "test_owner" - assert len(result["repositories_succeeded"]) == 1 - assert result["repositories_succeeded"][0]["name"] == "test_repo" - assert len(result["repositories_failed"]) == 0 - - archiver_instance = mock_gcs_and_archiver["archiver_instance"] - archiver_instance.upload_json.assert_called_once() - call_args = archiver_instance.upload_json.call_args - assert call_args[0][0] == "test_owner/test_repo.json" - assert "fields" in call_args[0][1] - assert "data" in call_args[0][1] - - @patch("tasks.export_test_analytics_data.Testrun") - @patch("tasks.export_test_analytics_data.sentry_sdk") - def test_failed_export_test_run_data( - self, mock_sentry, mock_testrun, dbsession, test_repo, mock_gcs_and_archiver - ): - mock_testrun.objects.filter.side_effect = Exception("Database error") - - result = ExportTestAnalyticsDataTask().run_impl( - dbsession, - integration_name="test_owner", - gcp_project_id="test-project", - destination_bucket="test-bucket", - destination_prefix="test-prefix", - ) - - assert result["integration_name"] == "test_owner" - assert len(result["repositories_succeeded"]) == 0 - assert len(result["repositories_failed"]) == 1 - assert result["repositories_failed"][0]["name"] == "test_repo" - assert "Database error" in result["repositories_failed"][0]["error"] - mock_sentry.capture_exception.assert_called_once() - - @patch("tasks.export_test_analytics_data.Testrun") - @patch("tasks.export_test_analytics_data.sentry_sdk") - def test_mixed_success_and_failure( - self, - mock_sentry, - mock_testrun, - dbsession, - test_owner, - mock_gcs_and_archiver, - sample_test_run_data, - ): - repo1 = RepositoryFactory.create( - author=test_owner, name="successful_repo", test_analytics_enabled=True - ) - repo2 = RepositoryFactory.create( - author=test_owner, name="failing_repo", test_analytics_enabled=True - ) - repo3 = RepositoryFactory.create( - author=test_owner, - name="another_successful_repo", - test_analytics_enabled=True, - ) - - # Mock test run data - make it fail for repo2 only - def filter_side_effect(*args, **kwargs): 
- repo_id = kwargs.get("repo_id") - if repo_id == repo2.repoid: - raise Exception("Processing error for failing_repo") - else: - mock_result = MagicMock() - mock_result.order_by.return_value.values.return_value = ( - sample_test_run_data - ) - return mock_result - - mock_testrun.objects.filter.side_effect = filter_side_effect - - result = ExportTestAnalyticsDataTask().run_impl( - dbsession, - integration_name="test_owner", - gcp_project_id="test-project", - destination_bucket="test-bucket", - destination_prefix="test-prefix", - ) - - assert result["integration_name"] == "test_owner" - assert len(result["repositories_succeeded"]) == 2 - assert len(result["repositories_failed"]) == 1 - - successful_names = [repo["name"] for repo in result["repositories_succeeded"]] - assert "successful_repo" in successful_names - assert "another_successful_repo" in successful_names - - assert result["repositories_failed"][0]["name"] == "failing_repo" - assert "Processing error" in result["repositories_failed"][0]["error"] - assert mock_sentry.capture_exception.call_count == 1 - - archiver_instance = mock_gcs_and_archiver["archiver_instance"] - assert archiver_instance.upload_json.call_count == 2 - - @patch("tasks.export_test_analytics_data.Testrun") - def test_export_with_multiple_test_runs( - self, - mock_testrun, - dbsession, - test_repo, - mock_gcs_and_archiver, - multiple_test_runs_data, - ): - mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = multiple_test_runs_data - - result = ExportTestAnalyticsDataTask().run_impl( - dbsession, - integration_name="test_owner", - gcp_project_id="test-project", - destination_bucket="test-bucket", - destination_prefix="test-prefix", - ) - - assert result["integration_name"] == "test_owner" - assert len(result["repositories_succeeded"]) == 1 - assert len(result["repositories_failed"]) == 0 - - archiver_instance = mock_gcs_and_archiver["archiver_instance"] - archiver_instance.upload_json.assert_called_once() - call_args = archiver_instance.upload_json.call_args - uploaded_data = call_args[0][1] - - assert len(uploaded_data["data"]) == 3 - assert uploaded_data["fields"] == [ - "filename", - "timestamp", - "testsuite", - "outcome", - "duration_seconds", - "failure_message", - "framework", - "commit_sha", - "branch", - "flags", - ] diff --git a/libs/shared/pyproject.toml b/libs/shared/pyproject.toml index 37babb520c..dedad2638a 100644 --- a/libs/shared/pyproject.toml +++ b/libs/shared/pyproject.toml @@ -17,7 +17,6 @@ dependencies = [ "django>=4.2.25,<5.0.0", "google-auth>=2.21.0", "google-cloud-pubsub>=2.18.4", - "google-cloud-storage>=2.18.0", "httpx>=0.23.0", "ijson>=3.2.3", "minio>=7.1.13", diff --git a/libs/shared/shared/storage/data_exporter.py b/libs/shared/shared/storage/data_exporter.py deleted file mode 100644 index fd3ea7f18e..0000000000 --- a/libs/shared/shared/storage/data_exporter.py +++ /dev/null @@ -1,76 +0,0 @@ -import io -import json -import pathlib -import tarfile -import time -from types import TracebackType -from typing import Literal, Self - -from google.cloud import storage - -ARCHIVE_SIZE_THRESHOLD = 4 * 1024 * 1024 * 1024 # 4GB - - -class _Archiver: - """Upload files to GCS as chunked tar.gz archives.""" - - def __init__( - self, - local_dir: pathlib.Path, - bucket: storage.Bucket, - prefix: pathlib.PurePosixPath, - ) -> None: - self.current_archive = local_dir / "current_archive.tar.gz" - self.bucket = bucket - self.prefix = prefix - self.time = time.time() - self.counter = -1 - self._next_chunk() - - def 
_next_chunk(self) -> None: - self.counter += 1 - self.entries = 0 - self.archive = tarfile.open(name=self.current_archive, mode="w:gz") - - def _upload(self) -> None: - self.archive.close() - blob_name = str(self.prefix / f"{self.counter}.tar.gz") - blob = self.bucket.blob(blob_name) - blob.upload_from_filename(self.current_archive, content_type="application/gzip") - - def _add_file( - self, name: str, file_obj: io.BufferedIOBase, size: int | None = None - ) -> None: - info = tarfile.TarInfo(name) - if size is None: - file_obj.seek(0, 2) - info.size = file_obj.tell() - file_obj.seek(0, 0) - else: - info.size = size - info.mode = 0o600 - info.mtime = self.time - self.archive.addfile(info, file_obj) - - self.entries += 1 - if self.current_archive.stat().st_size >= ARCHIVE_SIZE_THRESHOLD: - self._upload() - self._next_chunk() - - def __enter__(self) -> Self: - return self - - def __exit__( - self, - exc_type: type[BaseException] | None, - exc_value: BaseException | None, - exc_tb: TracebackType | None, - ) -> Literal[False]: - if exc_type is None and self.entries > 0: - self._upload() - return False - - def upload_json(self, blob_name: str, data: dict) -> None: - json_str = json.dumps(data, indent=2) - json_bytes = json_str.encode("utf-8") - self._add_file(blob_name, io.BytesIO(json_bytes), size=len(json_bytes)) diff --git a/pyproject.toml b/pyproject.toml index 2b620fd86b..148d365b11 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,6 @@ shared = [ "django>=4.2.25,<5.0.0", "google-auth>=2.21.1", "google-cloud-pubsub>=2.27.1", - "google-cloud-storage>=2.18.0", "httpx>=0.23.1", "ijson>=3.2.3", "minio>=7.2.15", From c626ee11d0a2ab2f7a0aeecc4529ca7c93a020cc Mon Sep 17 00:00:00 2001 From: Adrian Date: Fri, 21 Nov 2025 17:26:08 -0800 Subject: [PATCH 08/13] feat: add worker task to export TA data --- apps/worker/tasks/__init__.py | 1 + .../tasks/export_test_analytics_data.py | 180 ++++++++++ .../unit/test_export_test_analytics_data.py | 333 ++++++++++++++++++ libs/shared/pyproject.toml | 1 + libs/shared/shared/storage/data_exporter.py | 76 ++++ pyproject.toml | 1 + 6 files changed, 592 insertions(+) create mode 100644 apps/worker/tasks/export_test_analytics_data.py create mode 100644 apps/worker/tasks/tests/unit/test_export_test_analytics_data.py create mode 100644 libs/shared/shared/storage/data_exporter.py diff --git a/apps/worker/tasks/__init__.py b/apps/worker/tasks/__init__.py index 293b48fd21..41be012dba 100644 --- a/apps/worker/tasks/__init__.py +++ b/apps/worker/tasks/__init__.py @@ -21,6 +21,7 @@ from tasks.compute_component_comparison import compute_component_comparison_task from tasks.delete_owner import delete_owner_task from tasks.detect_flakes import detect_flakes_task +from tasks.export_test_analytics_data import export_test_analytics_data_task from tasks.flush_repo import flush_repo from tasks.github_app_webhooks_check import gh_webhook_check_task from tasks.github_marketplace import ghm_sync_plans_task diff --git a/apps/worker/tasks/export_test_analytics_data.py b/apps/worker/tasks/export_test_analytics_data.py new file mode 100644 index 0000000000..7966b6b10a --- /dev/null +++ b/apps/worker/tasks/export_test_analytics_data.py @@ -0,0 +1,180 @@ +import logging +import pathlib +import tempfile +from datetime import datetime +from typing import Any + +import sentry_sdk +from google.cloud import storage +from sqlalchemy.orm import Session + +from app import celery_app +from shared.celery_config import export_test_analytics_data_task_name +from 
shared.django_apps.codecov_auth.models import Owner, Service +from shared.django_apps.core.models import Repository +from shared.django_apps.ta_timeseries.models import Testrun +from shared.storage.data_exporter import _Archiver +from tasks.base import BaseCodecovTask + +log = logging.getLogger(__name__) + + +def serialize_test_run(test_run: dict) -> list: + """ + Convert a test run dict to compact list format. This is done + instead of django serializers/dictionaries because it saves + space, and we're likely dealing with a lot of data. + + Args: + test_run: Dictionary containing test run data + + Returns: + List of values in a consistent field order + """ + return [ + test_run.get("filename"), + test_run["timestamp"].isoformat() if test_run.get("timestamp") else None, + test_run.get("testsuite"), + test_run.get("outcome"), + test_run.get("duration_seconds"), + test_run.get("failure_message"), + test_run.get("framework"), + test_run.get("commit_sha"), + test_run.get("branch"), + test_run.get("flags"), + ] + + +class ExportTestAnalyticsDataTask( + BaseCodecovTask, name=export_test_analytics_data_task_name +): + """ + This task exports test analytics data to a tarfile and uploads it to GCP. + """ + + # Override the global task_ignore_result=True setting + # so we can track this task's status and result + ignore_result = False + + def run_impl( + self, + _db_session: Session, + integration_name: str, + gcp_project_id: str, + destination_bucket: str, + destination_prefix: str, + **kwargs: Any, + ): + log.info( + "Received export test analytics data task", + extra={ + "integration_name": integration_name, + }, + ) + + try: + owner = Owner.objects.get(name=integration_name, service=Service.GITHUB) + except Owner.DoesNotExist: + log.warning( + f"Owner with name {integration_name} and service {Service.GITHUB} not found" + ) + return { + "successful": False, + "error": f"Owner with name {integration_name} and service {Service.GITHUB} not found", + } + + repo_id_to_name = dict( + Repository.objects.filter( + author=owner, test_analytics_enabled=True + ).values_list("repoid", "name") + ) + + if not repo_id_to_name: + log.warning(f"No repositories found for owner {integration_name}") + return { + "successful": False, + "error": f"No repositories found for owner {integration_name}", + } + + # Initialize GCS client and bucket + gcs_client = storage.Client(project=gcp_project_id) + bucket = gcs_client.bucket(destination_bucket) + + fields = [ + "filename", + "timestamp", + "testsuite", + "outcome", + "duration_seconds", + "failure_message", + "framework", + "commit_sha", + "branch", + "flags", + ] + + repositories_succeeded = [] + repositories_failed = [] + + # Process each repository and upload the data as tar.gz files using the archiver + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = pathlib.Path(temp_dir) + prefix = pathlib.PurePosixPath(destination_prefix) + + with _Archiver(temp_path, bucket, prefix) as archiver: + for repo_id, repo_name in repo_id_to_name.items(): + try: + start_time = datetime.now() + log.info(f"Processing repository: {repo_name} (ID: {repo_id})") + + test_runs_qs = ( + Testrun.objects.filter(repo_id=repo_id) + .order_by("-timestamp") + .values(*fields) + ) + test_runs_data = [ + serialize_test_run(tr) for tr in list(test_runs_qs) + ] + + repo_data = {"fields": fields, "data": test_runs_data} + blob_name = f"{integration_name}/{repo_name}.json" + archiver.upload_json(blob_name, repo_data) + + repositories_succeeded.append({"name": repo_name}) + + end_time = 
datetime.now() + duration = (end_time - start_time).total_seconds() + log.info( + f"Successfully processed repository {repo_name}: {len(test_runs_data)} test runs in {duration:.2f}s" + ) + except Exception as e: + log.error( + f"Failed to process repository {repo_name} (ID: {repo_id}): {str(e)}", + exc_info=True, + ) + sentry_sdk.capture_exception(e) + repositories_failed.append({"name": repo_name, "error": str(e)}) + + log.info( + "Export test analytics data task completed", + extra={ + "integration_name": integration_name, + "repositories_succeeded": repositories_succeeded, + "repositories_failed": repositories_failed, + }, + ) + + return { + "message": "Export test analytics data task completed", + "integration_name": integration_name, + "repositories_succeeded": repositories_succeeded, + "repositories_failed": repositories_failed, + } + + +RegisteredExportTestAnalyticsDataTask = celery_app.register_task( + ExportTestAnalyticsDataTask() +) +export_test_analytics_data_task = celery_app.tasks[ + RegisteredExportTestAnalyticsDataTask.name +] diff --git a/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py b/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py new file mode 100644 index 0000000000..dfd5b6dd99 --- /dev/null +++ b/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py @@ -0,0 +1,333 @@ +from datetime import datetime +from unittest.mock import MagicMock, patch + +import pytest + +from shared.django_apps.codecov_auth.models import Service +from shared.django_apps.core.tests.factories import OwnerFactory, RepositoryFactory +from tasks.export_test_analytics_data import ( + ExportTestAnalyticsDataTask, + serialize_test_run, +) + + +@pytest.fixture +def test_owner(): + owner = OwnerFactory.create( + name="test_owner", service=Service.GITHUB.value, username="test_owner" + ) + return owner + + +@pytest.fixture +def test_repo(test_owner): + """Create a Django Repository model using factory""" + repo = RepositoryFactory.create( + author=test_owner, name="test_repo", test_analytics_enabled=True + ) + return repo + + +@pytest.fixture +def mock_gcs_and_archiver(): + with ( + patch("tasks.export_test_analytics_data.storage.Client") as mock_storage_client, + patch("tasks.export_test_analytics_data._Archiver") as mock_archiver, + ): + mock_bucket = MagicMock() + mock_storage_client.return_value.bucket.return_value = mock_bucket + + mock_archiver_instance = MagicMock() + mock_archiver.return_value.__enter__.return_value = mock_archiver_instance + mock_archiver.return_value.__exit__.return_value = False + + yield { + "storage_client": mock_storage_client, + "archiver": mock_archiver, + "archiver_instance": mock_archiver_instance, + "bucket": mock_bucket, + } + + +@pytest.fixture +def sample_test_run_data(): + return [ + { + "filename": "test_file.py", + "timestamp": datetime(2024, 1, 15, 10, 30, 0), + "testsuite": "TestSuite", + "outcome": "passed", + "duration_seconds": 1.5, + "failure_message": None, + "framework": "pytest", + "commit_sha": "abc123", + "branch": "main", + "flags": ["unit"], + } + ] + + +@pytest.fixture +def multiple_test_runs_data(): + return [ + { + "filename": "test_file1.py", + "timestamp": datetime(2024, 1, 15, 10, 30, 0), + "testsuite": "TestSuite1", + "outcome": "passed", + "duration_seconds": 1.5, + "failure_message": None, + "framework": "pytest", + "commit_sha": "abc123", + "branch": "main", + "flags": ["unit"], + }, + { + "filename": "test_file2.py", + "timestamp": datetime(2024, 1, 15, 10, 31, 0), + "testsuite": "TestSuite2", + 
"outcome": "failed", + "duration_seconds": 2.0, + "failure_message": "Assertion error", + "framework": "pytest", + "commit_sha": "abc123", + "branch": "main", + "flags": ["integration"], + }, + { + "filename": "test_file3.py", + "timestamp": datetime(2024, 1, 15, 10, 32, 0), + "testsuite": "TestSuite3", + "outcome": "passed", + "duration_seconds": 0.8, + "failure_message": None, + "framework": "pytest", + "commit_sha": "abc123", + "branch": "main", + "flags": ["unit"], + }, + ] + + +class TestSerializeTestRun: + def test_serialize_test_run_all_fields(self): + timestamp = datetime(2024, 1, 15, 10, 30, 0) + test_run = { + "filename": "test_file.py", + "timestamp": timestamp, + "testsuite": "TestSuite", + "outcome": "passed", + "duration_seconds": 1.5, + "failure_message": None, + "framework": "pytest", + "commit_sha": "abc123", + "branch": "main", + "flags": ["unit"], + } + + result = serialize_test_run(test_run) + + assert result == [ + "test_file.py", + "2024-01-15T10:30:00", + "TestSuite", + "passed", + 1.5, + None, + "pytest", + "abc123", + "main", + ["unit"], + ] + + +@pytest.mark.django_db +class TestExportTestAnalyticsDataTask: + def test_owner_does_not_exist(self, dbsession): + integration_name = "nonexistent_owner" + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name=integration_name, + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["successful"] is False + assert ( + "Owner with name nonexistent_owner and service github not found" + in result["error"] + ) + + def test_owner_has_no_repos(self, dbsession, test_owner): + # Create a repo without test_analytics_enabled + repo = RepositoryFactory.create( + author=test_owner, name="test_repo", test_analytics_enabled=False + ) + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name="test_owner", + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["successful"] is False + assert "No repositories found for owner test_owner" in result["error"] + + @patch("tasks.export_test_analytics_data.Testrun") + def test_successful_export_test_run_data( + self, + mock_testrun, + dbsession, + test_repo, + mock_gcs_and_archiver, + sample_test_run_data, + ): + mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = sample_test_run_data + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name="test_owner", + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["integration_name"] == "test_owner" + assert len(result["repositories_succeeded"]) == 1 + assert result["repositories_succeeded"][0]["name"] == "test_repo" + assert len(result["repositories_failed"]) == 0 + + archiver_instance = mock_gcs_and_archiver["archiver_instance"] + archiver_instance.upload_json.assert_called_once() + call_args = archiver_instance.upload_json.call_args + assert call_args[0][0] == "test_owner/test_repo.json" + assert "fields" in call_args[0][1] + assert "data" in call_args[0][1] + + @patch("tasks.export_test_analytics_data.Testrun") + @patch("tasks.export_test_analytics_data.sentry_sdk") + def test_failed_export_test_run_data( + self, mock_sentry, mock_testrun, dbsession, test_repo, mock_gcs_and_archiver + ): + mock_testrun.objects.filter.side_effect = Exception("Database error") + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, 
+ integration_name="test_owner", + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["integration_name"] == "test_owner" + assert len(result["repositories_succeeded"]) == 0 + assert len(result["repositories_failed"]) == 1 + assert result["repositories_failed"][0]["name"] == "test_repo" + assert "Database error" in result["repositories_failed"][0]["error"] + mock_sentry.capture_exception.assert_called_once() + + @patch("tasks.export_test_analytics_data.Testrun") + @patch("tasks.export_test_analytics_data.sentry_sdk") + def test_mixed_success_and_failure( + self, + mock_sentry, + mock_testrun, + dbsession, + test_owner, + mock_gcs_and_archiver, + sample_test_run_data, + ): + repo1 = RepositoryFactory.create( + author=test_owner, name="successful_repo", test_analytics_enabled=True + ) + repo2 = RepositoryFactory.create( + author=test_owner, name="failing_repo", test_analytics_enabled=True + ) + repo3 = RepositoryFactory.create( + author=test_owner, + name="another_successful_repo", + test_analytics_enabled=True, + ) + + # Mock test run data - make it fail for repo2 only + def filter_side_effect(*args, **kwargs): + repo_id = kwargs.get("repo_id") + if repo_id == repo2.repoid: + raise Exception("Processing error for failing_repo") + else: + mock_result = MagicMock() + mock_result.order_by.return_value.values.return_value = ( + sample_test_run_data + ) + return mock_result + + mock_testrun.objects.filter.side_effect = filter_side_effect + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name="test_owner", + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["integration_name"] == "test_owner" + assert len(result["repositories_succeeded"]) == 2 + assert len(result["repositories_failed"]) == 1 + + successful_names = [repo["name"] for repo in result["repositories_succeeded"]] + assert "successful_repo" in successful_names + assert "another_successful_repo" in successful_names + + assert result["repositories_failed"][0]["name"] == "failing_repo" + assert "Processing error" in result["repositories_failed"][0]["error"] + assert mock_sentry.capture_exception.call_count == 1 + + archiver_instance = mock_gcs_and_archiver["archiver_instance"] + assert archiver_instance.upload_json.call_count == 2 + + @patch("tasks.export_test_analytics_data.Testrun") + def test_export_with_multiple_test_runs( + self, + mock_testrun, + dbsession, + test_repo, + mock_gcs_and_archiver, + multiple_test_runs_data, + ): + mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = multiple_test_runs_data + + result = ExportTestAnalyticsDataTask().run_impl( + dbsession, + integration_name="test_owner", + gcp_project_id="test-project", + destination_bucket="test-bucket", + destination_prefix="test-prefix", + ) + + assert result["integration_name"] == "test_owner" + assert len(result["repositories_succeeded"]) == 1 + assert len(result["repositories_failed"]) == 0 + + archiver_instance = mock_gcs_and_archiver["archiver_instance"] + archiver_instance.upload_json.assert_called_once() + call_args = archiver_instance.upload_json.call_args + uploaded_data = call_args[0][1] + + assert len(uploaded_data["data"]) == 3 + assert uploaded_data["fields"] == [ + "filename", + "timestamp", + "testsuite", + "outcome", + "duration_seconds", + "failure_message", + "framework", + "commit_sha", + "branch", + "flags", + ] diff --git 
a/libs/shared/pyproject.toml b/libs/shared/pyproject.toml index dedad2638a..37babb520c 100644 --- a/libs/shared/pyproject.toml +++ b/libs/shared/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ "django>=4.2.25,<5.0.0", "google-auth>=2.21.0", "google-cloud-pubsub>=2.18.4", + "google-cloud-storage>=2.18.0", "httpx>=0.23.0", "ijson>=3.2.3", "minio>=7.1.13", diff --git a/libs/shared/shared/storage/data_exporter.py b/libs/shared/shared/storage/data_exporter.py new file mode 100644 index 0000000000..fd3ea7f18e --- /dev/null +++ b/libs/shared/shared/storage/data_exporter.py @@ -0,0 +1,76 @@ +import io +import json +import pathlib +import tarfile +import time +from types import TracebackType +from typing import Literal, Self + +from google.cloud import storage + +ARCHIVE_SIZE_THRESHOLD = 4 * 1024 * 1024 * 1024 # 4GB + + +class _Archiver: + """Upload files to GCS as chunked tar.gz archives.""" + + def __init__( + self, + local_dir: pathlib.Path, + bucket: storage.Bucket, + prefix: pathlib.PurePosixPath, + ) -> None: + self.current_archive = local_dir / "current_archive.tar.gz" + self.bucket = bucket + self.prefix = prefix + self.time = time.time() + self.counter = -1 + self._next_chunk() + + def _next_chunk(self) -> None: + self.counter += 1 + self.entries = 0 + self.archive = tarfile.open(name=self.current_archive, mode="w:gz") + + def _upload(self) -> None: + self.archive.close() + blob_name = str(self.prefix / f"{self.counter}.tar.gz") + blob = self.bucket.blob(blob_name) + blob.upload_from_filename(self.current_archive, content_type="application/gzip") + + def _add_file( + self, name: str, file_obj: io.BufferedIOBase, size: int | None = None + ) -> None: + info = tarfile.TarInfo(name) + if size is None: + file_obj.seek(0, 2) + info.size = file_obj.tell() + file_obj.seek(0, 0) + else: + info.size = size + info.mode = 0o600 + info.mtime = self.time + self.archive.addfile(info, file_obj) + + self.entries += 1 + if self.current_archive.stat().st_size >= ARCHIVE_SIZE_THRESHOLD: + self._upload() + self._next_chunk() + + def __enter__(self) -> Self: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + exc_tb: TracebackType | None, + ) -> Literal[False]: + if exc_type is None and self.entries > 0: + self._upload() + return False + + def upload_json(self, blob_name: str, data: dict) -> None: + json_str = json.dumps(data, indent=2) + json_bytes = json_str.encode("utf-8") + self._add_file(blob_name, io.BytesIO(json_bytes), size=len(json_bytes)) diff --git a/pyproject.toml b/pyproject.toml index 148d365b11..2b620fd86b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ shared = [ "django>=4.2.25,<5.0.0", "google-auth>=2.21.1", "google-cloud-pubsub>=2.27.1", + "google-cloud-storage>=2.18.0", "httpx>=0.23.1", "ijson>=3.2.3", "minio>=7.2.15", From c6776d605cba27544530b13897433e8aae6a6412 Mon Sep 17 00:00:00 2001 From: Adrian Date: Mon, 1 Dec 2025 14:55:13 -0800 Subject: [PATCH 09/13] feat: stream data into a temp file while batching every 10000 records --- .../tasks/export_test_analytics_data.py | 55 ++++++++++++++++--- .../unit/test_export_test_analytics_data.py | 47 +++++++--------- 2 files changed, 68 insertions(+), 34 deletions(-) diff --git a/apps/worker/tasks/export_test_analytics_data.py b/apps/worker/tasks/export_test_analytics_data.py index 7966b6b10a..80a9af55c3 100644 --- a/apps/worker/tasks/export_test_analytics_data.py +++ b/apps/worker/tasks/export_test_analytics_data.py @@ -1,3 +1,4 @@ +import json import logging 
import pathlib import tempfile @@ -18,6 +19,9 @@ log = logging.getLogger(__name__) +# Batch size for processing test runs from the database +BATCH_SIZE = 10000 + def serialize_test_run(test_run: dict) -> list: """ @@ -96,7 +100,6 @@ def run_impl( "error": f"No repositories found for owner {integration_name}", } - # Initialize GCS client and bucket gcs_client = storage.Client(project=gcp_project_id) bucket = gcs_client.bucket(destination_bucket) @@ -132,20 +135,58 @@ def run_impl( .order_by("-timestamp") .values(*fields) ) - test_runs_data = [ - serialize_test_run(tr) for tr in list(test_runs_qs) - ] - repo_data = {"fields": fields, "data": test_runs_data} + # Stream test runs to a JSON file + with tempfile.NamedTemporaryFile( + mode="w", + suffix=".json", + delete=False, + dir=temp_dir, + ) as json_file: + json_file.write('{"fields": ') + json.dump(fields, json_file) + json_file.write(', "data": [') + + first_item = True + total_processed = 0 + + for test_run in test_runs_qs.iterator( + chunk_size=BATCH_SIZE + ): + if not first_item: + json_file.write(", ") + else: + first_item = False + + json.dump(serialize_test_run(test_run), json_file) + total_processed += 1 + + if total_processed % BATCH_SIZE == 0: + log.debug( + f"Processed {total_processed} test runs for {repo_name}" + ) + + json_file.write("]}") + json_file_path = json_file.name + + # Upload the JSON file, then cleaning it up blob_name = f"{integration_name}/{repo_name}.json" - archiver.upload_json(blob_name, repo_data) + with open(json_file_path, "rb") as f: + archiver._add_file(blob_name, f) + + pathlib.Path(json_file_path).unlink() repositories_succeeded.append({"name": repo_name}) end_time = datetime.now() duration = (end_time - start_time).total_seconds() log.info( - f"Successfully processed repository {repo_name}: {len(test_runs_data)} test runs in {duration:.2f}s" + "Successfully processed repository test runs", + extra={ + "name": repo_name, + "total_processed": total_processed, + "duration": duration, + }, ) except Exception as e: log.error( diff --git a/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py b/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py index dfd5b6dd99..148c115f76 100644 --- a/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py +++ b/apps/worker/tasks/tests/unit/test_export_test_analytics_data.py @@ -186,7 +186,10 @@ def test_successful_export_test_run_data( mock_gcs_and_archiver, sample_test_run_data, ): - mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = sample_test_run_data + # Create a mock queryset that supports iterator() + mock_queryset = MagicMock() + mock_queryset.iterator.return_value = iter(sample_test_run_data) + mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = mock_queryset result = ExportTestAnalyticsDataTask().run_impl( dbsession, @@ -202,11 +205,10 @@ def test_successful_export_test_run_data( assert len(result["repositories_failed"]) == 0 archiver_instance = mock_gcs_and_archiver["archiver_instance"] - archiver_instance.upload_json.assert_called_once() - call_args = archiver_instance.upload_json.call_args + # The new implementation uses _add_file instead of upload_json + archiver_instance._add_file.assert_called_once() + call_args = archiver_instance._add_file.call_args assert call_args[0][0] == "test_owner/test_repo.json" - assert "fields" in call_args[0][1] - assert "data" in call_args[0][1] @patch("tasks.export_test_analytics_data.Testrun") 
@patch("tasks.export_test_analytics_data.sentry_sdk") @@ -260,9 +262,9 @@ def filter_side_effect(*args, **kwargs): raise Exception("Processing error for failing_repo") else: mock_result = MagicMock() - mock_result.order_by.return_value.values.return_value = ( - sample_test_run_data - ) + mock_queryset = MagicMock() + mock_queryset.iterator.return_value = iter(sample_test_run_data) + mock_result.order_by.return_value.values.return_value = mock_queryset return mock_result mock_testrun.objects.filter.side_effect = filter_side_effect @@ -288,7 +290,7 @@ def filter_side_effect(*args, **kwargs): assert mock_sentry.capture_exception.call_count == 1 archiver_instance = mock_gcs_and_archiver["archiver_instance"] - assert archiver_instance.upload_json.call_count == 2 + assert archiver_instance._add_file.call_count == 2 @patch("tasks.export_test_analytics_data.Testrun") def test_export_with_multiple_test_runs( @@ -299,7 +301,10 @@ def test_export_with_multiple_test_runs( mock_gcs_and_archiver, multiple_test_runs_data, ): - mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = multiple_test_runs_data + # Create a mock queryset that supports iterator() + mock_queryset = MagicMock() + mock_queryset.iterator.return_value = iter(multiple_test_runs_data) + mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = mock_queryset result = ExportTestAnalyticsDataTask().run_impl( dbsession, @@ -314,20 +319,8 @@ def test_export_with_multiple_test_runs( assert len(result["repositories_failed"]) == 0 archiver_instance = mock_gcs_and_archiver["archiver_instance"] - archiver_instance.upload_json.assert_called_once() - call_args = archiver_instance.upload_json.call_args - uploaded_data = call_args[0][1] - - assert len(uploaded_data["data"]) == 3 - assert uploaded_data["fields"] == [ - "filename", - "timestamp", - "testsuite", - "outcome", - "duration_seconds", - "failure_message", - "framework", - "commit_sha", - "branch", - "flags", - ] + archiver_instance._add_file.assert_called_once() + # We can't easily inspect the JSON file content with the new streaming approach, + # but we can verify the correct blob name was used + call_args = archiver_instance._add_file.call_args + assert call_args[0][0] == "test_owner/test_repo.json" From 3e03dcb98db22e5d372fb7c9c43d8031d1e3ad8d Mon Sep 17 00:00:00 2001 From: Adrian Date: Mon, 1 Dec 2025 15:52:04 -0800 Subject: [PATCH 10/13] adjust test to take repo_name instead of name in log call --- apps/worker/tasks/export_test_analytics_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/worker/tasks/export_test_analytics_data.py b/apps/worker/tasks/export_test_analytics_data.py index 80a9af55c3..aa3376087d 100644 --- a/apps/worker/tasks/export_test_analytics_data.py +++ b/apps/worker/tasks/export_test_analytics_data.py @@ -183,7 +183,7 @@ def run_impl( log.info( "Successfully processed repository test runs", extra={ - "name": repo_name, + "repo_name": repo_name, "total_processed": total_processed, "duration": duration, }, From 5036f3b902d9fcf4527592b0f7012ddbdabc83f0 Mon Sep 17 00:00:00 2001 From: Adrian Date: Tue, 2 Dec 2025 13:48:14 -0800 Subject: [PATCH 11/13] add missing_ok=true to file path --- apps/worker/tasks/export_test_analytics_data.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/apps/worker/tasks/export_test_analytics_data.py b/apps/worker/tasks/export_test_analytics_data.py index aa3376087d..7c42d35c3d 100644 --- 
a/apps/worker/tasks/export_test_analytics_data.py +++ b/apps/worker/tasks/export_test_analytics_data.py @@ -18,8 +18,6 @@ from tasks.base import BaseCodecovTask log = logging.getLogger(__name__) - -# Batch size for processing test runs from the database BATCH_SIZE = 10000 @@ -128,7 +126,10 @@ def run_impl( for repo_id, repo_name in repo_id_to_name.items(): try: start_time = datetime.now() - log.info(f"Processing repository: {repo_name} (ID: {repo_id})") + log.info( + "Processing repository", + extra={"repo_name": repo_name, "repo_id": repo_id}, + ) test_runs_qs = ( Testrun.objects.filter(repo_id=repo_id) @@ -161,11 +162,6 @@ def run_impl( json.dump(serialize_test_run(test_run), json_file) total_processed += 1 - if total_processed % BATCH_SIZE == 0: - log.debug( - f"Processed {total_processed} test runs for {repo_name}" - ) - json_file.write("]}") json_file_path = json_file.name @@ -174,8 +170,7 @@ def run_impl( with open(json_file_path, "rb") as f: archiver._add_file(blob_name, f) - pathlib.Path(json_file_path).unlink() - + pathlib.Path(json_file_path).unlink(missing_ok=True) repositories_succeeded.append({"name": repo_name}) end_time = datetime.now() From 4fb1df87492f57e2682a28c6fd5a9678ccc91cd8 Mon Sep 17 00:00:00 2001 From: Adrian Date: Tue, 2 Dec 2025 13:55:19 -0800 Subject: [PATCH 12/13] wrap file addition call in try/finally to ensure file is cleaned up --- apps/worker/tasks/export_test_analytics_data.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/worker/tasks/export_test_analytics_data.py b/apps/worker/tasks/export_test_analytics_data.py index 7c42d35c3d..b0e2263115 100644 --- a/apps/worker/tasks/export_test_analytics_data.py +++ b/apps/worker/tasks/export_test_analytics_data.py @@ -165,12 +165,13 @@ def run_impl( json_file.write("]}") json_file_path = json_file.name - # Upload the JSON file, then cleaning it up blob_name = f"{integration_name}/{repo_name}.json" - with open(json_file_path, "rb") as f: - archiver._add_file(blob_name, f) + try: + with open(json_file_path, "rb") as f: + archiver._add_file(blob_name, f) + finally: + pathlib.Path(json_file_path).unlink(missing_ok=True) - pathlib.Path(json_file_path).unlink(missing_ok=True) + repositories_succeeded.append({"name": repo_name}) end_time = datetime.now() From e515b8420d48b6a22a9510a9761a3f2a979dc5c4 Mon Sep 17 00:00:00 2001 From: Adrian Date: Thu, 4 Dec 2025 15:55:39 -0800 Subject: [PATCH 13/13] close the connection regardless to make sure the tarfile isn't left open --- libs/shared/shared/storage/data_exporter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libs/shared/shared/storage/data_exporter.py b/libs/shared/shared/storage/data_exporter.py index fd3ea7f18e..cc05e4f659 100644 --- a/libs/shared/shared/storage/data_exporter.py +++ b/libs/shared/shared/storage/data_exporter.py @@ -68,6 +68,7 @@ def __exit__( ) -> Literal[False]: if exc_type is None and self.entries > 0: self._upload() + self.archive.close() return False def upload_json(self, blob_name: str, data: dict) -> None: