-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathcommands.go
206 lines (175 loc) · 7.45 KB
/
commands.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package cqrs
import (
"reflect"
"time"
)
// Command is the message envelope describing an actor's intention to alter
// the state of the system. The metadata fields are serialised under the JSON
// keys given by their struct tags; Body carries no tag.
type Command struct {
	// MessageID is the identifier of this individual message.
	MessageID string `json:"messageID"`
	// CorrelationID is the identifier shared with related messages.
	CorrelationID string `json:"correlationID"`
	// CommandType is the name of the Go type of Body, as filled in by the
	// CreateCommand helpers.
	CommandType string `json:"commandType"`
	// Created is the command's timestamp. Note that its JSON key is "time".
	Created time.Time `json:"time"`
	// Body is the command payload.
	Body interface{}
}
// CreateCommand wraps body in a new Command populated with default metadata:
// a fresh "mid:"-prefixed message ID, a fresh "cid:"-prefixed correlation ID,
// the current time, and the name of body's dynamic type as reported by reflect.
//
// A nil body is accepted. reflect.TypeOf returns a nil Type for a nil
// interface value, so CommandType is left empty in that case rather than
// calling String on the nil Type, which would panic.
func CreateCommand(body interface{}) Command {
	var typeName string
	if bodyType := reflect.TypeOf(body); bodyType != nil {
		typeName = bodyType.String()
	}

	return Command{
		MessageID:     "mid:" + NewUUIDString(),
		CorrelationID: "cid:" + NewUUIDString(),
		CommandType:   typeName,
		Created:       time.Now(),
		Body:          body,
	}
}
// CreateCommandWithCorrelationID wraps body in a new Command that carries the
// caller-supplied correlationID verbatim (no prefix is added). The message ID
// is a fresh "mid:"-prefixed UUID string, Created is the current time, and
// CommandType is the name of body's dynamic type as reported by reflect.
//
// A nil body is accepted. reflect.TypeOf returns a nil Type for a nil
// interface value, so CommandType is left empty in that case rather than
// calling String on the nil Type, which would panic.
func CreateCommandWithCorrelationID(body interface{}, correlationID string) Command {
	var typeName string
	if bodyType := reflect.TypeOf(body); bodyType != nil {
		typeName = bodyType.String()
	}

	return Command{
		MessageID:     "mid:" + NewUUIDString(),
		CorrelationID: correlationID,
		CommandType:   typeName,
		Created:       time.Now(),
		Body:          body,
	}
}
// CommandPublisher is responsible for publishing commands.
type CommandPublisher interface {
	// PublishCommands publishes the given commands and reports any failure
	// as an error.
	PublishCommands(commands []Command) error
}
// CommandReceiver is responsible for receiving commands.
type CommandReceiver interface {
	// ReceiveCommands starts receiving commands as configured by options
	// and returns an error if receiving could not be started.
	ReceiveCommands(options CommandReceiverOptions) error
}
// CommandBus combines both directions of command transport: it can receive
// commands (CommandReceiver) and publish them (CommandPublisher).
type CommandBus interface {
	CommandReceiver
	CommandPublisher
}
// CommandDispatchManager coordinates receiving messages from a command
// receiver and handing them to the command dispatcher.
type CommandDispatchManager struct {
	// commandDispatcher routes each received command to its registered handlers.
	commandDispatcher *MapBasedCommandDispatcher
	// typeRegistry records the command types that handlers were registered
	// for; it is also passed to the receiver when listening starts.
	typeRegistry TypeRegistry
	// receiver is the source of incoming commands.
	receiver CommandReceiver
}
// CommandDispatcher returns the manager's internal command dispatcher,
// exposed through the CommandDispatcher interface.
func (m *CommandDispatchManager) CommandDispatcher() CommandDispatcher {
	var dispatcher CommandDispatcher = m.commandDispatcher
	return dispatcher
}
// CommandReceiverOptions is an initialization structure used to communicate
// to and from a command receiver goroutine.
type CommandReceiverOptions struct {
	// TypeRegistry is the type registry made available to the receiver.
	TypeRegistry TypeRegistry
	// Close carries shutdown requests: the requester sends a reply channel
	// on it and then waits on that reply channel for the outcome.
	Close chan chan error
	// Error carries errors raised on the receiver side back to the listener.
	Error chan error
	// ReceiveCommand is the handler for received commands.
	// NOTE(review): presumably invoked by the receiver once per command - confirm
	// against the receiver implementations.
	ReceiveCommand CommandHandler
	// Exclusive is passed through to the receiver; its exact meaning is
	// defined by the receiver implementation.
	Exclusive bool
	// ListenerCount is passed through to the receiver; its exact meaning is
	// defined by the receiver implementation.
	ListenerCount int
}
// CommandTransactedAccept is the message routed from a command receiver to
// the command manager.
//
// Receivers built for reliable delivery sometimes need an acknowledgement
// once a message has been handled; the ProcessedSuccessfully channel exists
// to carry that acknowledgement.
type CommandTransactedAccept struct {
	// Command is the command that was received.
	Command Command
	// ProcessedSuccessfully is the acknowledgement channel for Command.
	ProcessedSuccessfully chan bool
}
// CommandDispatcher is responsible for routing commands from the command
// manager to the handlers registered for processing them.
type CommandDispatcher interface {
	// DispatchCommand passes command to the applicable handlers and returns
	// an error if dispatching fails.
	DispatchCommand(command Command) error
	// RegisterCommandHandler registers handler for commands whose body has
	// the same type as the sample value command.
	RegisterCommandHandler(command interface{}, handler CommandHandler)
	// RegisterGlobalHandler registers handler as a wildcard handler that
	// applies to every command regardless of type.
	RegisterGlobalHandler(handler CommandHandler)
}
// CommandHandler is a function that processes a single command and reports
// failure through its error result.
type CommandHandler func(command Command) error
// MapBasedCommandDispatcher is a simple implementation of the command
// dispatcher that uses a map to associate command types with their handlers.
type MapBasedCommandDispatcher struct {
	// registry maps the type of a command body to the handlers registered
	// for it, in registration order.
	registry map[reflect.Type][]CommandHandler
	// globalHandlers holds the wildcard handlers, in registration order.
	globalHandlers []CommandHandler
}
// NewMapBasedCommandDispatcher constructs a MapBasedCommandDispatcher with an
// empty handler registry and no global handlers.
func NewMapBasedCommandDispatcher() *MapBasedCommandDispatcher {
	return &MapBasedCommandDispatcher{
		registry:       map[reflect.Type][]CommandHandler{},
		globalHandlers: []CommandHandler{},
	}
}
// RegisterCommandHandler registers handler for commands whose body has the
// same type as the sample value command. Several handlers may be registered
// for one type; they are kept in registration order.
func (m *MapBasedCommandDispatcher) RegisterCommandHandler(command interface{}, handler CommandHandler) {
	key := reflect.TypeOf(command)
	// For a type not seen before the lookup yields a nil slice, and appending
	// to it produces a fresh one-element slice.
	m.registry[key] = append(m.registry[key], handler)
}
// RegisterGlobalHandler registers handler as a wildcard handler that is
// called for every dispatched command, whatever its type.
func (m *MapBasedCommandDispatcher) RegisterGlobalHandler(handler CommandHandler) {
	extended := append(m.globalHandlers, handler)
	m.globalHandlers = extended
}
// DispatchCommand runs the handlers registered for the type of command.Body,
// followed by the global handlers, each group in registration order.
//
// Dispatch stops at the first handler that returns an error: the failure
// counter labelled with command.CommandType is incremented and that error is
// returned. When every handler succeeds the dispatched counter is incremented
// and nil is returned.
func (m *MapBasedCommandDispatcher) DispatchCommand(command Command) error {
	// invoke runs one group of handlers, recording and returning the first failure.
	invoke := func(group []CommandHandler) error {
		for _, handle := range group {
			if err := handle(command); err != nil {
				metricsCommandsFailed.WithLabelValues(command.CommandType).Inc()
				return err
			}
		}
		return nil
	}

	// Looking up an unregistered type yields a nil slice, which invoke treats
	// as an empty group.
	if err := invoke(m.registry[reflect.TypeOf(command.Body)]); err != nil {
		return err
	}
	if err := invoke(m.globalHandlers); err != nil {
		return err
	}

	metricsCommandsDispatched.WithLabelValues(command.CommandType).Inc()
	return nil
}
// NewCommandDispatchManager constructs a CommandDispatchManager that reads
// commands from receiver, tracks command types in registry, and dispatches
// through a new MapBasedCommandDispatcher.
func NewCommandDispatchManager(receiver CommandReceiver, registry TypeRegistry) *CommandDispatchManager {
	manager := &CommandDispatchManager{
		commandDispatcher: NewMapBasedCommandDispatcher(),
		typeRegistry:      registry,
		receiver:          receiver,
	}
	return manager
}
// RegisterCommandHandler registers handler for commands whose body has the
// same type as the sample value command. The type is first recorded in the
// manager's type registry, then the handler is added to the dispatcher.
func (m *CommandDispatchManager) RegisterCommandHandler(command interface{}, handler CommandHandler) {
	m.typeRegistry.RegisterType(command)

	dispatcher := m.commandDispatcher
	dispatcher.RegisterCommandHandler(command, handler)
}
// RegisterGlobalHandler registers handler with the internal dispatcher as a
// wildcard handler that is called for every received command.
func (m *CommandDispatchManager) RegisterGlobalHandler(handler CommandHandler) {
	dispatcher := m.commandDispatcher
	dispatcher.RegisterGlobalHandler(handler)
}
// Listen asks the receiver to start delivering commands and spawns a
// goroutine that supervises stop requests and receiver-side errors.
//
// Parameters:
//   - stop: a value received from this channel (sent or via close) requests
//     that the receiver be shut down.
//   - exclusive, listenerCount: forwarded unchanged to the receiver through
//     CommandReceiverOptions.
//
// Listen returns the error from ReceiveCommands if the receiver could not be
// started. Otherwise it returns nil immediately while supervision continues
// in the background until a stop request has been processed.
func (m *CommandDispatchManager) Listen(stop <-chan bool, exclusive bool, listenerCount int) error {
	// Channel used to ask the receiver to close; the receiver answers on the
	// reply channel that is sent through it.
	closeChannel := make(chan chan error)
	// Channel on which the receiver goroutine reports errors.
	errorChannel := make(chan error)

	// Handler given to the receiver: forwards each command to the dispatcher
	// and returns the dispatch result to the receiver.
	receiveCommandHandler := func(command Command) error {
		PackageLogger().Debugf("CommandDispatchManager.DispatchCommand: %v", command.CorrelationID)
		err := m.commandDispatcher.DispatchCommand(command)
		if err != nil {
			PackageLogger().Debugf("Error dispatching command: %v", err)
		}
		return err
	}

	// Start receiving commands by passing the channels and handler to the receiver.
	options := CommandReceiverOptions{
		TypeRegistry:   m.typeRegistry,
		Close:          closeChannel,
		Error:          errorChannel,
		ReceiveCommand: receiveCommandHandler,
		Exclusive:      exclusive,
		ListenerCount:  listenerCount,
	}
	if err := m.receiver.ReceiveCommands(options); err != nil {
		return err
	}

	go func() {
		for {
			select {
			case <-stop:
				PackageLogger().Debugf("CommandDispatchManager.Stopping")
				closeSignal := make(chan error)
				closeChannel <- closeSignal
				// Wait for the receiver's acknowledgement before reporting
				// the stop, and surface any error it hands back.
				if err := <-closeSignal; err != nil {
					PackageLogger().Debugf("CommandDispatchManager.ErrorReceived: %s", err)
				}
				PackageLogger().Debugf("CommandDispatchManager.Stopped")
				// Leave the loop: a closed stop channel would otherwise fire
				// again and block forever on closeChannel, leaking this goroutine.
				return
			// Receiving on this channel signifies an error has occurred on the receiver side.
			case err := <-errorChannel:
				PackageLogger().Debugf("CommandDispatchManager.ErrorReceived: %s", err)
			}
		}
	}()

	return nil
}