Skip to content

Commit

Permalink
Split up features into separate files
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin-b committed Sep 18, 2023
1 parent 58721af commit b198a59
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 243 deletions.
34 changes: 34 additions & 0 deletions pytest_httpx/_httpx_internals.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
from typing import (
Union,
Dict,
Expand All @@ -6,8 +7,10 @@
Iterable,
AsyncIterator,
Iterator,
Optional,
)

import httpcore
import httpx

# TODO Get rid of this internal import
Expand Down Expand Up @@ -36,3 +39,34 @@ async def __aiter__(self) -> AsyncIterator[bytes]:

AsyncIteratorByteStream.__init__(self, stream=Stream())
IteratorByteStream.__init__(self, stream=Stream())


def _to_httpx_url(url: httpcore.URL, headers: list[tuple[bytes, bytes]]) -> httpx.URL:
    """Convert an httpcore URL (plus request headers) into an httpx URL.

    If a ``Proxy-Authorization`` header is present, its value is decoded and
    carried over as the URL userinfo so that credentials stay visible on the
    resulting URL.
    """
    credentials = None
    for header_name, header_value in headers:
        if header_name == b"Proxy-Authorization":
            # Skip the leading b"Basic " (6 bytes) before base64-decoding.
            credentials = base64.b64decode(header_value[6:])
            break

    url_parts = {
        "scheme": url.scheme.decode(),
        "host": url.host.decode(),
        "port": url.port,
        "raw_path": url.target,
    }
    if credentials is not None:
        return httpx.URL(userinfo=credentials, **url_parts)
    return httpx.URL(**url_parts)


def _proxy_url(
    real_transport: Union[httpx.BaseTransport, httpx.AsyncBaseTransport]
) -> Optional[httpx.URL]:
    """Return the proxy URL configured on the transport, or ``None``.

    Only sync/async HTTP transports whose underlying pool is an httpcore
    proxy expose a proxy URL; anything else yields ``None``.
    """
    transport_proxy_pairs = (
        (httpx.HTTPTransport, httpcore.HTTPProxy),
        (httpx.AsyncHTTPTransport, httpcore.AsyncHTTPProxy),
    )
    for transport_cls, proxy_pool_cls in transport_proxy_pairs:
        if isinstance(real_transport, transport_cls):
            pool = real_transport._pool
            if isinstance(pool, proxy_pool_cls):
                return _to_httpx_url(pool._proxy_url, pool._proxy_headers)
    return None
253 changes: 10 additions & 243 deletions pytest_httpx/_httpx_mock.py
Original file line number Diff line number Diff line change
@@ -1,233 +1,21 @@
import base64
import copy
import inspect
import json
import re
from typing import List, Union, Optional, Callable, Tuple, Pattern, Any, Dict, Awaitable
from typing import Union, Optional, Callable, Any, Awaitable

import httpcore
import httpx

from pytest_httpx import _httpx_internals


def _to_httpx_url(url: httpcore.URL, headers: list[tuple[bytes, bytes]]) -> httpx.URL:
    """Build an ``httpx.URL`` mirroring the given ``httpcore.URL``.

    Proxy basic-auth credentials found in the headers are decoded and
    embedded in the URL userinfo.
    """
    auth_value = next(
        (value for name, value in headers if name == b"Proxy-Authorization"),
        None,
    )
    if auth_value is not None:
        return httpx.URL(
            scheme=url.scheme.decode(),
            host=url.host.decode(),
            port=url.port,
            raw_path=url.target,
            # Drop the leading b"Basic " before base64-decoding the credentials.
            userinfo=base64.b64decode(auth_value[6:]),
        )

    return httpx.URL(
        scheme=url.scheme.decode(),
        host=url.host.decode(),
        port=url.port,
        raw_path=url.target,
    )


def _url_match(
    url_to_match: Union[Pattern[str], httpx.URL], received: httpx.URL
) -> bool:
    """Return ``True`` when ``received`` matches the expected URL or pattern.

    A compiled regular expression is matched against the full received URL
    string. A concrete URL is compared piecewise: query parameters as an
    unordered mapping, everything else exactly.
    """
    if isinstance(url_to_match, re.Pattern):
        return url_to_match.match(str(received)) is not None

    # Query parameter order must not affect matching, so compare them as dicts.
    if dict(url_to_match.params) != dict(received.params):
        return False

    # Everything besides the query string must match exactly.
    return url_to_match.copy_with(query=None) == received.copy_with(query=None)


def _request_description(
    real_transport: Union[httpx.BaseTransport, httpx.AsyncBaseTransport],
    request: httpx.Request,
    expected_headers: set[bytes],
    expect_body: bool,
    expect_proxy: bool,
) -> str:
    """Describe the received request, appending only the details callers asked for."""
    description = f"{request.method} request on {request.url}"
    extra = extra_request_description(
        real_transport, request, expected_headers, expect_body, expect_proxy
    )
    return f"{description} with {extra}" if extra else description


def _proxy_url(
    real_transport: Union[httpx.BaseTransport, httpx.AsyncBaseTransport]
) -> Optional[httpx.URL]:
    """Return the httpx URL of the proxy this transport routes through, if any."""
    if isinstance(real_transport, httpx.HTTPTransport) and isinstance(
        real_transport._pool, httpcore.HTTPProxy
    ):
        pool = real_transport._pool
        return _to_httpx_url(pool._proxy_url, pool._proxy_headers)

    if isinstance(real_transport, httpx.AsyncHTTPTransport) and isinstance(
        real_transport._pool, httpcore.AsyncHTTPProxy
    ):
        pool = real_transport._pool
        return _to_httpx_url(pool._proxy_url, pool._proxy_headers)

    return None


def extra_request_description(
    real_transport: Union[httpx.BaseTransport, httpx.AsyncBaseTransport],
    request: httpx.Request,
    expected_headers: set[bytes],
    expect_body: bool,
    expect_proxy: bool,
) -> str:
    """Return the extra request details relevant to the registered matchers.

    Builds a human-readable fragment (joined with " and ") describing, in
    order: the headers the matchers care about, the request body, and the
    proxy URL — each included only when at least one matcher expects it.
    Returns an empty string when nothing is expected.
    """
    extra_description = []

    if expected_headers:
        headers_encoding = request.headers.encoding
        present_headers: dict[str, str] = {}
        # Can be cleaned based on the outcome of https://github.com/encode/httpx/discussions/2841
        for name, lower_name, value in request.headers._list:
            # expected_headers contains lower-cased names (httpx internal key).
            if lower_name in expected_headers:
                name = name.decode(headers_encoding)
                if name in present_headers:
                    # Repeated headers are rendered comma-joined, like HTTP does.
                    present_headers[name] += f", {value.decode(headers_encoding)}"
                else:
                    present_headers[name] = value.decode(headers_encoding)

        extra_description.append(f"{present_headers} headers")

    if expect_body:
        extra_description.append(f"{request.read()} body")

    if expect_proxy:
        extra_description.append(f"{_proxy_url(real_transport)} proxy URL")

    return " and ".join(extra_description)


class _RequestMatcher:
    """Matches incoming httpx requests against registration criteria.

    A matcher can constrain the URL, HTTP method, proxy URL, headers, and
    body (raw content or JSON). ``nb_calls`` counts how many requests this
    matcher accepted, which callers use for assertion/reset bookkeeping.
    """

    def __init__(
        self,
        url: Optional[Union[str, Pattern[str], httpx.URL]] = None,
        method: Optional[str] = None,
        proxy_url: Optional[Union[str, Pattern[str], httpx.URL]] = None,
        match_headers: Optional[Dict[str, Any]] = None,
        match_content: Optional[bytes] = None,
        match_json: Optional[Any] = None,
    ):
        # Number of requests this matcher has accepted so far.
        self.nb_calls = 0
        # Plain strings are normalized to httpx.URL; patterns/URLs kept as-is.
        self.url = httpx.URL(url) if url and isinstance(url, str) else url
        # HTTP methods are compared upper-cased.
        self.method = method.upper() if method else method
        self.headers = match_headers
        if match_content is not None and match_json is not None:
            raise ValueError(
                "Only one way of matching against the body can be provided. If you want to match against the JSON decoded representation, use match_json. Otherwise, use match_content."
            )
        self.content = match_content
        self.json = match_json
        self.proxy_url = (
            httpx.URL(proxy_url)
            if proxy_url and isinstance(proxy_url, str)
            else proxy_url
        )

    def match(
        self,
        real_transport: Union[httpx.BaseTransport, httpx.AsyncBaseTransport],
        request: httpx.Request,
    ) -> bool:
        """Return True when the request satisfies every configured criterion.

        Criteria are short-circuit evaluated; unset criteria always pass.
        """
        return (
            self._url_match(request)
            and self._method_match(request)
            and self._headers_match(request)
            and self._content_match(request)
            and self._proxy_match(real_transport)
        )

    def _url_match(self, request: httpx.Request) -> bool:
        # No URL criterion means any URL matches.
        if not self.url:
            return True

        return _url_match(self.url, request.url)

    def _method_match(self, request: httpx.Request) -> bool:
        # No method criterion means any method matches.
        if not self.method:
            return True

        return request.method == self.method

    def _headers_match(self, request: httpx.Request) -> bool:
        """Return True when every expected header is present with the expected value."""
        if not self.headers:
            return True

        encoding = request.headers.encoding
        request_headers = {}
        # Can be cleaned based on the outcome of https://github.com/encode/httpx/discussions/2841
        for raw_name, raw_value in request.headers.raw:
            # Repeated headers are merged comma-joined before comparison.
            if raw_name in request_headers:
                request_headers[raw_name] += b", " + raw_value
            else:
                request_headers[raw_name] = raw_value

        return all(
            request_headers.get(header_name.encode(encoding))
            == header_value.encode(encoding)
            for header_name, header_value in self.headers.items()
        )

    def _content_match(self, request: httpx.Request) -> bool:
        """Return True when the body matches the raw-content or JSON criterion."""
        if self.content is None and self.json is None:
            return True
        if self.content is not None:
            return request.read() == self.content
        try:
            # httpx._content.encode_json hard codes utf-8 encoding.
            return json.loads(request.read().decode("utf-8")) == self.json
        except json.decoder.JSONDecodeError:
            # A non-JSON body can never match a JSON criterion.
            return False

    def _proxy_match(
        self, real_transport: Union[httpx.BaseTransport, httpx.AsyncBaseTransport]
    ) -> bool:
        """Return True when the transport's proxy URL matches the criterion."""
        if not self.proxy_url:
            return True

        if real_proxy_url := _proxy_url(real_transport):
            return _url_match(self.proxy_url, real_proxy_url)

        # A proxy was expected but the transport uses none.
        return False

    def __str__(self) -> str:
        """Human-readable description of this matcher, used in failure messages."""
        matcher_description = f"Match {self.method or 'all'} requests"
        if self.url:
            matcher_description += f" on {self.url}"
        if extra_description := self._extra_description():
            matcher_description += f" with {extra_description}"
        return matcher_description

    def _extra_description(self) -> str:
        """Describe the non-URL/method criteria, joined with ' and '."""
        extra_description = []

        if self.headers:
            extra_description.append(f"{self.headers} headers")
        if self.content is not None:
            extra_description.append(f"{self.content} body")
        if self.json is not None:
            extra_description.append(f"{self.json} json body")
        if self.proxy_url:
            extra_description.append(f"{self.proxy_url} proxy URL")

        return " and ".join(extra_description)
from pytest_httpx._pretty_print import RequestDescription
from pytest_httpx._request_matcher import _RequestMatcher


class HTTPXMock:
def __init__(self) -> None:
self._requests: List[
Tuple[Union[httpx.BaseTransport, httpx.AsyncBaseTransport], httpx.Request]
self._requests: list[
tuple[Union[httpx.BaseTransport, httpx.AsyncBaseTransport], httpx.Request]
] = []
self._callbacks: List[
Tuple[
self._callbacks: list[
tuple[
_RequestMatcher,
Callable[
[httpx.Request],
Expand Down Expand Up @@ -379,31 +167,10 @@ def _explain_that_no_response_was_found(
request: httpx.Request,
) -> str:
matchers = [matcher for matcher, _ in self._callbacks]
headers_encoding = request.headers.encoding
expected_headers = set(
[
# httpx uses lower cased header names as internal key
header.lower().encode(headers_encoding)
for matcher in matchers
if matcher.headers
for header in matcher.headers
]
)
expect_body = any(
[
matcher.content is not None or matcher.json is not None
for matcher in matchers
]
)
expect_proxy = any([matcher.proxy_url is not None for matcher in matchers])

request_description = _request_description(
real_transport, request, expected_headers, expect_body, expect_proxy
)
message = f"No response can be found for {RequestDescription(real_transport, request, matchers)}"

matchers_description = "\n".join([str(matcher) for matcher in matchers])

message = f"No response can be found for {request_description}"
if matchers_description:
message += f" amongst:\n{matchers_description}"

Expand Down Expand Up @@ -440,7 +207,7 @@ def _get_callback(
matcher.nb_calls += 1
return callback

def get_requests(self, **matchers: Any) -> List[httpx.Request]:
def get_requests(self, **matchers: Any) -> list[httpx.Request]:
"""
Return all requests sent that match (empty list if no requests were matched).
Expand Down Expand Up @@ -491,7 +258,7 @@ def reset(self, assert_all_responses_were_requested: bool) -> None:
not not_called
), f"The following responses are mocked but not requested:\n{matchers_description}"

def _reset_callbacks(self) -> List[_RequestMatcher]:
def _reset_callbacks(self) -> list[_RequestMatcher]:
callbacks_not_executed = [
matcher for matcher, _ in self._callbacks if not matcher.nb_calls
]
Expand Down
Loading

0 comments on commit b198a59

Please sign in to comment.