Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handlesignal: add ServerTimestamp remote stream order on incoming messages #550

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions pkg/connector/handlesignal.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,16 @@ type Bv2ChatEvent struct {
}

var (
_ bridgev2.RemoteMessage = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteEdit = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteEventWithTimestamp = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteReaction = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteReactionRemove = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteMessageRemove = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteTyping = (*Bv2ChatEvent)(nil)
_ bridgev2.RemotePreHandler = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteChatInfoChange = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteMessage = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteEdit = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteEventWithTimestamp = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteReaction = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteReactionRemove = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteMessageRemove = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteTyping = (*Bv2ChatEvent)(nil)
_ bridgev2.RemotePreHandler = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteChatInfoChange = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteEventWithStreamOrder = (*Bv2ChatEvent)(nil)
)

func (evt *Bv2ChatEvent) GetType() bridgev2.RemoteEventType {
Expand Down Expand Up @@ -337,6 +338,10 @@ func (evt *Bv2ChatEvent) ConvertEdit(ctx context.Context, portal *bridgev2.Porta
}, nil
}

func (evt *Bv2ChatEvent) GetStreamOrder() int64 {
return int64(evt.Info.ServerTimestamp)
}

type Bv2Receipt struct {
Type signalpb.ReceiptMessage_Type
Chat networkid.PortalKey
Expand Down
3 changes: 2 additions & 1 deletion pkg/signalmeow/events/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ type MessageInfo struct {
Sender uuid.UUID
ChatID string

GroupRevision uint32
GroupRevision uint32
ServerTimestamp uint64
}

type ChatEvent struct {
Expand Down
36 changes: 20 additions & 16 deletions pkg/signalmeow/receiving.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,9 +744,9 @@ func (cli *Client) handleDecryptedResult(
log.Warn().Msg("sync message sent destination is nil")
} else if content.SyncMessage.Sent.Message != nil {
// TODO handle expiration start ts, and maybe the sync message ts?
cli.incomingDataMessage(ctx, content.SyncMessage.Sent.Message, cli.Store.ACI, syncDestinationServiceID)
cli.incomingDataMessage(ctx, content.SyncMessage.Sent.Message, cli.Store.ACI, syncDestinationServiceID, envelope.GetServerTimestamp())
} else if content.SyncMessage.Sent.EditMessage != nil {
cli.incomingEditMessage(ctx, content.SyncMessage.Sent.EditMessage, cli.Store.ACI, syncDestinationServiceID)
cli.incomingEditMessage(ctx, content.SyncMessage.Sent.EditMessage, cli.Store.ACI, syncDestinationServiceID, envelope.GetServerTimestamp())
}
}
if content.SyncMessage.Contacts != nil {
Expand Down Expand Up @@ -794,9 +794,9 @@ func (cli *Client) handleDecryptedResult(

var sendDeliveryReceipt bool
if content.DataMessage != nil {
sendDeliveryReceipt = cli.incomingDataMessage(ctx, content.DataMessage, theirServiceID.UUID, theirServiceID)
sendDeliveryReceipt = cli.incomingDataMessage(ctx, content.DataMessage, theirServiceID.UUID, theirServiceID, envelope.GetServerTimestamp())
} else if content.EditMessage != nil {
sendDeliveryReceipt = cli.incomingEditMessage(ctx, content.EditMessage, theirServiceID.UUID, theirServiceID)
sendDeliveryReceipt = cli.incomingEditMessage(ctx, content.EditMessage, theirServiceID.UUID, theirServiceID, envelope.GetServerTimestamp())
}
if sendDeliveryReceipt {
// TODO send delivery receipts after actually bridging instead of here
Expand All @@ -814,8 +814,9 @@ func (cli *Client) handleDecryptedResult(
}
cli.handleEvent(&events.ChatEvent{
Info: events.MessageInfo{
Sender: theirServiceID.UUID,
ChatID: groupOrUserID(groupID, theirServiceID),
Sender: theirServiceID.UUID,
ChatID: groupOrUserID(groupID, theirServiceID),
ServerTimestamp: envelope.GetServerTimestamp(),
},
Event: content.TypingMessage,
})
Expand All @@ -825,8 +826,9 @@ func (cli *Client) handleDecryptedResult(
if content.CallMessage != nil && (content.CallMessage.Offer != nil || content.CallMessage.Hangup != nil) {
cli.handleEvent(&events.Call{
Info: events.MessageInfo{
Sender: theirServiceID.UUID,
ChatID: theirServiceID.String(),
Sender: theirServiceID.UUID,
ChatID: theirServiceID.String(),
ServerTimestamp: envelope.GetServerTimestamp(),
},
IsRinging: content.CallMessage.Offer != nil,
})
Expand Down Expand Up @@ -954,7 +956,7 @@ func (cli *Client) handlePNISignatureMessage(ctx context.Context, sender libsign
return nil
}

func (cli *Client) incomingEditMessage(ctx context.Context, editMessage *signalpb.EditMessage, messageSenderACI uuid.UUID, chatRecipient libsignalgo.ServiceID) bool {
func (cli *Client) incomingEditMessage(ctx context.Context, editMessage *signalpb.EditMessage, messageSenderACI uuid.UUID, chatRecipient libsignalgo.ServiceID, serverTimestamp uint64) bool {
// If it's a group message, get the ID and invalidate cache if necessary
var groupID types.GroupIdentifier
var groupRevision uint32
Expand All @@ -972,16 +974,17 @@ func (cli *Client) incomingEditMessage(ctx context.Context, editMessage *signalp
}
cli.handleEvent(&events.ChatEvent{
Info: events.MessageInfo{
Sender: messageSenderACI,
ChatID: groupOrUserID(groupID, chatRecipient),
GroupRevision: groupRevision,
Sender: messageSenderACI,
ChatID: groupOrUserID(groupID, chatRecipient),
GroupRevision: groupRevision,
ServerTimestamp: serverTimestamp,
},
Event: editMessage,
})
return true
}

func (cli *Client) incomingDataMessage(ctx context.Context, dataMessage *signalpb.DataMessage, messageSenderACI uuid.UUID, chatRecipient libsignalgo.ServiceID) bool {
func (cli *Client) incomingDataMessage(ctx context.Context, dataMessage *signalpb.DataMessage, messageSenderACI uuid.UUID, chatRecipient libsignalgo.ServiceID, serverTimestamp uint64) bool {
// If there's a profile key, save it
if dataMessage.ProfileKey != nil {
profileKey := libsignalgo.ProfileKey(dataMessage.ProfileKey)
Expand Down Expand Up @@ -1009,9 +1012,10 @@ func (cli *Client) incomingDataMessage(ctx context.Context, dataMessage *signalp
}

evtInfo := events.MessageInfo{
Sender: messageSenderACI,
ChatID: groupOrUserID(groupID, chatRecipient),
GroupRevision: groupRevision,
Sender: messageSenderACI,
ChatID: groupOrUserID(groupID, chatRecipient),
GroupRevision: groupRevision,
ServerTimestamp: serverTimestamp,
}
// Hacky special case for group calls to cache the state
if dataMessage.GroupCallUpdate != nil {
Expand Down
Loading