Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new ta tasks #976

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open

feat: new ta tasks #976

wants to merge 11 commits into from

Conversation

joseph-sentry
Copy link
Contributor

this PR creates new ta processor and finisher tasks and uses them behind a feature flag in the upload task for a smooth rollout

@joseph-sentry joseph-sentry requested a review from a team December 19, 2024 21:39
Copy link

sentry-io bot commented Dec 19, 2024

🔍 Existing Issues For Review

Your pull request is modifying functions with the following pre-existing issues:

📄 File: tasks/upload.py

Function Unhandled Issue
_schedule_test_results_processing_task [**TypeError: unsupported operand type(s) for

Did you find this useful? React with a 👍 or 👎

Copy link

This PR includes changes to shared. Please review them here: https://github.com/codecov/shared/compare/2674ae99811767e63151590906691aed4c5ce1f9...

@codecov-staging
Copy link

codecov-staging bot commented Dec 19, 2024

Codecov Report

Attention: Patch coverage is 92.89100% with 45 lines in your changes missing coverage. Please review.

✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
tasks/ta_finisher.py 83.43% 28 Missing ⚠️
services/ta_finishing.py 84.15% 16 Missing ⚠️
tasks/ta_processor.py 98.48% 1 Missing ⚠️

📢 Thoughts on this report? Let us know!

Copy link

codecov bot commented Dec 19, 2024

Codecov Report

Attention: Patch coverage is 92.89100% with 45 lines in your changes missing coverage. Please review.

Project coverage is 97.89%. Comparing base (6f971be) to head (0a6bb1e).
Report is 1 commits behind head on main.

✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
tasks/ta_finisher.py 83.43% 28 Missing ⚠️
services/ta_finishing.py 84.15% 16 Missing ⚠️
tasks/ta_processor.py 98.48% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #976      +/-   ##
==========================================
- Coverage   97.97%   97.89%   -0.09%     
==========================================
  Files         444      449       +5     
  Lines       35855    36471     +616     
==========================================
+ Hits        35129    35702     +573     
- Misses        726      769      +43     
Flag Coverage Δ
integration 42.48% <58.45%> (+0.30%) ⬆️
unit 90.16% <63.50%> (-0.45%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

⚠️ Impact Analysis from Codecov is deprecated and will be sunset on Jan 31 2025. See more

@codecov-qa
Copy link

codecov-qa bot commented Dec 19, 2024

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
1775 1 1774 1
View the top 1 failed tests by shortest run time
tasks/tests/unit/test_ta_finisher_task.py::::test_test_analytics
Stack Traces | 6.14s run time
self = <urllib3.connection.HTTPConnection object at 0x7faf2332a450>

    def _new_conn(self):
        """Establish a socket connection and set nodelay settings on it.
    
        :return: New socket connection.
        """
        extra_kw = {}
        if self.source_address:
            extra_kw["source_address"] = self.source_address
    
        if self.socket_options:
            extra_kw["socket_options"] = self.socket_options
    
        try:
>           conn = connection.create_connection(
                (self._dns_host, self.port), self.timeout, **extra_kw
            )

.../local/lib/python3.13............/site-packages/urllib3/connection.py:174: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../local/lib/python3.13.../urllib3/util/connection.py:72: in create_connection
    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

host = 'minio', port = 80, family = <AddressFamily.AF_UNSPEC: 0>
type = <SocketKind.SOCK_STREAM: 1>, proto = 0, flags = 0

    def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
        """Resolve host and port into list of address info entries.
    
        Translate the host/port argument into a sequence of 5-tuples that contain
        all the necessary arguments for creating a socket connected to that service.
        host is a domain name, a string representation of an IPv4/v6 address or
        None. port is a string service name such as 'http', a numeric port number or
        None. By passing None as the value of host and port, you can pass NULL to
        the underlying C API.
    
        The family, type and proto arguments can be optionally specified in order to
        narrow the list of addresses returned. Passing zero as a value for each of
        these arguments selects the full range of results.
        """
        # We override this function since we want to translate the numeric family
        # and socket type values to enum constants.
        addrlist = []
>       for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
E       socket.gaierror: [Errno -3] Temporary failure in name resolution

.../local/lib/python3.13/socket.py:975: gaierror

During handling of the above exception, another exception occurred:

self = <urllib3.connectionpool.HTTPConnectionPool object at 0x7faf74448d70>
method = 'GET', url = '/archive?location=', body = None
headers = HTTPHeaderDict({'Host': 'minio', 'User-Agent': 'MinIO (Linux; x86_64) minio-py/7.1.13', 'x-amz-content-sha256': 'e3b0c...ers=host;x-amz-content-sha256;x-amz-date, Signature=67ab1281d9510cdc0a263045800328ed517580413c0649d61292114e021b186d'})
retries = Retry(total=0, connect=None, read=None, redirect=None, status=None)
redirect = False, assert_same_host = False
timeout = <object object at 0x7faf85591670>, pool_timeout = None
release_conn = True, chunked = False, body_pos = None
response_kw = {'preload_content': True}
parsed_url = Url(scheme=None, auth=None, host=None, port=None, path='/archive', query='location=', fragment=None)
destination_scheme = None, conn = None, release_this_conn = True
http_tunnel_required = False, err = None, clean_exit = False

    def urlopen(
        self,
        method,
        url,
        body=None,
        headers=None,
        retries=None,
        redirect=True,
        assert_same_host=True,
        timeout=_Default,
        pool_timeout=None,
        release_conn=None,
        chunked=False,
        body_pos=None,
        **response_kw
    ):
        """
        Get a connection from the pool and perform an HTTP request. This is the
        lowest level call for making a request, so you'll need to specify all
        the raw details.
    
        .. note::
    
           More commonly, it's appropriate to use a convenience method provided
           by :class:`.RequestMethods`, such as :meth:`request`.
    
        .. note::
    
           `release_conn` will only behave as expected if
           `preload_content=False` because we want to make
           `preload_content=False` the default behaviour someday soon without
           breaking backwards compatibility.
    
        :param method:
            HTTP request method (such as GET, POST, PUT, etc.)
    
        :param url:
            The URL to perform the request on.
    
        :param body:
            Data to send in the request body, either :class:`str`, :class:`bytes`,
            an iterable of :class:`str`/:class:`bytes`, or a file-like object.
    
        :param headers:
            Dictionary of custom headers to send, such as User-Agent,
            If-None-Match, etc. If None, pool headers are used. If provided,
            these headers completely replace any pool-specific headers.
    
        :param retries:
            Configure the number of retries to allow before raising a
            :class:`~urllib3.exceptions.MaxRetryError` exception.
    
            Pass ``None`` to retry until you receive a response. Pass a
            :class:`~urllib3.util.retry.Retry` object for fine-grained control
            over different types of retries.
            Pass an integer number to retry connection errors that many times,
            but no other types of errors. Pass zero to never retry.
    
            If ``False``, then retries are disabled and any exception is raised
            immediately. Also, instead of raising a MaxRetryError on redirects,
            the redirect response will be returned.
    
        :type retries: :class:`~urllib3.util.retry.Retry`, False, or an int.
    
        :param redirect:
            If True, automatically handle redirects (status codes 301, 302,
            303, 307, 308). Each redirect counts as a retry. Disabling retries
            will disable redirect, too.
    
        :param assert_same_host:
            If ``True``, will make sure that the host of the pool requests is
            consistent else will raise HostChangedError. When ``False``, you can
            use the pool on an HTTP proxy and request foreign hosts.
    
        :param timeout:
            If specified, overrides the default timeout for this one
            request. It may be a float (in seconds) or an instance of
            :class:`urllib3.util.Timeout`.
    
        :param pool_timeout:
            If set and the pool is set to block=True, then this method will
            block for ``pool_timeout`` seconds and raise EmptyPoolError if no
            connection is available within the time period.
    
        :param release_conn:
            If False, then the urlopen call will not release the connection
            back into the pool once a response is received (but will release if
            you read the entire contents of the response such as when
            `preload_content=True`). This is useful if you're not preloading
            the response's content immediately. You will need to call
            ``r.release_conn()`` on the response ``r`` to return the connection
            back into the pool. If None, it takes the value of
            ``response_kw.get('preload_content', True)``.
    
        :param chunked:
            If True, urllib3 will send the body using chunked transfer
            encoding. Otherwise, urllib3 will send the body using the standard
            content-length form. Defaults to False.
    
        :param int body_pos:
            Position to seek to in file-like body in the event of a retry or
            redirect. Typically this won't need to be set because urllib3 will
            auto-populate the value when needed.
    
        :param \\**response_kw:
            Additional parameters are passed to
            :meth:`urllib3.response.HTTPResponse.from_httplib`
        """
    
        parsed_url = parse_url(url)
        destination_scheme = parsed_url.scheme
    
        if headers is None:
            headers = self.headers
    
        if not isinstance(retries, Retry):
            retries = Retry.from_int(retries, redirect=redirect, default=self.retries)
    
        if release_conn is None:
            release_conn = response_kw.get("preload_content", True)
    
        # Check host
        if assert_same_host and not self.is_same_host(url):
            raise HostChangedError(self, url, retries)
    
        # Ensure that the URL we're connecting to is properly encoded
        if url.startswith("/"):
            url = six.ensure_str(_encode_target(url))
        else:
            url = six.ensure_str(parsed_url.url)
    
        conn = None
    
        # Track whether `conn` needs to be released before
        # returning/raising/recursing. Update this variable if necessary, and
        # leave `release_conn` constant throughout the function. That way, if
        # the function recurses, the original value of `release_conn` will be
        # passed down into the recursive call, and its value will be respected.
        #
        # See issue #651 [1] for details.
        #
        # [1] <https://github..../urllib3/issues/651>
        release_this_conn = release_conn
    
        http_tunnel_required = connection_requires_http_tunnel(
            self.proxy, self.proxy_config, destination_scheme
        )
    
        # Merge the proxy headers. Only done when not using HTTP CONNECT. We
        # have to copy the headers dict so we can safely change it without those
        # changes being reflected in anyone else's copy.
        if not http_tunnel_required:
            headers = headers.copy()
            headers.update(self.proxy_headers)
    
        # Must keep the exception bound to a separate variable or else Python 3
        # complains about UnboundLocalError.
        err = None
    
        # Keep track of whether we cleanly exited the except block. This
        # ensures we do proper cleanup in finally.
        clean_exit = False
    
        # Rewind body position, if needed. Record current position
        # for future rewinds in the event of a redirect/retry.
        body_pos = set_file_position(body, body_pos)
    
        try:
            # Request a connection from the queue.
            timeout_obj = self._get_timeout(timeout)
            conn = self._get_conn(timeout=pool_timeout)
    
            conn.timeout = timeout_obj.connect_timeout
    
            is_new_proxy_conn = self.proxy is not None and not getattr(
                conn, "sock", None
            )
            if is_new_proxy_conn and http_tunnel_required:
                self._prepare_proxy(conn)
    
            # Make the request on the httplib connection object.
>           httplib_response = self._make_request(
                conn,
                method,
                url,
                timeout=timeout_obj,
                body=body,
                headers=headers,
                chunked=chunked,
            )

.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:715: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:416: in _make_request
    conn.request(method, url, **httplib_request_kw)
.../local/lib/python3.13............/site-packages/urllib3/connection.py:244: in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
.../local/lib/python3.13/http/client.py:1336: in request
    self._send_request(method, url, body, headers, encode_chunked)
.../local/lib/python3.13/http/client.py:1382: in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
.../local/lib/python3.13/http/client.py:1331: in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
.../local/lib/python3.13/http/client.py:1091: in _send_output
    self.send(msg)
.../local/lib/python3.13/http/client.py:1035: in send
    self.connect()
.../local/lib/python3.13............/site-packages/urllib3/connection.py:205: in connect
    conn = self._new_conn()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connection.HTTPConnection object at 0x7faf2332a450>

    def _new_conn(self):
        """Establish a socket connection and set nodelay settings on it.
    
        :return: New socket connection.
        """
        extra_kw = {}
        if self.source_address:
            extra_kw["source_address"] = self.source_address
    
        if self.socket_options:
            extra_kw["socket_options"] = self.socket_options
    
        try:
            conn = connection.create_connection(
                (self._dns_host, self.port), self.timeout, **extra_kw
            )
    
        except SocketTimeout:
            raise ConnectTimeoutError(
                self,
                "Connection to %s timed out. (connect timeout=%s)"
                % (self.host, self.timeout),
            )
    
        except SocketError as e:
>           raise NewConnectionError(
                self, "Failed to establish a new connection: %s" % e
            )
E           urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7faf2332a450>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution

.../local/lib/python3.13............/site-packages/urllib3/connection.py:186: NewConnectionError

During handling of the above exception, another exception occurred:

dbsession = <sqlalchemy.orm.session.Session object at 0x7faf6ee28aa0>
mocker = <pytest_mock.plugin.MockFixture object at 0x7faf586dabd0>
celery_app = <Celery celery.tests at 0x7faf23d402d0>

    def test_test_analytics(dbsession, mocker, celery_app):
        url = "literally/whatever"
        storage_service = get_appropriate_storage_service(None)
    
        testruns = [
            {
                "name": "test_divide",
                "outcome": "fail",
                "duration_seconds": 0.001,
                "failure_message": "hello world",
            },
            {"name": "test_multiply", "outcome": "pass", "duration_seconds": 0.002},
            {"name": "test_add", "outcome": "skip", "duration_seconds": 0.003},
            {"name": "test_subtract", "outcome": "error", "duration_seconds": 0.004},
        ]
    
        content: str = generate_junit_xml(testruns)
        print(content)
        json_content: dict[str, Any] = {
            "test_results_files": [
                {
                    "filename": "hello_world.junit.xml",
                    "data": base64.b64encode(zlib.compress(content.encode())).decode(),
                }
            ],
        }
>       storage_service.write_file("archive", url, json.dumps(json_content).encode())

.../tests/unit/test_ta_finisher_task.py:100: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../local/lib/python3.13.../shared/storage/minio.py:187: in write_file
    self.minio_client.put_object(
.../local/lib/python3.13................../site-packages/minio/api.py:1766: in put_object
    raise exc
.../local/lib/python3.13................../site-packages/minio/api.py:1720: in put_object
    return self._put_object(
.../local/lib/python3.13................../site-packages/minio/api.py:1578: in _put_object
    response = self._execute(
.../local/lib/python3.13................../site-packages/minio/api.py:400: in _execute
    region = self._get_region(bucket_name, None)
.../local/lib/python3.13................../site-packages/minio/api.py:467: in _get_region
    response = self._url_open(
.../local/lib/python3.13................../site-packages/minio/api.py:266: in _url_open
    response = self._http.urlopen(
.../local/lib/python3.13.../site-packages/urllib3/poolmanager.py:376: in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:829: in urlopen
    return self.urlopen(
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:829: in urlopen
    return self.urlopen(
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:829: in urlopen
    return self.urlopen(
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:829: in urlopen
    return self.urlopen(
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:829: in urlopen
    return self.urlopen(
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:801: in urlopen
    retries = retries.increment(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Retry(total=0, connect=None, read=None, redirect=None, status=None)
method = 'GET', url = '/archive?location=', response = None
error = NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7faf2332a450>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution')
_pool = <urllib3.connectionpool.HTTPConnectionPool object at 0x7faf74448d70>
_stacktrace = <traceback object at 0x7faf45fa8580>

    def increment(
        self,
        method=None,
        url=None,
        response=None,
        error=None,
        _pool=None,
        _stacktrace=None,
    ):
        """Return a new Retry object with incremented retry counters.
    
        :param response: A response object, or None, if the server did not
            return a response.
        :type response: :class:`~urllib3.response.HTTPResponse`
        :param Exception error: An error encountered during the request, or
            None if the response was received successfully.
    
        :return: A new ``Retry`` object.
        """
        if self.total is False and error:
            # Disabled, indicate to re-raise the error.
            raise six.reraise(type(error), error, _stacktrace)
    
        total = self.total
        if total is not None:
            total -= 1
    
        connect = self.connect
        read = self.read
        redirect = self.redirect
        status_count = self.status
        other = self.other
        cause = "unknown"
        status = None
        redirect_location = None
    
        if error and self._is_connection_error(error):
            # Connect retry?
            if connect is False:
                raise six.reraise(type(error), error, _stacktrace)
            elif connect is not None:
                connect -= 1
    
        elif error and self._is_read_error(error):
            # Read retry?
            if read is False or not self._is_method_retryable(method):
                raise six.reraise(type(error), error, _stacktrace)
            elif read is not None:
                read -= 1
    
        elif error:
            # Other retry?
            if other is not None:
                other -= 1
    
        elif response and response.get_redirect_location():
            # Redirect retry?
            if redirect is not None:
                redirect -= 1
            cause = "too many redirects"
            redirect_location = response.get_redirect_location()
            status = response.status
    
        else:
            # Incrementing because of a server error like a 500 in
            # status_forcelist and the given method is in the allowed_methods
            cause = ResponseError.GENERIC_ERROR
            if response and response.status:
                if status_count is not None:
                    status_count -= 1
                cause = ResponseError.SPECIFIC_ERROR.format(status_code=response.status)
                status = response.status
    
        history = self.history + (
            RequestHistory(method, url, error, status, redirect_location),
        )
    
        new_retry = self.new(
            total=total,
            connect=connect,
            read=read,
            redirect=redirect,
            status=status_count,
            other=other,
            history=history,
        )
    
        if new_retry.is_exhausted():
>           raise MaxRetryError(_pool, url, error or ResponseError(cause))
E           urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='minio', port=80): Max retries exceeded with url: /archive?location= (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7faf2332a450>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))

.../local/lib/python3.13.../urllib3/util/retry.py:594: MaxRetryError

To view more test analytics, go to the Test Analytics Dashboard
📢 Thoughts on this report? Let us know!

Copy link

codecov-public-qa bot commented Dec 19, 2024

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
1775 1 1774 1
View the top 1 failed tests by shortest run time
tasks/tests/unit/test_ta_finisher_task.py::::test_test_analytics
Stack Traces | 6.14s run time
self = <urllib3.connection.HTTPConnection object at 0x7faf2332a450>

    def _new_conn(self):
        """Establish a socket connection and set nodelay settings on it.
    
        :return: New socket connection.
        """
        extra_kw = {}
        if self.source_address:
            extra_kw["source_address"] = self.source_address
    
        if self.socket_options:
            extra_kw["socket_options"] = self.socket_options
    
        try:
>           conn = connection.create_connection(
                (self._dns_host, self.port), self.timeout, **extra_kw
            )

.../local/lib/python3.13............/site-packages/urllib3/connection.py:174: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../local/lib/python3.13.../urllib3/util/connection.py:72: in create_connection
    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

host = 'minio', port = 80, family = <AddressFamily.AF_UNSPEC: 0>
type = <SocketKind.SOCK_STREAM: 1>, proto = 0, flags = 0

    def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
        """Resolve host and port into list of address info entries.
    
        Translate the host/port argument into a sequence of 5-tuples that contain
        all the necessary arguments for creating a socket connected to that service.
        host is a domain name, a string representation of an IPv4/v6 address or
        None. port is a string service name such as 'http', a numeric port number or
        None. By passing None as the value of host and port, you can pass NULL to
        the underlying C API.
    
        The family, type and proto arguments can be optionally specified in order to
        narrow the list of addresses returned. Passing zero as a value for each of
        these arguments selects the full range of results.
        """
        # We override this function since we want to translate the numeric family
        # and socket type values to enum constants.
        addrlist = []
>       for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
E       socket.gaierror: [Errno -3] Temporary failure in name resolution

.../local/lib/python3.13/socket.py:975: gaierror

During handling of the above exception, another exception occurred:

self = <urllib3.connectionpool.HTTPConnectionPool object at 0x7faf74448d70>
method = 'GET', url = '/archive?location=', body = None
headers = HTTPHeaderDict({'Host': 'minio', 'User-Agent': 'MinIO (Linux; x86_64) minio-py/7.1.13', 'x-amz-content-sha256': 'e3b0c...ers=host;x-amz-content-sha256;x-amz-date, Signature=67ab1281d9510cdc0a263045800328ed517580413c0649d61292114e021b186d'})
retries = Retry(total=0, connect=None, read=None, redirect=None, status=None)
redirect = False, assert_same_host = False
timeout = <object object at 0x7faf85591670>, pool_timeout = None
release_conn = True, chunked = False, body_pos = None
response_kw = {'preload_content': True}
parsed_url = Url(scheme=None, auth=None, host=None, port=None, path='/archive', query='location=', fragment=None)
destination_scheme = None, conn = None, release_this_conn = True
http_tunnel_required = False, err = None, clean_exit = False

    def urlopen(
        self,
        method,
        url,
        body=None,
        headers=None,
        retries=None,
        redirect=True,
        assert_same_host=True,
        timeout=_Default,
        pool_timeout=None,
        release_conn=None,
        chunked=False,
        body_pos=None,
        **response_kw
    ):
        """
        Get a connection from the pool and perform an HTTP request. This is the
        lowest level call for making a request, so you'll need to specify all
        the raw details.
    
        .. note::
    
           More commonly, it's appropriate to use a convenience method provided
           by :class:`.RequestMethods`, such as :meth:`request`.
    
        .. note::
    
           `release_conn` will only behave as expected if
           `preload_content=False` because we want to make
           `preload_content=False` the default behaviour someday soon without
           breaking backwards compatibility.
    
        :param method:
            HTTP request method (such as GET, POST, PUT, etc.)
    
        :param url:
            The URL to perform the request on.
    
        :param body:
            Data to send in the request body, either :class:`str`, :class:`bytes`,
            an iterable of :class:`str`/:class:`bytes`, or a file-like object.
    
        :param headers:
            Dictionary of custom headers to send, such as User-Agent,
            If-None-Match, etc. If None, pool headers are used. If provided,
            these headers completely replace any pool-specific headers.
    
        :param retries:
            Configure the number of retries to allow before raising a
            :class:`~urllib3.exceptions.MaxRetryError` exception.
    
            Pass ``None`` to retry until you receive a response. Pass a
            :class:`~urllib3.util.retry.Retry` object for fine-grained control
            over different types of retries.
            Pass an integer number to retry connection errors that many times,
            but no other types of errors. Pass zero to never retry.
    
            If ``False``, then retries are disabled and any exception is raised
            immediately. Also, instead of raising a MaxRetryError on redirects,
            the redirect response will be returned.
    
        :type retries: :class:`~urllib3.util.retry.Retry`, False, or an int.
    
        :param redirect:
            If True, automatically handle redirects (status codes 301, 302,
            303, 307, 308). Each redirect counts as a retry. Disabling retries
            will disable redirect, too.
    
        :param assert_same_host:
            If ``True``, will make sure that the host of the pool requests is
            consistent else will raise HostChangedError. When ``False``, you can
            use the pool on an HTTP proxy and request foreign hosts.
    
        :param timeout:
            If specified, overrides the default timeout for this one
            request. It may be a float (in seconds) or an instance of
            :class:`urllib3.util.Timeout`.
    
        :param pool_timeout:
            If set and the pool is set to block=True, then this method will
            block for ``pool_timeout`` seconds and raise EmptyPoolError if no
            connection is available within the time period.
    
        :param release_conn:
            If False, then the urlopen call will not release the connection
            back into the pool once a response is received (but will release if
            you read the entire contents of the response such as when
            `preload_content=True`). This is useful if you're not preloading
            the response's content immediately. You will need to call
            ``r.release_conn()`` on the response ``r`` to return the connection
            back into the pool. If None, it takes the value of
            ``response_kw.get('preload_content', True)``.
    
        :param chunked:
            If True, urllib3 will send the body using chunked transfer
            encoding. Otherwise, urllib3 will send the body using the standard
            content-length form. Defaults to False.
    
        :param int body_pos:
            Position to seek to in file-like body in the event of a retry or
            redirect. Typically this won't need to be set because urllib3 will
            auto-populate the value when needed.
    
        :param \\**response_kw:
            Additional parameters are passed to
            :meth:`urllib3.response.HTTPResponse.from_httplib`
        """
    
        parsed_url = parse_url(url)
        destination_scheme = parsed_url.scheme
    
        if headers is None:
            headers = self.headers
    
        if not isinstance(retries, Retry):
            retries = Retry.from_int(retries, redirect=redirect, default=self.retries)
    
        if release_conn is None:
            release_conn = response_kw.get("preload_content", True)
    
        # Check host
        if assert_same_host and not self.is_same_host(url):
            raise HostChangedError(self, url, retries)
    
        # Ensure that the URL we're connecting to is properly encoded
        if url.startswith("/"):
            url = six.ensure_str(_encode_target(url))
        else:
            url = six.ensure_str(parsed_url.url)
    
        conn = None
    
        # Track whether `conn` needs to be released before
        # returning/raising/recursing. Update this variable if necessary, and
        # leave `release_conn` constant throughout the function. That way, if
        # the function recurses, the original value of `release_conn` will be
        # passed down into the recursive call, and its value will be respected.
        #
        # See issue #651 [1] for details.
        #
        # [1] <https://github..../urllib3/issues/651>
        release_this_conn = release_conn
    
        http_tunnel_required = connection_requires_http_tunnel(
            self.proxy, self.proxy_config, destination_scheme
        )
    
        # Merge the proxy headers. Only done when not using HTTP CONNECT. We
        # have to copy the headers dict so we can safely change it without those
        # changes being reflected in anyone else's copy.
        if not http_tunnel_required:
            headers = headers.copy()
            headers.update(self.proxy_headers)
    
        # Must keep the exception bound to a separate variable or else Python 3
        # complains about UnboundLocalError.
        err = None
    
        # Keep track of whether we cleanly exited the except block. This
        # ensures we do proper cleanup in finally.
        clean_exit = False
    
        # Rewind body position, if needed. Record current position
        # for future rewinds in the event of a redirect/retry.
        body_pos = set_file_position(body, body_pos)
    
        try:
            # Request a connection from the queue.
            timeout_obj = self._get_timeout(timeout)
            conn = self._get_conn(timeout=pool_timeout)
    
            conn.timeout = timeout_obj.connect_timeout
    
            is_new_proxy_conn = self.proxy is not None and not getattr(
                conn, "sock", None
            )
            if is_new_proxy_conn and http_tunnel_required:
                self._prepare_proxy(conn)
    
            # Make the request on the httplib connection object.
>           httplib_response = self._make_request(
                conn,
                method,
                url,
                timeout=timeout_obj,
                body=body,
                headers=headers,
                chunked=chunked,
            )

.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:715: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:416: in _make_request
    conn.request(method, url, **httplib_request_kw)
.../local/lib/python3.13............/site-packages/urllib3/connection.py:244: in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
.../local/lib/python3.13/http/client.py:1336: in request
    self._send_request(method, url, body, headers, encode_chunked)
.../local/lib/python3.13/http/client.py:1382: in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
.../local/lib/python3.13/http/client.py:1331: in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
.../local/lib/python3.13/http/client.py:1091: in _send_output
    self.send(msg)
.../local/lib/python3.13/http/client.py:1035: in send
    self.connect()
.../local/lib/python3.13............/site-packages/urllib3/connection.py:205: in connect
    conn = self._new_conn()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connection.HTTPConnection object at 0x7faf2332a450>

    def _new_conn(self):
        """Establish a socket connection and set nodelay settings on it.
    
        :return: New socket connection.
        """
        extra_kw = {}
        if self.source_address:
            extra_kw["source_address"] = self.source_address
    
        if self.socket_options:
            extra_kw["socket_options"] = self.socket_options
    
        try:
            conn = connection.create_connection(
                (self._dns_host, self.port), self.timeout, **extra_kw
            )
    
        except SocketTimeout:
            raise ConnectTimeoutError(
                self,
                "Connection to %s timed out. (connect timeout=%s)"
                % (self.host, self.timeout),
            )
    
        except SocketError as e:
>           raise NewConnectionError(
                self, "Failed to establish a new connection: %s" % e
            )
E           urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7faf2332a450>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution

.../local/lib/python3.13............/site-packages/urllib3/connection.py:186: NewConnectionError

During handling of the above exception, another exception occurred:

dbsession = <sqlalchemy.orm.session.Session object at 0x7faf6ee28aa0>
mocker = <pytest_mock.plugin.MockFixture object at 0x7faf586dabd0>
celery_app = <Celery celery.tests at 0x7faf23d402d0>

    def test_test_analytics(dbsession, mocker, celery_app):
        url = "literally/whatever"
        storage_service = get_appropriate_storage_service(None)
    
        testruns = [
            {
                "name": "test_divide",
                "outcome": "fail",
                "duration_seconds": 0.001,
                "failure_message": "hello world",
            },
            {"name": "test_multiply", "outcome": "pass", "duration_seconds": 0.002},
            {"name": "test_add", "outcome": "skip", "duration_seconds": 0.003},
            {"name": "test_subtract", "outcome": "error", "duration_seconds": 0.004},
        ]
    
        content: str = generate_junit_xml(testruns)
        print(content)
        json_content: dict[str, Any] = {
            "test_results_files": [
                {
                    "filename": "hello_world.junit.xml",
                    "data": base64.b64encode(zlib.compress(content.encode())).decode(),
                }
            ],
        }
>       storage_service.write_file("archive", url, json.dumps(json_content).encode())

.../tests/unit/test_ta_finisher_task.py:100: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../local/lib/python3.13.../shared/storage/minio.py:187: in write_file
    self.minio_client.put_object(
.../local/lib/python3.13................../site-packages/minio/api.py:1766: in put_object
    raise exc
.../local/lib/python3.13................../site-packages/minio/api.py:1720: in put_object
    return self._put_object(
.../local/lib/python3.13................../site-packages/minio/api.py:1578: in _put_object
    response = self._execute(
.../local/lib/python3.13................../site-packages/minio/api.py:400: in _execute
    region = self._get_region(bucket_name, None)
.../local/lib/python3.13................../site-packages/minio/api.py:467: in _get_region
    response = self._url_open(
.../local/lib/python3.13................../site-packages/minio/api.py:266: in _url_open
    response = self._http.urlopen(
.../local/lib/python3.13.../site-packages/urllib3/poolmanager.py:376: in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:829: in urlopen
    return self.urlopen(
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:829: in urlopen
    return self.urlopen(
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:829: in urlopen
    return self.urlopen(
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:829: in urlopen
    return self.urlopen(
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:829: in urlopen
    return self.urlopen(
.../local/lib/python3.13......................../site-packages/urllib3/connectionpool.py:801: in urlopen
    retries = retries.increment(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Retry(total=0, connect=None, read=None, redirect=None, status=None)
method = 'GET', url = '/archive?location=', response = None
error = NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7faf2332a450>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution')
_pool = <urllib3.connectionpool.HTTPConnectionPool object at 0x7faf74448d70>
_stacktrace = <traceback object at 0x7faf45fa8580>

    def increment(
        self,
        method=None,
        url=None,
        response=None,
        error=None,
        _pool=None,
        _stacktrace=None,
    ):
        """Return a new Retry object with incremented retry counters.
    
        :param response: A response object, or None, if the server did not
            return a response.
        :type response: :class:`~urllib3.response.HTTPResponse`
        :param Exception error: An error encountered during the request, or
            None if the response was received successfully.
    
        :return: A new ``Retry`` object.
        """
        if self.total is False and error:
            # Disabled, indicate to re-raise the error.
            raise six.reraise(type(error), error, _stacktrace)
    
        total = self.total
        if total is not None:
            total -= 1
    
        connect = self.connect
        read = self.read
        redirect = self.redirect
        status_count = self.status
        other = self.other
        cause = "unknown"
        status = None
        redirect_location = None
    
        if error and self._is_connection_error(error):
            # Connect retry?
            if connect is False:
                raise six.reraise(type(error), error, _stacktrace)
            elif connect is not None:
                connect -= 1
    
        elif error and self._is_read_error(error):
            # Read retry?
            if read is False or not self._is_method_retryable(method):
                raise six.reraise(type(error), error, _stacktrace)
            elif read is not None:
                read -= 1
    
        elif error:
            # Other retry?
            if other is not None:
                other -= 1
    
        elif response and response.get_redirect_location():
            # Redirect retry?
            if redirect is not None:
                redirect -= 1
            cause = "too many redirects"
            redirect_location = response.get_redirect_location()
            status = response.status
    
        else:
            # Incrementing because of a server error like a 500 in
            # status_forcelist and the given method is in the allowed_methods
            cause = ResponseError.GENERIC_ERROR
            if response and response.status:
                if status_count is not None:
                    status_count -= 1
                cause = ResponseError.SPECIFIC_ERROR.format(status_code=response.status)
                status = response.status
    
        history = self.history + (
            RequestHistory(method, url, error, status, redirect_location),
        )
    
        new_retry = self.new(
            total=total,
            connect=connect,
            read=read,
            redirect=redirect,
            status=status_count,
            other=other,
            history=history,
        )
    
        if new_retry.is_exhausted():
>           raise MaxRetryError(_pool, url, error or ResponseError(cause))
E           urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='minio', port=80): Max retries exceeded with url: /archive?location= (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7faf2332a450>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))

.../local/lib/python3.13.../urllib3/util/retry.py:594: MaxRetryError

To view more test analytics, go to the Test Analytics Dashboard
📢 Thoughts on this report? Let us know!

Copy link

github-actions bot commented Dec 19, 2024

✅ All tests successful. No failed tests were found.

📣 Thoughts on this report? Let Codecov know! | Powered by Codecov

tasks/upload.py Outdated
Comment on lines 660 to 662
arguments_list=list(chunk),
)
for chunk in itertools.batched(argument_list, CHUNK_SIZE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we still want to run these in batches, or rather one upload per task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one upload per task seems reasonable now that we aren't writing to the db in the processor


def test_test_analytics(dbsession, mocker, celery_app):
url = "literally/whatever"
storage_service = get_appropriate_storage_service(None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you want to use the mock storage provider for this?

Comment on lines 110 to 116
mocker.patch.object(TAProcessorTask, "app", celery_app)
mocker.patch.object(TAFinisherTask, "app", celery_app)

hello = celery_app.register_task(ProcessFlakesTask())
_ = celery_app.tasks[hello.name]
goodbye = celery_app.register_task(CacheTestRollupsTask())
_ = celery_app.tasks[goodbye.name]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have never seen this pattern, what does it do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without this, when the finisher would try to call those tasks, they weren't in the mocked celery app, so what i was trying to do here is add them to the mocked celery app

i replaced this with some code that is hopefully more clear

user-agent:
- Default
method: GET
uri: https://api.github.com/repos/ThiagoCodecov/example-python/commits/abf6d4df662c47e32460020ab14abf9303581429
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you rather mock away whatever call does this request, instead of relying on vcr?

Comment on lines 254 to 256
for upload in uploads:
repo_flag_ids = get_repo_flag_ids(db_session, repoid, upload.flag_names)
if upload.state == "processed":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this loop has a couple of problems:

  • you are querying all uploads from the DB, but only ever run the code on processed ones
  • you only append to tests_to_write and friends, but never clear those across uploads
  • save_tests and friends runs for all the uploads, together with never clearing tests_to_write above means that you insert the same tests over and over again depending on how many total uploads you have
  • you unconditionally set state = "finished" for all the downloads, also ones that already have that state
  • the intermediate msgpack file is never cleared.

these 2 new tasks are meant to replace the test results processor and
test results finisher tasks

the difference between the tasks is that the new tasks:
- use the new parse_raw_upload function provided by the test results
  parser library
- instead of writing to the database in the processor, the finisher
  takes care of writing to the database
- the processor writes the results of its parsing and the finisher pulls
  from that
the upload task will check the new ta tasks feature flag to determine
whether to use the newly introduced ta processor and ta finisher tasks

we also call the new ta processor and ta finisher tasks via a chord
since we removed the concurrent db writes from the processors
also update test results parser version
we don't need to chunk them anymore and each upload can get its own
processing task
i want to introduce the new finished state in the test results upload
pipeline

to do so safely i'm adding the new v2_processed and v2_finished states

the reason for this is that the meaning of processed and v2_processed
are different

processed means that test instances are in the db and persisted
but in the new pipeline v2_finished has that meaning and v2_processed
just means that the intermediate results are in redis for now
@joseph-sentry
Copy link
Contributor Author

@Swatinem sorry i got confused and rebased and force pushed but i really just added 5 new commits on top of the existing ones and didn't modify any of the existing ones

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants