-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmessage.go
84 lines (69 loc) · 1.75 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package streams
import (
"context"
)
// MetadataOrigin identifies the origin of a piece of metadata.
// It is backed by a uint8.
type MetadataOrigin uint8

// Known MetadataOrigin values. Numbering starts at zero, so CommitterOrigin
// is also the zero value of MetadataOrigin.
const (
	// CommitterOrigin has the value 0.
	// NOTE(review): presumably marks metadata coming from a committer; confirm against callers.
	CommitterOrigin = MetadataOrigin(iota)
	// ProcessorOrigin has the value 1.
	// NOTE(review): presumably marks metadata coming from a processor; confirm against callers.
	ProcessorOrigin
)
// MetadataStrategy selects how metadata is merged.
// It is backed by a uint8.
type MetadataStrategy uint8

// Known MetadataStrategy values. Numbering starts at zero, so Lossless is
// also the zero value of MetadataStrategy.
const (
	// Lossless has the value 0.
	// NOTE(review): presumably keeps every entry when merging; the exact
	// behaviour is defined by Metadata implementations.
	Lossless = MetadataStrategy(iota)
	// Dupless has the value 1.
	// NOTE(review): presumably drops duplicate entries when merging; the
	// exact behaviour is defined by Metadata implementations.
	Dupless
)
// Metadata is implemented by metadata values that can be tagged with an
// origin and merged with other metadata.
type Metadata interface {
	// WithOrigin sets the given MetadataOrigin on the metadata.
	WithOrigin(origin MetadataOrigin)

	// Merge merges the contained metadata into the given metadata using the
	// given strategy and returns the resulting Metadata.
	Merge(other Metadata, strategy MetadataStrategy) Metadata
}
// Message represents data that flows through the stream.
//
// The key, value and context are exported; the source and metadata are only
// reachable through the Metadata and WithMetadata methods.
type Message struct {
// source is the Source the metadata is associated with; set by WithMetadata.
source Source
// metadata is the Metadata attached to the message; set by WithMetadata.
metadata Metadata
// Ctx is the context carried along with the message.
Ctx context.Context
// Key is the message key; it may be nil.
Key interface{}
// Value is the message value; it may be nil.
Value interface{}
}
// Metadata reports the Source and the Metadata currently attached to the
// Message. Both are nil on a Message that has not had metadata set.
func (m Message) Metadata() (Source, Metadata) {
	src, meta := m.source, m.metadata
	return src, meta
}
// WithMetadata returns a copy of the Message that carries the Metadata v for
// the Source s. The receiver is a value, so the Message the method is called
// on is left unchanged.
func (m Message) WithMetadata(s Source, v Metadata) Message {
	updated := m
	updated.source, updated.metadata = s, v
	return updated
}
// Empty reports whether the Message has neither a Key nor a Value.
// The context and metadata are not considered.
func (m Message) Empty() bool {
	if m.Key != nil {
		return false
	}
	return m.Value == nil
}
// EmptyMessage is a predefined empty message. It is the zero value of
// Message, so its Key, Value, Ctx and metadata are all nil.
var EmptyMessage Message
// NewMessage builds a Message holding the key k and the value v.
// The Message gets context.Background() as its context and carries no
// source or metadata.
func NewMessage(k, v interface{}) Message {
	msg := Message{Key: k, Value: v}
	msg.Ctx = context.Background()
	return msg
}
// NewMessageWithContext builds a Message holding the key k and the value v
// and carrying the context ctx. The context is stored as given; no nil check
// is performed. The Message carries no source or metadata.
func NewMessageWithContext(ctx context.Context, k, v interface{}) Message {
	var msg Message
	msg.Ctx = ctx
	msg.Key, msg.Value = k, v
	return msg
}