Skip to content

Commit

Permalink
Republish SEND events back to AMQP
Browse files Browse the repository at this point in the history
  • Loading branch information
Will Nelson committed Aug 3, 2020
1 parent 0318457 commit fc39470
Showing 1 changed file with 63 additions and 46 deletions.
109 changes: 63 additions & 46 deletions gateway/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
"github.com/spec-tacles/go/types"
)

// RepublishPacket represents a SEND packet that now has a shard ID and must be re-published back to AMQP
type RepublishPacket struct {
ShardID int
Packet *types.SendPacket
}

// Manager manages Gateway shards
type Manager struct {
Shards map[int]*Shard
Expand Down Expand Up @@ -147,7 +153,63 @@ func (m *Manager) ConnectBroker(b *BrokerManager, events map[string]struct{}, ti
}
}

b.SetCallback(m.HandleEvent)
b.SetCallback(func(event string, d []byte) {
var (
shard *Shard
packet *types.SendPacket
)
if event == "SEND" {
p := &UnknownSendPacket{}
err := json.Unmarshal(d, p)
if err != nil {
m.log(LogLevelWarn, "unable to parse SEND packet: %s", err)
return
}

shardID := int(p.GuildID >> 22 % uint64(m.opts.ShardCount))
shard = m.Shards[shardID]
if shard == nil {
data, err := json.Marshal(p.Packet)
if err != nil {
m.log(LogLevelError, "error serializing SEND packet data (%+v): %s", *p.Packet, err)
return
}

err = b.PublishOptions(broker.PublishOptions{
Event: strconv.Itoa(shardID),
Data: data,
Timeout: timeout,
})
if err != nil {
m.log(LogLevelError, "error re-publishing SEND packet data to shard %d: %s", shardID, err)
}
return
}
packet = p.Packet
} else {
shardID, err := strconv.Atoi(event)
if err != nil {
m.log(LogLevelWarn, "received unexpected non-int event from AMQP: %s", err)
}
shard = m.Shards[shardID]
if shard == nil {
m.log(LogLevelWarn, "received event for shard %d which does not exist", shardID)
return
}

err = json.Unmarshal(d, packet)
if err != nil {
m.log(LogLevelWarn, "unable to parse packet intended for shard %d: %s", shardID, err)
return
}
}

err := shard.Send(packet)
if err != nil {
m.log(LogLevelError, "error sending packet (%d): %s", packet.Op, err)
}
})

go m.Subscribe(b, "SEND")
for id := range m.Shards {
go m.Subscribe(b, strconv.FormatInt(int64(id), 10))
Expand All @@ -161,48 +223,3 @@ func (m *Manager) Subscribe(b *BrokerManager, event string) {
m.log(LogLevelError, "failed to subscribe to event \"%s\": %s", event, err)
}
}

// HandleEvent handles an incoming message to be potentially sent on a shard that this manager is
// responsible for
func (m *Manager) HandleEvent(event string, d []byte) {
var (
shard *Shard
packet *types.SendPacket
)
if event == "SEND" {
p := &UnknownSendPacket{}
err := json.Unmarshal(d, p)
if err != nil {
m.log(LogLevelWarn, "unable to parse SEND packet: %s", err)
return
}

shard = m.Shards[int(p.GuildID>>22%uint64(m.opts.ShardCount))]
if shard == nil {
// TODO: republish back to AMQP
return
}
packet = p.Packet
} else {
shardID, err := strconv.ParseInt(event, 10, 64)
if err != nil {
m.log(LogLevelWarn, "received unexpected non-int event from AMQP: %s", err)
}
shard = m.Shards[int(shardID)]
if shard == nil {
m.log(LogLevelWarn, "received event for shard %d which does not exist", shardID)
return
}

err = json.Unmarshal(d, packet)
if err != nil {
m.log(LogLevelWarn, "unable to parse packet intended for shard %d: %s", shardID, err)
return
}
}

err := shard.Send(packet)
if err != nil {
m.log(LogLevelError, "error sending packet (%d): %s", packet.Op, err)
}
}

0 comments on commit fc39470

Please sign in to comment.