diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..42e0902 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,14 @@ +root = true + +[*] +indent_style = space +indent_size = 4 +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true + +[Makefile] +indent_style = tab + +[*.{yml,yaml,json,js,ts}] +indent_size = 2 diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..75f041c --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,36 @@ +on: + push: + branches: + - main + tags: + - "*" + + pull_request: + branches: + - "*" + workflow_dispatch: + +name: CI + +jobs: + ci: + name: Test + runs-on: "ubuntu-latest" + strategy: + matrix: + version: ["3.7", "3.8", "3.9", "3.10"] + fail-fast: false + + services: + elasticmq-native: + image: softwaremill/elasticmq-native + ports: + - 9324:9324 + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.version }} + - run: make install-dependencies + - run: make lint + - run: make test diff --git a/.gitignore b/.gitignore index 9ae6859..11a70c2 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ __pycache__ build dist htmlcov +coverage.xml diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..7130cc2 --- /dev/null +++ b/Makefile @@ -0,0 +1,29 @@ +install-dependencies: + pip install -U pip + pip install .[dev] + +test: + pytest + + +### +# Lint section +### +_flake8: + @flake8 --show-source . + +_isort: + @isort --check-only . + +_black: + @black --diff --check . + +_isort_fix: + @isort . + +_black_fix: + @black . + + +lint: _flake8 _isort _black +format-code: _isort_fix _black_fix diff --git a/dramatiq_sqs/broker.py b/dramatiq_sqs/broker.py index d9a2b8f..55280b7 100644 --- a/dramatiq_sqs/broker.py +++ b/dramatiq_sqs/broker.py @@ -62,14 +62,15 @@ class SQSBroker(dramatiq.Broker): """ def __init__( - self, *, - namespace: Optional[str] = None, - middleware: Optional[List[dramatiq.Middleware]] = None, - retention: int = MAX_MESSAGE_RETENTION, - dead_letter: bool = False, - max_receives: int = MAX_RECEIVES, - tags: Optional[Dict[str, str]] = None, - **options, + self, + *, + namespace: Optional[str] = None, + middleware: Optional[List[dramatiq.Middleware]] = None, + retention: int = MAX_MESSAGE_RETENTION, + dead_letter: bool = False, + max_receives: int = MAX_RECEIVES, + tags: Optional[Dict[str, str]] = None, + **options, ) -> None: super().__init__(middleware=middleware) @@ -104,31 +105,21 @@ def declare_queue(self, queue_name: str) -> None: QueueName=prefixed_queue_name, Attributes={ "MessageRetentionPeriod": self.retention, - } + }, ) if self.tags: - self.sqs.meta.client.tag_queue( - QueueUrl=self.queues[queue_name].url, - Tags=self.tags - ) + self.sqs.meta.client.tag_queue(QueueUrl=self.queues[queue_name].url, Tags=self.tags) if self.dead_letter: dead_letter_queue_name = f"{prefixed_queue_name}_dlq" - dead_letter_queue = self.sqs.create_queue( - QueueName=dead_letter_queue_name - ) + dead_letter_queue = self.sqs.create_queue(QueueName=dead_letter_queue_name) if self.tags: - self.sqs.meta.client.tag_queue( - QueueUrl=dead_letter_queue.url, - Tags=self.tags - ) + self.sqs.meta.client.tag_queue(QueueUrl=dead_letter_queue.url, Tags=self.tags) redrive_policy = { "deadLetterTargetArn": dead_letter_queue.attributes["QueueArn"], - "maxReceiveCount": str(self.max_receives) + "maxReceiveCount": str(self.max_receives), } - self.queues[queue_name].set_attributes(Attributes={ - "RedrivePolicy": json.dumps(redrive_policy) - }) + self.queues[queue_name].set_attributes(Attributes={"RedrivePolicy": json.dumps(redrive_policy)}) self.emit_after("declare_queue", queue_name) def enqueue(self, message: dramatiq.Message, *, delay: Optional[int] = None) -> dramatiq.Message: @@ -146,7 +137,11 @@ def enqueue(self, message: dramatiq.Message, *, delay: Optional[int] = None) -> if len(encoded_message) > MAX_MESSAGE_SIZE: raise RuntimeError("Messages in SQS can be at most 256KiB large.") - self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name) + self.logger.debug( + "Enqueueing message %r on queue %r.", + message.message_id, + queue_name, + ) self.emit_before("enqueue", message, delay) queue.send_message( MessageBody=encoded_message, @@ -181,20 +176,30 @@ def ack(self, message: "_SQSMessage") -> None: def requeue(self, messages: Iterable["_SQSMessage"]) -> None: for batch in chunk(messages, chunksize=10): # Re-enqueue batches of up to 10 messages. - send_response = self.queue.send_messages(Entries=[{ - "Id": str(i), - "MessageBody": message._sqs_message.body, - } for i, message in enumerate(batch)]) + send_response = self.queue.send_messages( + Entries=[ + { + "Id": str(i), + "MessageBody": message._sqs_message.body, + } + for i, message in enumerate(batch) + ] + ) # Then delete the ones that were successfully re-enqueued. # The rest will have to wait until their visibility # timeout expires. failed_message_ids = [int(res["Id"]) for res in send_response.get("Failed", [])] requeued_messages = [m for i, m in enumerate(batch) if i not in failed_message_ids] - self.queue.delete_messages(Entries=[{ - "Id": str(i), - "ReceiptHandle": message._sqs_message.receipt_handle, - } for i, message in enumerate(requeued_messages)]) + self.queue.delete_messages( + Entries=[ + { + "Id": str(i), + "ReceiptHandle": message._sqs_message.receipt_handle, + } + for i, message in enumerate(requeued_messages) + ] + ) self.message_refc -= len(requeued_messages) @@ -233,8 +238,7 @@ def __init__(self, sqs_message: Any, message: dramatiq.Message) -> None: def chunk(xs: Iterable[T], *, chunksize=10) -> Iterable[Sequence[T]]: - """Split a sequence into subseqs of chunksize length. - """ + """Split a sequence into subseqs of chunksize length.""" chunk = [] for x in xs: chunk.append(x) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..5c4f573 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,33 @@ +[tool.black] +line-length = 120 +target-version = [ "py37" ] +include = "\\.pyi?$" +exclude = """ +( + /( + \\.eggs # exclude a few common directories in the + | \\.git # root of the project + | \\.hg + | \\.mypy_cache + | \\.tox + | \\.venv + | _build + | buck-out + | build + | dist + ) +) +""" + +[tool.isort] +profile = "black" +known_first_party = "dramatiq_sqs" +multi_line_output = 3 +use_parentheses = true +include_trailing_comma = true + +[tool.coverage.run] +branch = true + +[tool.coverage.report] +precision = 2 diff --git a/setup.cfg b/setup.cfg index 806cfff..6a75ecd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [tool:pytest] testpaths = tests -addopts = --cov dramatiq_sqs --cov-report html +addopts = --cov dramatiq_sqs --cov-report html --cov-report term-missing --cov-report xml [pep8] max-line-length = 120 diff --git a/setup.py b/setup.py index c25b2b8..3cd2ada 100644 --- a/setup.py +++ b/setup.py @@ -27,13 +27,14 @@ def rel(*xs): install_requires=["boto3", "dramatiq"], extras_require={ "dev": [ + "black", "bumpversion", - "flake8", "flake8-quotes", + "flake8", "isort", "mypy", - "pytest", "pytest-cov", + "pytest", "twine", ], }, diff --git a/tests/conftest.py b/tests/conftest.py index afd5441..2b48c2d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,8 +4,7 @@ import dramatiq import pytest -from dramatiq.middleware import (AgeLimit, Callbacks, Pipelines, Retries, - TimeLimit) +from dramatiq.middleware import AgeLimit, Callbacks, Pipelines, Retries, TimeLimit from dramatiq_sqs import SQSBroker @@ -18,13 +17,18 @@ @pytest.fixture def broker(): broker = SQSBroker( + endpoint_url="http://127.0.0.1:9324", + region_name="elasticmq", + aws_secret_access_key="x", + aws_access_key_id="x", + use_ssl=False, namespace="dramatiq_sqs_tests", middleware=[ AgeLimit(), TimeLimit(), Callbacks(), Pipelines(), - Retries(min_backoff=1000, max_backoff=900000, max_retries=96), + Retries(min_backoff=1, max_backoff=900000, max_retries=2), ], tags={ "owner": "dramatiq_sqs_tests", diff --git a/tests/test_broker.py b/tests/test_broker.py index 1597140..37f8b23 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -135,6 +135,11 @@ def test_creates_dead_letter_queue(): namespace="dramatiq_sqs_tests", dead_letter=True, max_receives=20, + endpoint_url="http://127.0.0.1:9324", + region_name="elasticmq", + aws_secret_access_key="x", + aws_access_key_id="x", + use_ssl=False, ) # And I've stubbed out all the relevant API calls @@ -142,15 +147,21 @@ def test_creates_dead_letter_queue(): stubber.add_response("create_queue", {"QueueUrl": ""}) stubber.add_response("create_queue", {"QueueUrl": ""}) stubber.add_response("get_queue_attributes", {"Attributes": {"QueueArn": "dlq"}}) - stubber.add_response("set_queue_attributes", {}, { - "QueueUrl": "", - "Attributes": { - "RedrivePolicy": json.dumps({ - "deadLetterTargetArn": "dlq", - "maxReceiveCount": "20" - }) - } - }) + stubber.add_response( + "set_queue_attributes", + {}, + { + "QueueUrl": "", + "Attributes": { + "RedrivePolicy": json.dumps( + { + "deadLetterTargetArn": "dlq", + "maxReceiveCount": "20", + } + ) + }, + }, + ) # When I create a queue # Then a dead-letter queue should be created @@ -164,19 +175,22 @@ def test_tags_queues_on_create(): # Given that I have an SQS broker with tags broker = SQSBroker( namespace="dramatiq_sqs_tests", - tags={"key1": "value1", "key2": "value2"} + tags={"key1": "value1", "key2": "value2"}, + endpoint_url="http://127.0.0.1:9324", + region_name="elasticmq", + aws_secret_access_key="x", + aws_access_key_id="x", + use_ssl=False, ) # And I've stubbed out all the relevant API calls stubber = Stubber(broker.sqs.meta.client) stubber.add_response("create_queue", {"QueueUrl": ""}) - stubber.add_response("tag_queue", {}, { - "QueueUrl": "", - "Tags": { - "key1": "value1", - "key2": "value2" - } - }) + stubber.add_response( + "tag_queue", + {}, + {"QueueUrl": "", "Tags": {"key1": "value1", "key2": "value2"}}, + ) # When I create a queue # Then the queue should have the specified tags diff --git a/tests/test_utils.py b/tests/test_utils.py index c102015..a31157b 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -16,5 +16,5 @@ def test_chunk_can_split_iterators_into_chunks(): [6, 7], [8, 9], [10, 11], - [12] + [12], ]