Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update publish_subscribe.md #101

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 31 additions & 35 deletions messaging/publish_subscribe.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,63 +12,59 @@ To accomplish this, an intermediary, called a "message broker" or "event bus",
receives published messages, and then routes them on to subscribers.


There are three components **messages**, **topics**, **users**.
There are three components **messages**, **topics**, **subscriptions**.

```go
type Message struct {
// Contents
}


type Subscription struct {
ch chan<- Message

Inbox chan Message
closed bool
inbox chan Message
}

func (s *Subscription) Publish(msg Message) error {
if _, ok := <-s.ch; !ok {
return errors.New("Topic has been closed")
func (s *Subscription) Next() (Message, error) {
if s.closed {
return Message{}, errors.New("subscription closed")
}

m, ok := <-s.inbox
if !ok {
return Message{}, errors.New("subscription closed")
}

return m, nil
}

s.ch <- msg

return nil
func (s *Subscription) Unsubscribe(Subscription) error {
s.closed = true
close(s.inbox)
}
```

```go
type Topic struct {
Subscribers []Session
Subscribers []Subscription
MessageHistory []Message
}

func (t *Topic) Subscribe(uid uint64) (Subscription, error) {
// Get session and create one if it's the first

// Add session to the Topic & MessageHistory

// Create a subscription
func (t *Topic) Subscribe() (Subscription) {
return Subscription{inbox: make(chan Message)}
}

func (t *Topic) Unsubscribe(Subscription) error {
// Implementation
}

func (t *Topic) Delete() error {
// Implementation
}
```

```go
type User struct {
ID uint64
Name string
}
func (t *Topic) Publish(msg Message) error {
for _, sub := range t.Subscribers {
if sub.closed {
continue
}

go func() {
sub.inbox <- msg
}()
}

type Session struct {
User User
Timestamp time.Time
return nil
}
```

Expand Down