Skip to content

Latest commit

 

History

History
288 lines (218 loc) · 7.79 KB

chapter_11_cqrs.asciidoc

File metadata and controls

288 lines (218 loc) · 7.79 KB

Command-Query Responsibility Separation (CQRS)

In this chapter
  • We’ll discuss the different needs of reads and writes in our system.

  • We’ll show how separating readers and writes can simplify our code and improve performance.

  • We’ll talk about advanced patterns for building scalable applications.

    // DIAGRAM GOES HERE
Note
placeholder chapter, under construction

Always Redirect After a POST?

The API returns information from the post request and that’s bad, arguably.

Let’s have an endpoint to go and get the updated state instead:

Example 1. API test does a GET after the POST (tests/e2e/test_api.py)
@pytest.mark.usefixtures('postgres_db')
@pytest.mark.usefixtures('restart_api')
def test_happy_path_returns_202_and_batch_is_allocated():
    orderid = random_orderid()
    sku, othersku = random_sku(), random_sku('other')
    batch1, batch2, batch3 = random_batchref(1), random_batchref(2), random_batchref(3)
    api_client.post_to_add_batch(batch1, sku, 100, '2011-01-02')
    api_client.post_to_add_batch(batch2, sku, 100, '2011-01-01')
    api_client.post_to_add_batch(batch3, othersku, 100, None)

    r = api_client.post_to_allocate(orderid, sku, qty=3)
    assert r.status_code == 202

    r = api_client.get_allocation(orderid)
    assert r.ok
    assert r.json() == [
        {'sku': sku, 'batchref': batch2},
    ]


@pytest.mark.usefixtures('postgres_db')
@pytest.mark.usefixtures('restart_api')
def test_unhappy_path_returns_400_and_error_message():
    unknown_sku, orderid = random_sku(), random_orderid()
    r = api_client.post_to_allocate(
        orderid, unknown_sku, qty=20, expect_success=False,
    )
    assert r.status_code == 400
    assert r.json()['message'] == f'Invalid sku {unknown_sku}'

    r = api_client.get_allocation(orderid)
    assert r.status_code == 404

OK what might the flask app look like?

Example 2. Endpoint for viewing allocations (src/allocation/flask_app.py)
@app.route("/allocations/<orderid>", methods=['GET'])
def allocations_view_endpoint(orderid):
    uow = unit_of_work.SqlAlchemyUnitOfWork()
    result = views.allocations(orderid, uow)
    if not result:
        return 'not found', 404
    return jsonify(result), 200

Hold on to Your Lunch Folks.

All right, a views.py, fair enough, we can keep read-only stuff in there, and it’ll be a real views.py, not like Django’s…​

Example 3. Views do…​ raw sql??? (src/allocation/views.py)
from allocation import unit_of_work

def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
    with uow:
        results = list(uow.session.execute(
            'SELECT ol.sku, b.reference'
            ' FROM allocations AS a'
            ' JOIN batches AS b ON a.batch_id = b.id'
            ' JOIN order_lines AS ol ON a.orderline_id = ol.id'
            ' WHERE ol.orderid = :orderid',
            dict(orderid=orderid)
        ))
        print('results', results, flush=True)
    return [{'sku': sku, 'batchref': batchref} for sku, batchref in results]

WHAT THE ACTUAL F? ARE YOU GUYS TRIPPING F-ING BALLS?

Yes. yes we are. Obviously don’t do this. Unless you really need to. Now, allow us to explain some possible places where this total insanity might make a shred of sense.

  • Link to CQRS paper

  • SELECT N+1

btw you can test this stuff. note that it can’t be unit tested, because it needs a real db, it’s an integration test! Just another anti-feather in the anti-cap of this total anti-pattern.

Example 4. An integration test for a view (tests/integration/test_views.py)
from datetime import date
from allocation import commands, unit_of_work, messagebus, views


def test_allocations_view(sqlite_session_factory):
    uow = unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory)
    messagebus.handle(commands.CreateBatch('b1', 'sku1', 50, None), uow)
    messagebus.handle(commands.CreateBatch('b2', 'sku2', 50, date.today()), uow)
    messagebus.handle(commands.Allocate('o1', 'sku1', 20), uow)
    messagebus.handle(commands.Allocate('o1', 'sku2', 20), uow)

    assert views.allocations('o1', uow) == [
        {'sku': 'sku1', 'batchref': 'b1'},
        {'sku': 'sku2', 'batchref': 'b2'},
    ]

Doubling Down on the Madness.

that hardcoded sql query is pretty ugly right? what if we made it nicer by keeping a totally separate, denormalised datastore for our view model?

Horrifying, right? wait 'til we tell you we’re not even going to use postgres or triggers or anything known and reliable and boring like that to keep it up to date. We’re going to use our amazing event-driven architecture! That’s right! may as well join the cult and start drinking folks, the ship is made of cardboard and the captains are crazy and there’s nothing you can do to stop them.

Example 5. A much nicer query (src/allocation/views.py)
def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
    with uow:
        results = list(uow.session.execute(
            'SELECT sku, batchref FROM allocations_view WHERE orderid = :orderid',
            dict(orderid=orderid)
        ))
        ...

Here’s our table. Hee hee hee, no foreign keys, just strings, yolo

Example 6. A very simple table (src/allocation/orm.py)
allocations_view = Table(
    'allocations_view', metadata,
    Column('orderid', String(255)),
    Column('sku', String(255)),
    Column('batchref', String(255)),
)

We add a second handler to the Allocated event:

Example 7. Allocated event gets a new handler (src/allocation/messagebus.py)
EVENT_HANDLERS = {
    events.Allocated: [
        handlers.publish_allocated_event,
        handlers.add_allocation_to_read_model
    ],

Here’s what our update-view-model code looks like:

Example 8. Update on allocation (src/allocation/handlers.py)
def add_allocation_to_read_model(
        event: events.Allocated, uow: unit_of_work.SqlAlchemyUnitOfWork,
):
    with uow:
        uow.session.execute(
            'INSERT INTO allocations_view (orderid, sku, batchref)'
            ' VALUES (:orderid, :sku, :batchref)',
            dict(orderid=event.orderid, sku=event.sku, batchref=event.batchref)
        )
        uow.commit()

And it’ll work!

(OK you’ll also need to handle deallocated:)

Example 9. A second listener for read model updates
events.Deallocated: [
    handlers.remove_allocation_from_read_model,
    handlers.reallocate
],

...

def remove_allocation_from_read_model(
        event: events.Deallocated, uow: unit_of_work.SqlAlchemyUnitOfWork,
):
    with uow:
        uow.session.execute(
            'DELETE FROM allocations_view '
            ' WHERE orderid = :orderid AND sku = :sku',

But Whyyyyyyy?

OK. horrible, right? But also, kinda, surprisingly nice, considering? Our events and message bus give us a really nice place to do this sort of stuff, if we need to.

And think how easy it’d be to swap our read model from postgres to redis? super-simple. We don’t even need to change the integration test.

TODO: demo this.

So definitely don’t do this. ever. But, if you do need to, see how easy the event-driven model makes it?

OK. On that note, let’s sally forth into our final chapter.