
Deduplication of the subscriptions #215

Open
gedw99 opened this issue May 24, 2024 · 5 comments

Comments

@gedw99

gedw99 commented May 24, 2024

Here is an example setup.

You have a 3-node Corrosion cluster running and 3 Go servers subscribing via https://superfly.github.io/corrosion/api/subscriptions.html

If, in the Go servers, the handler for the Corrosion data feed does something like sending an email, how can I ensure only 1 email is sent instead of 3?

This is why I titled the issue "deduplication", as it's the closest name for this problem space.

I am looking for architecture advice and to see if we can use corrosion for this type of work pattern.

I guess I could slap in a NATS JetStream queue that ensures only 1 email job runs, but I don't want to, as it's more moving parts :) Less is more :)

@gedw99 gedw99 changed the title Deduplication off the subscriptions Deduplication of the subscriptions May 24, 2024
@jeromegn
Member

Exactly-once delivery is pretty hard to achieve in distributed systems.

There are ways around the issue, but there are caveats!

You could use consistent hashing so that the same node always processes its own partition of emails. However, if you change the number of nodes ingesting the subscription, you'll have to update the processing logic so new nodes get to send emails too.
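A minimal Go sketch of that partitioning idea: hash each record's key and take it modulo the node count, so every node independently computes the same owner for a given key. The node count and this node's index are assumptions you would have to manage (and keep in sync) yourself.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ownerIndex maps a record key to the node responsible for it.
// With a fixed nodeCount, every node computes the same owner for
// a given key, so exactly one node acts on each record.
func ownerIndex(key string, nodeCount int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % nodeCount
}

func main() {
	const nodeCount = 3
	myIndex := 2 // this node's assigned index (hypothetical)

	for _, key := range []string{"email-1", "email-2", "email-3", "email-4"} {
		if ownerIndex(key, nodeCount) == myIndex {
			fmt.Printf("node %d sends %s\n", myIndex, key)
		} else {
			fmt.Printf("node %d skips %s\n", myIndex, key)
		}
	}
}
```

Note this is the caveat from above in code form: changing nodeCount reshuffles ownership, so every node must switch to the new count at the same time.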

If you have numerical, unique IDs, then you could try SQLite's mod() math function:

  • Assign an index to each node from 0 to len - 1.
  • Each node subscribes to the same query, with an added WHERE clause: mod(numerical_id, <node_count>) = <node_index>. For node idx 2 (the 3rd and last node) it would look like: WHERE .. AND mod(id, 3) = 2.

This would have the effect of creating different subscriptions for each client node (your golang servers). It has the same issue as consistent hashing where you need to change the query if you add or remove nodes. This only really works if you precisely control the golang nodes and can assign them indexes without gaps between 0 and len - 1.
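A minimal sketch of building that per-node query string in Go. The emails table and its columns are hypothetical stand-ins for whatever the subscription actually selects:

```go
package main

import "fmt"

// subscriptionQuery builds the per-node query described above:
// each node subscribes with mod(id, nodeCount) = nodeIndex, so the
// numeric ID space is split into disjoint partitions.
func subscriptionQuery(nodeIndex, nodeCount int) string {
	return fmt.Sprintf(
		"SELECT id, recipient FROM emails WHERE status = 'pending' AND mod(id, %d) = %d",
		nodeCount, nodeIndex)
}

func main() {
	// Node index 2 of a 3-node fleet, as in the example above.
	fmt.Println(subscriptionQuery(2, 3))
}
```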

These are just ideas. It would probably be best to put the subscription in a proper queue and consume that queue.

@gedw99
Author

gedw99 commented May 25, 2024

Thank you @jeromegn

I appreciate the response.

It's funny, but at the end of the day we both sort of hit the same old answer: use a queue.

Imagine you have a Go server farm with a GUI written in htmx. Every server would try to push HTML to the browser as a result of a subscription feed. The current user is only connected over SSE to one server, so it would be de-duped by the fact that the human is on one device at a time. The other Go servers would do nothing with the subscription feed in this case.

Complex stuff.

Corrosion could ideally model subscriptions with an ACK. Under the hood, it would record that the subscription sequence was handled and distribute that to the other Corrosion servers. There is a timing race condition here, of course, so it would again be "at least once" queue semantics.

Either way, all devs using Corrosion will hit this problem.

Does the Rust client for Corrosion handle this problem using hashes, etc.?

@gedw99
Author

gedw99 commented May 25, 2024

NATS JetStream, using its native de-duplication, looks like a solution I will try:

https://nats.io/blog/new-per-subject-discard-policy/

I still need to write a Go client for Corrosion to easily digest the Corrosion stream. I raised another issue about other clients, like Go, in this repo's issues list.
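Until such a client exists, a rough sketch of the bridging step: derive a stable Nats-Msg-Id from each subscription row, so republishing it into JetStream lets the duplicate window drop repeats. The table/pk/version fields are assumptions about what uniquely identifies a row version in a Corrosion change feed:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// msgID derives a stable JetStream Nats-Msg-Id from a change row.
// The same row version always hashes to the same ID, so JetStream's
// duplicate window can discard re-deliveries; a new version produces
// a new ID and is delivered normally.
func msgID(table, pk string, version int64) string {
	sum := sha256.Sum256([]byte(fmt.Sprintf("%s/%s/%d", table, pk, version)))
	return hex.EncodeToString(sum[:])
}

func main() {
	// Identical row versions collapse to one ID...
	fmt.Println(msgID("emails", "42", 7) == msgID("emails", "42", 7)) // true
	// ...while a new version gets a fresh ID.
	fmt.Println(msgID("emails", "42", 8) == msgID("emails", "42", 7)) // false
}
```

The resulting string would be passed as nats.MsgId(...) when publishing, exactly as in the JetStream example further down.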

@jeromegn
Member

I think that's a hard problem that Corrosion is not well suited to solve. It's a strongly eventually consistent database. It can't offer those kinds of guarantees.

You can't use Corrosion as a queue if you need exactly-once guarantees. The pub/sub functionality is designed so that every client gets to the same state eventually. You do need a different system if you need some kind of "locking" to make sure an entry is processed only once.

If you do need to achieve this, then I suggest using a single client to send the emails. That would require something like leader election if you need it to be resilient.

@gedw99
Author

gedw99 commented May 28, 2024

I agree fully, and I appreciate the brainstorm.

I need exactly-once processing. https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#exactly-once-semantics

I will take the output from the Corrosion subscription and feed it into NATS.

So it is a combination of de-duplication by the server when receiving a published message, as well as a double-ack call by the subscriber that received the message (plus retries if necessary).

package main

import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func failOnErr(err error) {
    if err != nil {
        log.Fatal(err)
    }
}

func main() {
    // Connect and get the JetStream context.
    nc, _ := nats.Connect(nats.DefaultURL)
    js, _ := nc.JetStream()

    // Create a test stream.
    _, err := js.AddStream(&nats.StreamConfig{
        Name:       "test",
        Storage:    nats.MemoryStorage,
        Subjects:   []string{"test.>"},
        Duplicates: time.Minute,
    })
    failOnErr(err)

    defer js.DeleteStream("test")

    // Publish some messages with duplicates.
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))

    // Create an explicit pull consumer on the stream.
    _, err = js.AddConsumer("test", &nats.ConsumerConfig{
        Durable:       "test",
        AckPolicy:     nats.AckExplicitPolicy,
        DeliverPolicy: nats.DeliverAllPolicy,
    })
    failOnErr(err)
    defer js.DeleteConsumer("test", "test")

    // Create a subscription on the pull consumer.
    // Subject can be empty since it defaults to all subjects bound to the stream.
    sub, err := js.PullSubscribe("", "test", nats.BindStream("test"))
    failOnErr(err)

    // Only two should be delivered.
    batch, _ := sub.Fetch(10)
    log.Printf("%d messages", len(batch))

    // AckSync both to ensure the server received the ack.
    batch[0].AckSync()
    batch[1].AckSync()

    // Should be zero.
    batch, _ = sub.Fetch(10, nats.MaxWait(time.Second))
    log.Printf("%d messages", len(batch))
}

Pushing that then to web clients uses out-of-order streaming based on DSD (Declarative Shadow DOM) web components (zero JavaScript needed). So the GUI just hangs waiting for more data.

https://scottnath.com/blahg/profile-components--dsd/

A transaction from the GUI goes to NATS and then Corrosion. Any other web component fragment just updates off the subscription at the same time. So PUSH and PULL are 100% decoupled. A new page load is just a Corrosion query, and if I over-subscribe, the GUI hides it, as it does not care about over-fetching.

So the web GUI is an exact mirror of the DB, kind of like the old "Naked Objects" manifesto.
If I want to change the GUI, I actually change the SQL data schema.

I still need to test live data schema changes with CRDTs. I read how it works; it's freakily ingenious...
