Skip to content

Commit

Permalink
Learning Golang: Pub/Sub Concurrency Pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
MarioCarrion committed Oct 21, 2023
1 parent 609b7c2 commit 30565d9
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 0 deletions.
21 changes: 21 additions & 0 deletions 2023/concurrency-pattern-publisher-subscriber/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Learning Go: Concurrency Pattern Publisher / Subscriber

[<img src="https://github.com/MarioCarrion/MarioCarrion/blob/main/youtube.svg" width="20" height="20" alt="YouTube video"> Watch the video!](https://youtu.be/s-I3Bs3ZUsY)

Also known as **Pub/Sub**, this messaging pattern allows customers, known as `subscribers`, to asynchronously receive events generated by a producer, known as a `publisher`. **Pub/Sub** is a well-known pattern used in distributed systems to asynchronously communicate different services interested in knowing specific information about a particular service.

In Go, we can use the concurrency primitives of the language, such as channels and goroutines, to implement this pattern.

## What's a simpler way to read a channel when only one is used?

I asked this question in the final part of the video, the answer is to remove `select` and use the `for` directly, so something like this inthe go routine:

```
for val := range s1 {
fmt.Println("sub 1, value ", val)
}
fmt.Print("sub 1, exiting\n")
wg.Done()
```
3 changes: 3 additions & 0 deletions 2023/concurrency-pattern-publisher-subscriber/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/MarioCarrion/videos/2023/concurrency-pattern-publisher-subscriber

go 1.21.2
116 changes: 116 additions & 0 deletions 2023/concurrency-pattern-publisher-subscriber/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package main

import (
"fmt"
"sync"
)

type PubSub[T any] struct {
subscribers []chan T
mu sync.RWMutex
closed bool
}

func NewPubSub[T any]() *PubSub[T] {
return &PubSub[T]{
mu: sync.RWMutex{},
}
}

func (s *PubSub[T]) Subscribe() <-chan T {
s.mu.Lock()
defer s.mu.Unlock()

if s.closed {
return nil
}

r := make(chan T)

s.subscribers = append(s.subscribers, r)

return r
}

func (s *PubSub[T]) Publish(value T) {
s.mu.RLock()
defer s.mu.RUnlock()

if s.closed {
return
}

for _, ch := range s.subscribers {
ch <- value
}
}

func (s *PubSub[T]) Close() {
s.mu.Lock()
defer s.mu.Unlock()

if s.closed {
return
}

for _, ch := range s.subscribers {
close(ch)
}

s.closed = true
}

func main() {
ps := NewPubSub[string]()

wg := sync.WaitGroup{}

//-

s1 := ps.Subscribe()

go func() {
wg.Add(1)

for {
select {
case val, ok := <-s1:
if !ok {
fmt.Print("sub 1, exiting\n")
wg.Done()
return
}

fmt.Println("sub 1, value ", val)
}
}
}()

//-

s2 := ps.Subscribe()

go func() {
wg.Add(1)

for val := range s2 {
fmt.Println("sub 2, value ", val)
}

wg.Done()

fmt.Print("sub 2, exiting\n")
}()

//-

ps.Publish("one")
ps.Publish("two")
ps.Publish("three")

ps.Close()

wg.Wait()

fmt.Println("completed")
}

0 comments on commit 30565d9

Please sign in to comment.