Skip to content

Commit

Permalink
Merge pull request #26 from tarasko/feature/extra_headers
Browse files Browse the repository at this point in the history
Feature/extra headers
  • Loading branch information
tarasko authored Nov 26, 2024
2 parents 4151842 + d3464a3 commit ee0c11d
Show file tree
Hide file tree
Showing 7 changed files with 444 additions and 126 deletions.
38 changes: 38 additions & 0 deletions docs/source/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,34 @@ Classes

Request headers. Keys are case insensitive

.. autoclass:: WSUpgradeResponse
:members:

.. py:attribute:: version
:type: bytes

HTTP version. For example b"HTTP/1.1"

.. py:attribute:: status
:type: http.HTTPStatus

HTTP response status enum value. For example: HTTPStatus.SWITCHING_PROTOCOLS

.. py:attribute:: headers
:type: CIMultiDict[str, str]

Response headers. Keys are case insensitive

.. py:attribute:: body
:type: bytes

Optional response body. Can be non-empty in case of errors

.. autoclass:: WSUpgradeResponseWithListener
:members:

.. py:method:: __init__(response: WSUpgradeResponse, listener: Optional[WSListener])
.. autoclass:: WSListener
:members:

Expand All @@ -117,6 +145,16 @@ Classes

Please don't use it to send data. Use only WSTransport.send_* methods to send frames.

.. py:attribute:: request
:type: WSUpgradeRequest

Opening handshake request.

.. py:attribute:: response
:type: WSUpgradeResponse

Opening handshake response.

.. py:method:: send_reuse_external_buffer(WSMsgType msg_type, char* msg_ptr, size_t msg_size, bint fin=True, bint rsv1=False)
**Available only from Cython.**
Expand Down
6 changes: 5 additions & 1 deletion examples/echo_client_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ async def picows_main(endpoint: str, msg: bytes, duration: int, ssl_context):
print(f"Run picows python {cl_type} client")

class PicowsClientListener(WSListener):
_transport: WSTransport
_start_time: float
_cnt: int

def __init__(self):
super().__init__()

Expand Down Expand Up @@ -113,7 +117,7 @@ async def aiohttp_main(url: str, data: bytes, duration: int, ssl_context):


if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Publish updates to telegram subscribers",
parser = argparse.ArgumentParser(description="Benchmark for the various websocket clients",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--host", default="127.0.0.1", help="Server host")
parser.add_argument("--plain-port", default="9001", help="Server port with plain websockets")
Expand Down
4 changes: 4 additions & 0 deletions picows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
WSTransport,
WSListener,
WSUpgradeRequest,
WSUpgradeResponse,
WSUpgradeResponseWithListener,
ws_connect,
ws_create_server,
PICOWS_DEBUG_LL
Expand All @@ -22,6 +24,8 @@
'WSTransport',
'WSListener',
'WSUpgradeRequest',
'WSUpgradeResponse',
'WSUpgradeResponseWithListener',
'ws_connect',
'ws_create_server',
'PICOWS_DEBUG_LL'
Expand Down
25 changes: 20 additions & 5 deletions picows/picows.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ cdef class WSUpgradeRequest:
readonly object headers # CIMultiDict[istr, str]


cdef class WSUpgradeResponse:
cdef:
readonly bytes version
readonly object status # HTTPStatus
readonly object headers # CIMultiDict[istr, str]
readonly bytes body

cdef bytearray to_bytes(self)


cdef class WSUpgradeResponseWithListener:
cdef:
readonly WSUpgradeResponse response
readonly WSListener listener


cdef class WSFrame:
cdef:
char* payload_ptr
Expand All @@ -87,6 +103,8 @@ cdef class WSTransport:
readonly object underlying_transport #: asyncio.Transport
readonly bint is_client_side
readonly bint is_secure
readonly WSUpgradeRequest request
readonly WSUpgradeResponse response

bint auto_ping_expect_pong
object pong_received_at_future
Expand All @@ -106,11 +124,8 @@ cdef class WSTransport:
cpdef disconnect(self, bint graceful=*)
cpdef notify_user_specific_pong_received(self)

cdef inline _send_http_handshake(self, bytes ws_path, bytes host_port, bytes websocket_key_b64, bytes user_agent_header, dict extra_headers)
cdef inline _send_http_handshake_response(self, bytes accept_val)
cdef inline _send_bad_request(self, str error)
cdef inline _send_not_found(self)
cdef inline _send_internal_server_error(self, str error)
cdef inline _send_http_handshake(self, bytes ws_path, bytes host_port, bytes websocket_key_b64, object extra_headers)
cdef inline _send_http_handshake_response(self, WSUpgradeResponse response, bytes accept_val)
cdef inline _mark_disconnected(self)
cdef inline _try_native_write_then_transport_write(self, char * ptr, Py_ssize_t sz)

Expand Down
112 changes: 96 additions & 16 deletions picows/picows.pyi
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import asyncio
from enum import Enum
from ssl import SSLContext
from collections.abc import Callable, Iterable
from typing import Final
from http import HTTPStatus

# Some of the imports are deprecated in the newer python versions
# But we still have support for 3.8 where collection.abc didn't have
# proper types yet.
from typing import Final, Optional, Mapping, Iterable, Tuple, Callable, Union
from multidict import CIMultiDict


PICOWS_DEBUG_LL: Final = 9
WSHeadersLike = Union[Mapping[str, str], Iterable[Tuple[str, str]]]
WSServerListenerFactory = Callable[[WSUpgradeRequest], Union[WSListener, WSUpgradeResponseWithListener, None]]
WSBuffer = Union[bytes, bytearray, memoryview]


class WSError(RuntimeError): ...


class WSMsgType(Enum):
CONTINUATION = 0x0
TEXT = 0x1
Expand All @@ -16,6 +27,7 @@ class WSMsgType(Enum):
PONG = 0xA
CLOSE = 0x8


class WSCloseCode(Enum):
NO_INFO = 0
OK = 1000
Expand All @@ -32,36 +44,69 @@ class WSCloseCode(Enum):
TRY_AGAIN_LATER = 1013
BAD_GATEWAY = 1014


class WSAutoPingStrategy(Enum):
PING_WHEN_IDLE = 1
PING_PERIODICALLY = 2


class WSFrame:
@property
def tail_size(self) -> int: ...

@property
def msg_type(self) -> WSMsgType: ...

@property
def fin(self) -> bool: ...

@property
def rsv1(self) -> bool: ...

@property
def last_in_buffer(self) -> bool: ...

def get_payload_as_bytes(self) -> bytes: ...
def get_payload_as_utf8_text(self) -> str: ...
def get_payload_as_ascii_text(self) -> str: ...
def get_payload_as_memoryview(self) -> object: ...
def get_payload_as_memoryview(self) -> memoryview: ...
def get_close_code(self) -> WSCloseCode: ...
def get_close_message(self) -> bytes: ...
def __str__(self): ...


class WSTransport:
def __init__(self, is_client_side: bool, underlying_transport, logger, loop): ...
@property
def underlying_transport(self) -> asyncio.Transport: ...

@property
def is_client_side(self) -> bool: ...

@property
def is_secure(self) -> bool: ...

@property
def request(self) -> WSUpgradeRequest: ...

@property
def response(self) -> WSUpgradeResponse: ...

def send(
self,
msg_type: WSMsgType,
message,
message: Optional[WSBuffer],
fin: bool = True,
rsv1: bool = False,
): ...
def send_ping(self, message=None): ...
def send_pong(self, message=None): ...
def send_close(self, close_code: WSCloseCode = ..., close_message=None): ...
def send_ping(self, message: Optional[WSBuffer]=None): ...
def send_pong(self, message: Optional[WSBuffer]=None): ...
def send_close(self, close_code: WSCloseCode = ..., close_message: Optional[WSBuffer]=None): ...
def disconnect(self, graceful: bool = True): ...
async def wait_disconnected(self): ...
async def measure_roundtrip_time(self, rounds: int) -> list[float]: ...
def notify_user_specific_pong_received(self): ...


class WSListener:
def on_ws_connected(self, transport: WSTransport): ...
def on_ws_frame(self, transport: WSTransport, frame: WSFrame): ...
Expand All @@ -71,13 +116,47 @@ class WSListener:
def pause_writing(self): ...
def resume_writing(self): ...

class WSUpgradeRequest: ...

class WSUpgradeRequest:
@property
def method(self) -> bytes: ...

@property
def path(self) -> bytes: ...

@property
def version(self) -> bytes: ...

@property
def headers(self) -> CIMultiDict: ...


class WSUpgradeResponse:
@staticmethod
def create_error_response(status: Union[int, HTTPStatus], body=None, extra_headers: Optional[WSHeadersLike]=None): ...

@staticmethod
def create_101_response(extra_headers: Optional[WSHeadersLike]=None): ...

@property
def version(self) -> bytes: ...

@property
def status(self) -> HTTPStatus: ...

@property
def headers(self) -> CIMultiDict: ...


class WSUpgradeResponseWithListener:
def __init__(self, response: WSUpgradeResponse, listener: Optional[WSListener]): ...


async def ws_connect(
ws_listener_factory: Callable[[], WSListener],
url: str,
*,
ssl_context: SSLContext | None = None,
ssl_context: Union[SSLContext, None] = None,
disconnect_on_exception: bool = True,
websocket_handshake_timeout=5,
logger_name: str = "client",
Expand All @@ -86,14 +165,15 @@ async def ws_connect(
auto_ping_reply_timeout: float = 10,
auto_ping_strategy: WSAutoPingStrategy = ...,
enable_auto_pong: bool = True,
user_agent_header: str | None = None,
extra_headers: dict | None = None,
extra_headers: Optional[WSHeadersLike] = None,
**kwargs,
) -> tuple[WSTransport, WSListener]: ...
) -> Tuple[WSTransport, WSListener]: ...


async def ws_create_server(
ws_listener_factory: Callable[[WSUpgradeRequest], WSListener | None],
host: str | Iterable[str] | None = None,
port: int | None = None,
ws_listener_factory: WSServerListenerFactory,
host: Union[str, Iterable[str], None] = None,
port: Union[int, None] = None,
*,
disconnect_on_exception: bool = True,
websocket_handshake_timeout: int = 5,
Expand Down
Loading

0 comments on commit ee0c11d

Please sign in to comment.