Skip to content

Commit

Permalink
signalmeow/sending: use structured logging
Browse files Browse the repository at this point in the history
Signed-off-by: Sumner Evans <[email protected]>
  • Loading branch information
sumnerevans committed Jan 6, 2024
1 parent 4828f64 commit 6168b25
Showing 1 changed file with 89 additions and 44 deletions.
133 changes: 89 additions & 44 deletions pkg/signalmeow/sending.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,13 @@ func checkForErrorWithSessions(err error, addresses []*libsignalgo.Address, sess
return err
}
if addresses == nil || sessionRecords == nil {
return fmt.Errorf("Addresses or session records are nil")
return fmt.Errorf("addresses or session records are nil")
}
if len(addresses) != len(sessionRecords) {
return fmt.Errorf("Mismatched number of addresses (%d) and session records (%d)", len(addresses), len(sessionRecords))
return fmt.Errorf("mismatched number of addresses (%d) and session records (%d)", len(addresses), len(sessionRecords))
}
if len(addresses) == 0 || len(sessionRecords) == 0 {
return fmt.Errorf("No addresses or session records")
return fmt.Errorf("no addresses or session records")
}
return nil
}
Expand All @@ -153,7 +153,7 @@ func (cli *Client) howManyOtherDevicesDoWeHave(ctx context.Context) int {
for _, address := range addresses {
deviceID, err := address.DeviceID()
if err != nil {
zlog.Err(err).Msg("Error getting deviceID from address")
zerolog.Ctx(ctx).Err(err).Msg("Error getting deviceID from address")
continue
}
if deviceID != uint(cli.Store.DeviceID) {
Expand Down Expand Up @@ -189,7 +189,9 @@ func (cli *Client) buildMessagesToSend(ctx context.Context, recipientUUID uuid.U

// Don't send to this device that we are sending from
if recipientUUID == cli.Store.ACI && recipientDeviceID == uint(cli.Store.DeviceID) {
zlog.Debug().Msgf("Not sending to the device I'm sending from (%v:%v)", recipientUUID, recipientDeviceID)
zerolog.Ctx(ctx).Debug().
Uint("recipient_device_id", recipientDeviceID).
Msg("Not sending to the device I'm sending from")
continue
}

Expand All @@ -199,6 +201,9 @@ func (cli *Client) buildMessagesToSend(ctx context.Context, recipientUUID uuid.U
return nil, err
}
paddedMessage, err := addPadding(3, []byte(serializedMessage)) // TODO: figure out how to get actual version
if err != nil {
return nil, err
}
sessionRecord := sessionRecords[i]

var envelopeType int
Expand All @@ -208,6 +213,9 @@ func (cli *Client) buildMessagesToSend(ctx context.Context, recipientUUID uuid.U
} else {
envelopeType, encryptedPayload, err = cli.buildAuthedMessageToSend(ctx, recipientAddress, paddedMessage)
}
if err != nil {
return nil, err
}

destinationRegistrationID, err := sessionRecord.GetRemoteRegistrationID()
if err != nil {
Expand All @@ -233,6 +241,9 @@ func (cli *Client) buildAuthedMessageToSend(ctx context.Context, recipientAddres
cli.Store.SessionStore,
cli.Store.IdentityStore,
)
if err != nil {
return 0, nil, err
}
encryptedPayload, err = cipherTextMessage.Serialize()
if err != nil {
return 0, nil, err
Expand All @@ -245,7 +256,7 @@ func (cli *Client) buildAuthedMessageToSend(ctx context.Context, recipientAddres
} else if cipherMessageType == libsignalgo.CiphertextMessageTypeWhisper { // 2 -> 1
envelopeType = int(signalpb.Envelope_CIPHERTEXT)
} else {
return 0, nil, fmt.Errorf("Unknown message type: %v", cipherMessageType)
return 0, nil, fmt.Errorf("unknown message type: %v", cipherMessageType)
}
return envelopeType, encryptedPayload, nil
}
Expand Down Expand Up @@ -374,9 +385,11 @@ func syncMessageForContactRequest() *signalpb.Content {
}
}

func syncMessageFromReadReceiptMessage(receiptMessage *signalpb.ReceiptMessage, messageSender uuid.UUID) *signalpb.Content {
func syncMessageFromReadReceiptMessage(ctx context.Context, receiptMessage *signalpb.ReceiptMessage, messageSender uuid.UUID) *signalpb.Content {
if *receiptMessage.Type != signalpb.ReceiptMessage_READ {
zlog.Warn().Msgf("syncMessageFromReadReceiptMessage called with non-read receipt message: %v", receiptMessage.Type)
zerolog.Ctx(ctx).Warn().
Any("receipt_message_type", receiptMessage.Type).
Msg("syncMessageFromReadReceiptMessage called with non-read receipt message")
return nil
}
read := []*signalpb.SyncMessage_Read{}
Expand All @@ -394,18 +407,28 @@ func syncMessageFromReadReceiptMessage(receiptMessage *signalpb.ReceiptMessage,
}

func (cli *Client) SendContactSyncRequest(ctx context.Context) error {
if cli.LastContactRequestTime == nil {
cli.LastContactRequestTime = new(int64)
}
currentUnixTime := time.Now().Unix()
lastRequestTime := cli.LastContactRequestTime
log := zerolog.Ctx(ctx).With().
Str("action", "send contact sync request").
Int64("current_unix_time", currentUnixTime).
Int64("last_request_time", *lastRequestTime).
Int64("seconds_since_last_request", currentUnixTime-*lastRequestTime).
Logger()
ctx = log.WithContext(ctx)
// If we've requested in the last minute, don't request again
if lastRequestTime != nil && currentUnixTime-*lastRequestTime < 60 {
zlog.Warn().Msgf("Not sending contact sync request, already sent %v seconds ago", currentUnixTime-*lastRequestTime)
log.Warn().Msg("Not sending contact sync request because we already requested it in the past minute")
return nil
}

groupRequest := syncMessageForContactRequest()
_, err := cli.sendContent(ctx, cli.Store.ACI, uint64(currentUnixTime), groupRequest, 0)
if err != nil {
zlog.Err(err).Msg("Failed to send contact sync request message to myself (%v)")
log.Err(err).Msg("Failed to send contact sync request message to myself")
return err
}
cli.LastContactRequestTime = &currentUnixTime
Expand Down Expand Up @@ -484,6 +507,11 @@ func wrapDataMessageInContent(dm *signalpb.DataMessage) *signalpb.Content {
}

func (cli *Client) SendGroupMessage(ctx context.Context, gid types.GroupIdentifier, content *signalpb.Content) (*GroupMessageSendResult, error) {
log := zerolog.Ctx(ctx).With().
Str("action", "send group message").
Stringer("group_id", gid).
Logger()
ctx = log.WithContext(ctx)
group, err := cli.RetrieveGroupByID(ctx, gid, 0)
if err != nil {
return nil, err
Expand All @@ -508,19 +536,21 @@ func (cli *Client) SendGroupMessage(ctx context.Context, gid types.GroupIdentifi
// Don't send normal DataMessages to ourselves
continue
}
log := log.With().Stringer("member", member.UserID).Logger()
ctx := log.WithContext(ctx)
sentUnidentified, err := cli.sendContent(ctx, member.UserID, messageTimestamp, content, 0)
if err != nil {
result.FailedToSendTo = append(result.FailedToSendTo, FailedSendResult{
RecipientUUID: member.UserID,
Error: err,
})
zlog.Err(err).Msgf("Failed to send to %v", member.UserID)
log.Err(err).Msg("Failed to send to user")
} else {
result.SuccessfullySentTo = append(result.SuccessfullySentTo, SuccessfulSendResult{
RecipientUUID: member.UserID,
Unidentified: sentUnidentified,
})
zlog.Trace().Msgf("Successfully sent to %v", member.UserID)
log.Trace().Msg("Successfully sent to user")
}
}

Expand All @@ -534,7 +564,7 @@ func (cli *Client) SendGroupMessage(ctx context.Context, gid types.GroupIdentifi
}
_, selfSendErr := cli.sendContent(ctx, cli.Store.ACI, messageTimestamp, syncContent, 0)
if selfSendErr != nil {
zlog.Err(selfSendErr).Msg("Failed to send sync message to myself (%v)")
log.Err(selfSendErr).Msg("Failed to send sync message to myself")
}
}

Expand All @@ -543,7 +573,7 @@ func (cli *Client) SendGroupMessage(ctx context.Context, gid types.GroupIdentifi
}
if len(result.SuccessfullySentTo) == 0 {
lastError := result.FailedToSendTo[len(result.FailedToSendTo)-1].Error
return nil, fmt.Errorf("Failed to send to any group members: %v", lastError)
return nil, fmt.Errorf("failed to send to any group members: %w", lastError)
}

return result, nil
Expand Down Expand Up @@ -592,7 +622,7 @@ func (cli *Client) SendMessage(ctx context.Context, recipientID uuid.UUID, conte
} else if content.GetEditMessage() != nil {
syncContent = syncMessageFromSoloEditMessage(content.EditMessage, *result.SuccessfulSendResult)
} else if content.GetReceiptMessage().GetType() == signalpb.ReceiptMessage_READ {
syncContent = syncMessageFromReadReceiptMessage(content.ReceiptMessage, recipientID)
syncContent = syncMessageFromReadReceiptMessage(ctx, content.ReceiptMessage, recipientID)
}
if syncContent != nil {
_, selfSendErr := cli.sendContent(ctx, cli.Store.ACI, messageTimestamp, syncContent, 0)
Expand All @@ -615,23 +645,28 @@ func (cli *Client) sendContent(
content *signalpb.Content,
retryCount int, // For ending recursive retries
) (sentUnidentified bool, err error) {
log := zerolog.Ctx(ctx).With().
Str("action", "send content").
Stringer("recipient", recipientUUID).
Uint64("timestamp", messageTimestamp).
Logger()
ctx = log.WithContext(ctx)
printContentFieldString(ctx, content, "Outgoing message")
zerolog.Ctx(ctx).Trace().Any("raw_content", content).Msg("Raw data of outgoing message")
log.Trace().Any("raw_content", content).Msg("Raw data of outgoing message")

// If it's a data message, add our profile key
if content.DataMessage != nil {
profileKey, err := cli.ProfileKeyForSignalID(ctx, cli.Store.ACI)
if err != nil {
zlog.Err(err).Msg("Error getting profile key, not adding to outgoing message")
log.Err(err).Msg("Error getting profile key, not adding to outgoing message")
} else {
content.DataMessage.ProfileKey = profileKey.Slice()
}
}

if retryCount > 3 {
err := fmt.Errorf("Too many retries")
zlog.Err(err).Msgf("sendContent too many retries: %v", retryCount)
return false, err
log.Error().Int("retry_count", retryCount).Msg("sendContent too many retries")
return false, fmt.Errorf("too many retries")
}

useUnidentifiedSender := true
Expand All @@ -641,7 +676,7 @@ func (cli *Client) sendContent(
}
profileKey, err := cli.ProfileKeyForSignalID(ctx, recipientUUID)
if err != nil || profileKey == nil {
zlog.Err(err).Msg("Error getting profile key")
log.Err(err).Msg("Error getting profile key")
useUnidentifiedSender = false
// Try to self heal by requesting contact sync, though this is slow and not guaranteed to help
cli.SendContactSyncRequest(ctx)
Expand All @@ -650,7 +685,7 @@ func (cli *Client) sendContent(
if profileKey != nil {
accessKey, err = profileKey.DeriveAccessKey()
if err != nil {
zlog.Err(err).Msg("Error deriving access key")
log.Err(err).Msg("Error deriving access key")
useUnidentifiedSender = false
}
}
Expand All @@ -665,7 +700,7 @@ func (cli *Client) sendContent(
var messages []MyMessage
messages, err = cli.buildMessagesToSend(ctx, recipientUUID, content, useUnidentifiedSender)
if err != nil {
zlog.Err(err).Msg("Error building messages to send")
log.Err(err).Msg("Error building messages to send")
return false, err
}

Expand All @@ -684,19 +719,24 @@ func (cli *Client) sendContent(

var response *signalpb.WebSocketResponseMessage
if useUnidentifiedSender {
zlog.Trace().Msgf("Sending message to %v over unidentified WS", recipientUUID)
log.Trace().Msg("Sending message over unidentified WS")
base64AccessKey := base64.StdEncoding.EncodeToString(accessKey[:])
request.Headers = append(request.Headers, "unidentified-access-key:"+base64AccessKey)
response, err = cli.UnauthedWS.SendRequest(ctx, request)
} else {
zlog.Trace().Msgf("Sending message to %v over authed WS", recipientUUID)
log.Trace().Msg("Sending message over authed WS")
response, err = cli.AuthedWS.SendRequest(ctx, request)
}
sentUnidentified = useUnidentifiedSender
if err != nil {
return sentUnidentified, err
}
zlog.Trace().Msgf("Received a response to a message send from: %v, id: %v, code: %v", recipientUUID, *response.Id, *response.Status)
log = log.With().
Uint64("response_id", *response.Id).
Uint32("response_status", *response.Status).
Logger()
ctx = log.WithContext(ctx)
log.Trace().Msg("Received a response to a message send")

retryableStatuses := []uint32{409, 410, 428, 500, 503}

Expand Down Expand Up @@ -724,52 +764,51 @@ func (cli *Client) sendContent(
// Try to send again (**RECURSIVELY**)
sentUnidentified, err = cli.sendContent(ctx, recipientUUID, messageTimestamp, content, retryCount+1)
if err != nil {
zlog.Err(err).Msg("2nd try sendMessage error")
log.Err(err).Msg("2nd try sendMessage error")
return sentUnidentified, err
}
} else if *response.Status != 200 {
err := fmt.Errorf("Unexpected status code while sending: %v", *response.Status)
zlog.Err(err).Msg("")
return sentUnidentified, err
return sentUnidentified, fmt.Errorf("unexpected status code while sending: %d", *response.Status)
}

return sentUnidentified, nil
}

// A 409 means our device list was out of date, so we will fix it up
func (cli *Client) handle409(ctx context.Context, recipientUUID uuid.UUID, response *signalpb.WebSocketResponseMessage) error {
log := zerolog.Ctx(ctx)
// Decode json body
var body map[string]interface{}
err := json.Unmarshal(response.Body, &body)
if err != nil {
zlog.Err(err).Msg("Unmarshal error")
log.Err(err).Msg("Unmarshal error")
return err
}
// check for missingDevices and extraDevices
if body["missingDevices"] != nil {
missingDevices := body["missingDevices"].([]interface{})
zlog.Debug().Msgf("missing devices found in 409 response: %v", missingDevices)
missingDevices := body["missingDevices"].([]any)
log.Debug().Any("missing_devices", missingDevices).Msg("missing devices found in 409 response")
// TODO: establish session with missing devices
for _, missingDevice := range missingDevices {
cli.FetchAndProcessPreKey(ctx, recipientUUID, int(missingDevice.(float64)))
}
}
if body["extraDevices"] != nil {
extraDevices := body["extraDevices"].([]interface{})
zlog.Debug().Msgf("extra devices found in 409 response: %v", extraDevices)
extraDevices := body["extraDevices"].([]any)
log.Debug().Any("extra_devices", extraDevices).Msg("extra devices found in 409 response")
for _, extraDevice := range extraDevices {
// Remove extra device from the sessionstore
recipient, err := libsignalgo.NewUUIDAddress(
recipientUUID,
uint(extraDevice.(float64)),
)
if err != nil {
zlog.Err(err).Msg("NewAddress error")
log.Err(err).Msg("NewAddress error")
return err
}
err = cli.Store.SessionStoreExtras.RemoveSession(ctx, recipient)
if err != nil {
zlog.Err(err).Msg("RemoveSession error")
log.Err(err).Msg("RemoveSession error")
return err
}
}
Expand All @@ -779,25 +818,30 @@ func (cli *Client) handle409(ctx context.Context, recipientUUID uuid.UUID, respo

// A 410 means we have a stale device, so get rid of it
func (cli *Client) handle410(ctx context.Context, recipientUUID uuid.UUID, response *signalpb.WebSocketResponseMessage) error {
log := zerolog.Ctx(ctx)
// Decode json body
var body map[string]interface{}
err := json.Unmarshal(response.Body, &body)
if err != nil {
zlog.Err(err).Msg("Unmarshal error")
log.Err(err).Msg("Unmarshal error")
return err
}
// check for staleDevices and make new sessions with them
if body["staleDevices"] != nil {
staleDevices := body["staleDevices"].([]interface{})
zlog.Debug().Msgf("stale devices found in 410 response: %v", staleDevices)
staleDevices := body["staleDevices"].([]any)
log.Debug().Any("stale_devices", staleDevices).Msg("stale devices found in 410 response")
for _, staleDevice := range staleDevices {
recipient, err := libsignalgo.NewUUIDAddress(
recipientUUID,
uint(staleDevice.(float64)),
)
if err != nil {
log.Err(err).Msg("error creating new UUID Address")
return err
}
err = cli.Store.SessionStoreExtras.RemoveSession(ctx, recipient)
if err != nil {
zlog.Err(err).Msg("RemoveSession error")
log.Err(err).Msg("RemoveSession error")
return err
}
cli.FetchAndProcessPreKey(ctx, recipientUUID, int(staleDevice.(float64)))
Expand All @@ -810,11 +854,12 @@ func (cli *Client) handle410(ctx context.Context, recipientUUID uuid.UUID, respo
// We ~~will~~ could try sending a "pushChallenge" response, but if that doesn't work we just gotta wait.
// TODO: explore captcha response
func (cli *Client) handle428(ctx context.Context, recipientUUID uuid.UUID, response *signalpb.WebSocketResponseMessage) error {
log := zerolog.Ctx(ctx)
// Decode json body
var body map[string]interface{}
err := json.Unmarshal(response.Body, &body)
if err != nil {
zlog.Err(err).Msg("Unmarshal error")
log.Err(err).Msg("Unmarshal error")
return err
}

Expand All @@ -829,12 +874,12 @@ func (cli *Client) handle428(ctx context.Context, recipientUUID uuid.UUID, respo
if key == "Retry-After" {
retryAfterSeconds, err = strconv.ParseUint(value, 10, 64)
if err != nil {
zlog.Err(err).Msg("ParseUint error")
log.Err(err).Msg("ParseUint error")
}
}
}
if retryAfterSeconds > 0 {
zlog.Warn().Msgf("Got rate limited, need to wait %v seconds", retryAfterSeconds)
log.Warn().Uint64("retry_after_seconds", retryAfterSeconds).Msg("Got rate limited")
}
// TODO: responding to a pushChallenge this way doesn't work, server just returns 422
// Luckily challenges seem rare when sending with sealed sender
Expand Down

0 comments on commit 6168b25

Please sign in to comment.