rfc(decision): Batch multiple files together into single large file to improve network throughput #98

Open: wants to merge 21 commits into main
Conversation

@cmanallen (Member) commented May 24, 2023

RFC is still in progress...

Rendered RFC

@cmanallen force-pushed the rfc/store-multiple-replay-segments-in-a-single-blob branch from 0c6626b to 912301d on May 24, 2023 18:42
@cmanallen marked this pull request as ready for review on May 30, 2023 19:29

@bmckerry (Member) left a comment

Left a few comments about data deletions, but I don't feel good about coupling separate replays together like this.
How do you feel instead about buffering all segments from a single replay together and storing those as individual blobs? There would be less upside cost-wise, but (imo) it would simplify how we store our data, decrease storage costs somewhat, and make it so we only fetch one blob per replay.
I'm not sure how much work we'd need to accommodate this; it's just something I wanted to float.

@cmanallen (Member, Author)

@bmckerry

How do you feel instead about buffering all segments from a single replay together and storing those as individual blobs?

Not possible unfortunately.

@mdtro (Member) commented Jun 6, 2023

I can help address some of the security concerns around data deletion and segment leaking. Symmetric encryption can solve both of these problems for us.


Specifically, I recommend using envelope encryption for this. With envelope encryption, we'll have two layers of keys. The following are rather broad and general guidelines, but we can work to make this more specific as needed. 🙂

  • the Data Encryption Key (DEK): used to encrypt the actual data
  • the Key Encryption Key (KEK): used to encrypt the Data Encryption Key

With the proposed design, we'd need a data encryption key (DEK) per segment. You could create DEKs per replay_id or per project_id, but then you're looking at centrally managing them. With the scale this is at, I'm not sure that's the route to go. By replay_id might be feasible?

I'm not too familiar with the underlying design of Replay's storage, but it sounds like there is an option of storing data in a metadata row? If so, the wrapped DEK can be stored there safely in a column/key. "Wrapped" refers to the plaintext DEK being encrypted itself with the Key Encryption Key (KEK).

For the KEK, I recommend generating a unique key every day. That day's key is used to encrypt any DEKs associated with segments being written. With a key per day, we can just delete the oldest key on a 90-day rotation cycle to maintain compliance with our data retention standards. Data older than 90 days would be unusable once the associated KEK was deleted.

At this point, data deletion would be a matter of deleting the DEK for the particular segments (e.g. setting the column to an empty string or null). The data is unusable at that point. In terms of accidentally over-reading a segment, the data would not be decryptable and a failure would be obvious (since you'd need two keys).

You get some added bonuses here too:

  • data deletion is almost instantaneous (since you're just removing a key, not the actual bytes)
  • there's no need to read and re-write files
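
To make the shape of this concrete, here is a minimal envelope-encryption sketch (an illustration, not a prescribed implementation) using Fernet from the Python `cryptography` package; the helper names are hypothetical:

```python
from cryptography.fernet import Fernet

def encrypt_segment(segment: bytes, kek: bytes) -> tuple[bytes, bytes]:
    """Encrypt a segment with a fresh DEK; return (ciphertext, wrapped DEK)."""
    dek = Fernet.generate_key()                # per-segment data encryption key
    ciphertext = Fernet(dek).encrypt(segment)  # encrypt the data with the DEK
    wrapped_dek = Fernet(kek).encrypt(dek)     # wrap the DEK with the day's KEK
    return ciphertext, wrapped_dek             # store wrapped_dek in the metadata row

def decrypt_segment(ciphertext: bytes, wrapped_dek: bytes, kek: bytes) -> bytes:
    dek = Fernet(kek).decrypt(wrapped_dek)     # unwrap the DEK first
    return Fernet(dek).decrypt(ciphertext)

# Deleting a segment reduces to discarding its wrapped DEK from the metadata row;
# deleting a day's KEK renders every DEK wrapped with it (and its data) unreadable.
```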

@cmanallen (Member, Author)

@mdtro You probably wouldn't recommend having a statically configured KEK live as an environment variable, right? Probably not safe long-term.

I'm curious about the failure mode of a daily generated KEK. Let's say we have some periodic process that creates KEKs. Let's also say this process experiences an outage and no one notices (or doesn't notice quickly enough). Our ingestion pipeline would begin to fail because a KEK for that day could not be located.

So I'm thinking what if we generate KEKs in batches? That would mean we have a long lead time to fix any problems associated with generating KEKs. Is that a security liability? The keys won't be in use until their due date so I'm under the impression that if we were compromised we could conceivably generate new keys for the days that have not elapsed.

@cmanallen (Member, Author) commented Jun 7, 2023

How do you feel instead about buffering all segments from a single replay together and storing those as individual blobs?

This question has been asked several different ways by several different people, so I'd like to respond to it broadly. The first question I would ask is: why? Why would we want to do that? What problem are we experiencing currently that this tries to solve (and that the RFC's proposal does not)?

Does it provide a better cost structure in GCS? The average replay is roughly 10 segments. I want to store more than 200 segments per file. So at a minimum we have a 20x difference in cost; no, it does not have a better COGS outcome.

Maybe storing the segments together improves deletes in some way? With @mdtro's comment I think we've come to the conclusion that deleting files from GCS is not a concern and that encryption is both a viable and sanctioned alternative.

Is it safer and more fault-tolerant? The design of a system that aggregates like-segments will necessarily be less safe and more prone to distributed faults than our existing service. Why? Because it is an additional piece of infrastructure. It's another component that can fail. It does not replace an existing component.

Is it possible to provide guarantees around this service's promise (all segments of a replay in a single file)? The state in the state machine has an undefined lifespan. Because Kafka can backlog, the maximum life of a replay is not relevant. So when we flush, we can never be truly certain that the entire replay was contained within. And so we're back to storing a replay in multiple files.

Does it improve the liveness of the data? The opposite. Data is retained for a minimum of an hour before it's made available, unless you expose the stateful service's internals via a REST API (a significant addition to the scope).

Even if this were a high-value goal, do we have the organizational capability to execute on this? No. Maybe a couple of people in this company could construct such a stateful service (in a safe and fault-tolerant way), but again we circle back to the first question: what is the utility of storing like-segments together? And is it greater than the value those minds are providing elsewhere in the org?

Put simply:

It's technically possible to do. But it's difficult to create, more expensive, prone to cascading faults in production, reduces the quality of the replay product, and unnecessarily couples to Replay a system which could otherwise be used by other products.

Hopefully this clears things up :-)

@mdtro (Member) commented Jun 7, 2023

@mdtro You probably wouldn't recommend having a statically configured KEK live as an environment variable, right? Probably not safe long-term.

I'm curious about the failure mode of a daily generated KEK. Let's say we have some periodic process that creates KEKs. Let's also say this process experiences an outage and no one notices (or doesn't notice quickly enough). Our ingestion pipeline would begin to fail because a KEK for that day could not be located.

So I'm thinking what if we generate KEKs in batches? That would mean we have a long lead time to fix any problems associated with generating KEKs. Is that a security liability? The keys won't be in use until their due date so I'm under the impression that if we were compromised we could conceivably generate new keys for the days that have not elapsed.

Ah, that's a really good point, and I left out some detail. The wrapped DEK would include the key id of the KEK used to encrypt it. You wouldn't need to do any timestamp logic to determine which key to use. Rotating them each day doesn't need to be a perfectly timed orchestration. So if a KEK fails to rotate on a particular day, you'd just use the existing current one until the issue is resolved. The KEK failing to rotate would not be a P0 issue, imo.

  • Encryption would always be done with the currently configured KEK.
  • Decryption would be done with the KEK key id defined in the wrapped DEK.

I'd use Cloud KMS, which has some solid uptime guarantees. Combined with the above behavior, you'd maintain resilience in the ingest pipeline.
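
As a rough illustration of that lookup behavior (hypothetical names, with Fernet standing in for Cloud KMS):

```python
from cryptography.fernet import Fernet

# Hypothetical in-memory registry; in production the KEKs would live in Cloud KMS.
KEKS: dict[str, bytes] = {"2023-06-07": Fernet.generate_key()}
CURRENT_KEK_ID = "2023-06-07"  # most recently rotated-in key; reused if rotation is late

def wrap_dek(dek: bytes) -> dict:
    # Encryption always uses the current KEK and records its key id with the wrapped DEK.
    return {
        "kek_id": CURRENT_KEK_ID,
        "wrapped_dek": Fernet(KEKS[CURRENT_KEK_ID]).encrypt(dek),
    }

def unwrap_dek(record: dict) -> bytes:
    # Decryption uses whichever KEK the record names; no timestamp logic required.
    return Fernet(KEKS[record["kek_id"]]).decrypt(record["wrapped_dek"])
```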


Google Cloud Storage lists the costs for writing and storing data as two separate categories. Writing a file costs $0.005 per 1000 files. Storing that file costs $0.02 per gigabyte. For the average Session Replay file (with a retention period of 90 days) this works out to: $0.000000012 for storage and $0.00000005 for the write.

In practical terms, this means 75% of our spend is allocated to writing new files.


While the cost is mostly due to writes, is this cost problematic at scale? Is it a blocker for reaching higher scale, or are we simply looking for a more efficient option?

@cmanallen (Member, Author)

I do not believe cost will prevent us from reaching greater scale. Write costs scale linearly. Pricing does not quite scale linearly, but if you're happy with how much GCS costs at low levels of demand, you will be happy at peak demand.

- We will use envelope encryption to protect the contents of every file.
- https://cloud.google.com/kms/docs/envelope-encryption
- Related, contiguous byte ranges will be encrypted independently of the rest of the file.
- We will use KMS to manage our key-encryption-keys.

@fpacifici (Jul 12, 2023)

KMS? Are you talking about GCP KMS?

@cmanallen (Member, Author)

Google Key Management Service. Subject to change.

Comment on lines 57 to 58
- It is not unique.
- The value of the key field should be easily computable by your service.

@fpacifici (Jul 12, 2023)

Why not ensure this is unique and avoid having two keys?

@cmanallen (Member, Author)

At-least-once delivery guarantees. If you're bulk-inserting 1000 rows per upload, it becomes difficult to split out the rows that have duplicates.

But, thinking on this more... my plan is to use Kafka for this. Ordering guarantees could mean that the file batch is deterministic (assuming we don't have a stateful countdown timer). If one row exists in the database then they all must exist (therefore the batch was already written; commit offsets and move on). I suppose re-balancing would not affect this? A unique constraint could be possible.

That being said I think a deadline is important so files don't sit idle in the consumer for undefined periods of time.

@cmanallen (Member, Author) Jul 12, 2023

Then again a read query against the unique key prior to the insert operation could satisfy this constraint. So yes I would say a unique constraint is possible here.
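
A hypothetical sketch of that read-before-insert check (table and column names are illustrative only):

```python
def insert_batch(cursor, filename: str, rows: list[tuple]) -> None:
    # If any row for this file exists, the whole batch was written by a previous
    # delivery (batches are deterministic), so skip the insert and commit offsets.
    cursor.execute(
        "SELECT 1 FROM recording_byte_range WHERE filename = %s LIMIT 1",
        (filename,),
    )
    if cursor.fetchone():
        return
    cursor.executemany(
        "INSERT INTO recording_byte_range (filename, key, start, stop) "
        "VALUES (%s, %s, %s, %s)",
        rows,  # each row: (filename, part key, start offset, stop offset)
    )
```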

@fpacifici left a comment

A few high-level remarks:

What is the rationale for merging parts into a single blob outside of Filestore instead of evolving Filestore to achieve the result? In this implementation it seems you are targeting a generic solution that is meant to be reusable in other scenarios, but if we go that way we will end up in a split world where one "filestore" is great for latency but unable to merge parts into files, and the "other filestore" is horrible at latency but able to merge parts into files.
This seems like an undesirable situation, and I would strongly advise consolidating into one solution that handles files well across the board (whether that is Filestore or a brand new service).

Could you please expand on how this would be supported for open source customers? It seems to depend on additional external services we do not have today; they must have a self-hosted alternative. In general it is also a good idea to be wary of solutions that are substantially different in different environments. We tend to avoid them unless strictly necessary, as maintenance becomes a lot harder and the chances of breaking the system are higher.


Notice each row in the example above points to the same file but with different start and stop locations. This implies that multiple, independent parts can be present in the same file. A single file can be shared by hundreds of different parts.

Second, the Session Replay recording consumer will not commit blob data to Google Cloud Storage for each segment. Instead it will buffer many segments and flush them together as a single blob to GCS. Next it will make a bulk insertion into the database for tracking.


What is this buffering process going to look like?
It will have to rely on persistent state to prevent you from losing chunks in case of a failover before a file is fully ingested while parts have already been committed in Kafka.

Also, a more high-level concern with this:
If the idea is to build a generic system reusable by other features, relying on a specific consumer to do the buffering has the important implication that Kafka is always going to be a moving part of your system. Have you considered doing the buffering separately?

@cmanallen (Member, Author) Jul 12, 2023

What is this buffering process going to look like?

Like buffering works in our Snuba consumers. You keep an array of messages/rows/bytes in memory. When it comes time to flush, you zip them together in some protocol-specific format. In this case the protocol is byte concatenation.

It will have to rely on persistent state to prevent you from losing chunks in case of a failover before a file is fully ingested while parts have already been committed in Kafka.

If we assume that the replays consumer pushes the bytes to another, generic Kafka consumer which buffers the files before upload, then the persistent state will be the log. Upload failures can be re-run from the last committed offset. Persistent failures would have to be handled as part of a DLQ and would require re-running, potentially introducing a significant amount of latency between (in the Replays case) a billing outcome and the replay recording being made available.

Assuming this buffering/upload step exists inside our existing consumer (i.e. not a generic service), offsets will not be committed until after the batch has been uploaded.
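
A minimal sketch of that in-consumer buffer (illustrative, not the RFC's implementation): parts are concatenated into one blob, a byte-range row is recorded per part, and Kafka offsets are committed only after the blob upload and bulk insert succeed.

```python
class PartBuffer:
    """Accumulates parts in memory and flushes them as one concatenated blob."""

    def __init__(self, max_bytes: int = 10_000_000, max_parts: int = 1000):
        self.parts: list[tuple[str, bytes]] = []  # (part key, payload)
        self.size = 0
        self.max_bytes = max_bytes
        self.max_parts = max_parts

    def append(self, key: str, payload: bytes) -> None:
        self.parts.append((key, payload))
        self.size += len(payload)

    @property
    def full(self) -> bool:
        return self.size >= self.max_bytes or len(self.parts) >= self.max_parts

    def flush(self) -> tuple[bytes, list[dict]]:
        blob, rows, offset = b"", [], 0
        for key, payload in self.parts:
            rows.append({"key": key, "start": offset, "stop": offset + len(payload)})
            blob += payload
            offset += len(payload)
        self.parts, self.size = [], 0
        return blob, rows  # upload blob, bulk-insert rows, then commit Kafka offsets
```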

Also, a more high-level concern with this:
If the idea is to build a generic system reusable by other features, relying on a specific consumer to do the buffering has the important implication that Kafka is always going to be a moving part of your system. Have you considered doing the buffering separately?

I have considered that. The idea being that process A buffers a bunch of file parts/offsets before sending the completed object to permanent storage, either through direct interaction, Filestore, or a Kafka intermediary (or any number of intermediaries). The problem is then that each call site is responsible for the buffer, which is the hardest part of the problem.

Kafka is an important part of this system in my mind. When I wrote this document I relied on the guarantees it provides. I think if this is a generic service made available to the company, then publishing to the topic should be a simple process for integrators. The fact that it's Kafka internally is an irrelevant implementation detail (to the caller).

- A "part" is a distinct blob of binary data.
- It exists as a subset of bytes within a larger set of bytes (referred to as a "file").
- A "part" could refer to a replay segment or to a sourcemap or anything that requires storage in a blob storage service.
2. Each "part" within a file will be encrypted.


Since encryption is used here entirely to make deletion of individual parts quicker, could you please expand on:

  • whether we are confident that this is a use case worth optimizing for (deleting individual parts quickly)
  • what the process would look like if we just accepted rewriting files entirely in order to remove the part

@cmanallen (Member, Author)

whether we are confident that this is a use case worth optimizing for (deleting individual parts quickly)

It is necessary to support GDPR, project, and point deletes. I consider it a high priority. The alternative is rotating the file with the offending parts removed.

what the process would look like if we just accepted rewriting files entirely in order to remove the part

The operations, in no particular order:

  1. Download the file
  2. Get the active byte ranges from the database.
  3. Remove all byte ranges from the file not found in the set of returned byte ranges.
  4. Delete all references to the file in the database.
  5. Upload the new file.
  6. Insert new offset rows.
  7. Delete the old file.

Repeat for every delete operation. Deletes must be single-threaded per file to prevent concurrent access. You can use Kafka and partition on filename.
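
A hypothetical sketch of that rewrite flow using the google-cloud-storage client (table and column names are illustrative):

```python
from google.cloud import storage

def rewrite_file_without_deleted_parts(bucket_name: str, old_name: str, new_name: str, cursor) -> None:
    bucket = storage.Client().bucket(bucket_name)
    data = bucket.blob(old_name).download_as_bytes()        # 1. download the file

    cursor.execute(                                          # 2. active byte ranges
        "SELECT key, start, stop FROM recording_byte_range WHERE filename = %s",
        (old_name,),
    )
    active = cursor.fetchall()

    new_blob, rows, offset = b"", [], 0                      # 3. keep only active ranges
    for key, start, stop in active:
        part = data[start:stop]
        rows.append((new_name, key, offset, offset + len(part)))
        new_blob += part
        offset += len(part)

    bucket.blob(new_name).upload_from_string(new_blob)       # 5. upload the new file
    cursor.execute(                                          # 4. drop old references
        "DELETE FROM recording_byte_range WHERE filename = %s", (old_name,)
    )
    cursor.executemany(                                      # 6. insert new offset rows
        "INSERT INTO recording_byte_range (filename, key, start, stop) VALUES (%s, %s, %s, %s)",
        rows,
    )
    bucket.blob(old_name).delete()                           # 7. delete the old file
```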

Comment on lines 157 to 160
2. How will read efficiency be impacted if we rely on a remote service to decrypt blob data?
- It will have some cost but hopefully that cost is minimized by the constraints of your system.
- For example, Session Replay fetches multiple segment blobs in a single request. At most we will need to fetch two keys (and in the majority of cases a single key) to decrypt the segments.
- This key fetching latency is immaterial to the total latency of the request.


What is the self-hosted, open source implementation for this?

@cmanallen (Member, Author) Jul 12, 2023

Environment variable named SECRET_KEY.
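
For illustration, that fallback might look something like the sketch below, assuming the value of SECRET_KEY is a Fernet-compatible key (an assumption; self-hosted configuration may differ):

```python
import os
from cryptography.fernet import Fernet

def load_kek() -> Fernet:
    # SaaS would fetch the KEK from a managed KMS; self-hosted reads it from the environment.
    return Fernet(os.environ["SECRET_KEY"].encode())
```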


**Buffer Location and Behavior**

The buffer is kept as an in-memory list inside the consumer process. For each message we receive we append the message to the buffer. Afterwards, we check if the buffer is full. If it is we flush. Else we wait for another message.


It seems this will constrain you to a fairly small buffer size.
Also, it ties the number of replicas of your consumer to the cost efficiency of the storage, which is quite undesirable.
Assuming you are never going to commit to Kafka until the buffer is flushed (if you did, you would not be able to guarantee at-least-once):

  • If you increase the number of replicas for any reason, each replicas takes less traffic, thus it takes longer to fill the buffer.
  • Unless you tie the commit batch time and size to the number of replicas (which is undesirable - see the snuba consumer), increasing the number of replicas would increase the number of files written per unit of time.
    replicas and file size should not be connected to each other, otherwise we will have to keep in mind a lot of affected moving parts when scaling the consumer.

@cmanallen (Member, Author)

If the time to accumulate a batch is less than the time to upload a batch, then you need to add a replica. That's the only constraint. You get more efficiency at peak load, so it's best to run our replicas hot. The deadline will prevent the upload from sitting idle too long. The total scale factor will be determined by the number of machines we can throw at the problem.

Multi-processing/threading, I think, will be deadly to this project. So we will need a lot of single-threaded machines running.

I re-wrote this response several times. It's as disorganized as my thoughts are on this. Happy to hear critiques.

@cmanallen (Member, Author)

Unless you tie the commit batch time and size to the number of replicas (which is undesirable - see the snuba consumer), increasing the number of replicas would increase the number of files written per unit of time.

I think this is an okay outcome. If you double the number of replicas you halve the number of parts per file and double the number of files. That reduces cost efficiency but the throughput efficiency remains the same for each replica. Ignoring replica count, cost efficiency will ebb and flow with the variations in load we receive throughout the day.

We still come out ahead because the total number of files written per second is less than in the current implementation, which is one file per message.

@cmanallen (Member, Author) Jul 18, 2023

[...] otherwise we will have to keep in mind a lot of affected moving parts when scaling the consumer.

We should scale our replicas agnostic to the implementation of the buffer's flush mechanics. I mentioned above that cost-efficiency is a hard target, so I don't think we should target it.

A deadline should be present to guarantee regular buffer commits, and a max buffer size should exist to prevent us from using too many resources. I think those two commit semantics save us from having to think about the implications of adding replicas. The throughput of a single machine may drop, but the risk of backlog has decreased across the cluster.


**Handling Consumer Restarts**

If the consumer restarts with a non-empty buffer, the buffer's last item's offset will not be committed. When the consumer resumes it will start processing from the last offset committed (i.e. the last item in the last successfully-flushed-buffer). The buffer will be rebuilt exactly as it was prior to restart.


What is the target size of each file you want to write, and the target buffer size?
The approach of throwing away in-flight work is generally OK if the buffers are very small and the time it takes to reprocess is negligible. If this is not the case (you want large files), you may create a lot of noise when Kafka rebalances consumer groups and create a lot of backlog.

@cmanallen (Member, Author) Jul 12, 2023

These are the commit criteria I have (naively) envisioned. All of these values are adjustable.

  • Commit if filesize >= 10MB.
  • Commit if num_parts >= 1000.
  • Commit if five-second-deadline exceeded.

10MB and 1000 parts can be scaled down to 1MB and 100 parts if they seem unrealistic. Any lower and the concept, I think, has run its course and is not worth pursuing.
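
Expressed as code, the flush check might look like this (values and names are illustrative, per the caveat above):

```python
import time

MAX_BYTES = 10 * 1024 * 1024   # commit if filesize >= 10MB
MAX_PARTS = 1000               # commit if num_parts >= 1000
DEADLINE_SECONDS = 5.0         # commit if the five-second deadline is exceeded

def should_flush(buffer_size: int, num_parts: int, last_flush: float) -> bool:
    return (
        buffer_size >= MAX_BYTES
        or num_parts >= MAX_PARTS
        or (time.monotonic() - last_flush) >= DEADLINE_SECONDS
    )
```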

the time it takes to reprocess is negligible

I believe this will be the case. A generic consumer implementation should only be doing byte concatenation. But depending on size it may take a while to fetch those files over the network to even begin buffering.

I would prefer this generic consumer deployed independently of getsentry so re-balances are less common.

@cmanallen (Member, Author)

@fpacifici

What is the rationale for merging parts into a single blob outside of Filestore instead of evolving Filestore to achieve the result? In this implementation it seems you are targeting a generic solution that is meant to be reusable in other scenarios, but if we go that way we will end up in a split world where one "filestore" is great for latency but unable to merge parts into files, and the "other filestore" is horrible at latency but able to merge parts into files.
This seems like an undesirable situation, and I would strongly advise consolidating into one solution that handles files well across the board (whether that is Filestore or a brand new service).

Why should these cases be mutually exclusive? A buffering consumer could point to filestore as its storage destination.

That being said I'm happy to improve filestore in whatever capacity you think I'm able. Just point me in a direction and I'll go.

Could you please expand on how this would be supported for open source customers? It seems to depend on additional external services we do not have today; they must have a self-hosted alternative. In general it is also a good idea to be wary of solutions that are substantially different in different environments. We tend to avoid them unless strictly necessary, as maintenance becomes a lot harder and the chances of breaking the system are higher.

I think I addressed these cases in my other responses, but if not: things like KMS can be replaced with an environment variable, and the Kafka consumer is part of the broader Sentry ecosystem of technologies. Everything else is just code.

@cmanallen (Member, Author) commented Jul 14, 2023

@fpacifici

Have you considered doing the buffering separately?

I've been thinking about this a lot and it made me realize that I goofed. When I originally wrote this RFC it was for an in-memory buffer within the replays consumer. You reduce service costs and simplify your network overhead by batching the files together. That all makes sense to me.

Then this generic consumer idea emerged. I got really excited about it and forgot one crucial thing. Publishing to Kafka isn't free. This idea only works if you totally disregard its network cost. I've taken 1000 network calls and made it 1000 + 1 network calls. Those 1000 publishes to Kafka could be faster (or slower) but faster isn't free and free is what I was looking for.

So with that in mind, I'm going to revert the RFC back to the point where it's just talking about an in-memory buffer on the replays consumer. So I think this RFC no longer has any org-wide impact other than whatever minor libraries or database tables emerge from it.

@cmanallen (Member, Author)

Following up on my July 13th comment above: having reviewed the Kafka producer implementation, I do believe a generic Kafka consumer is a viable path and should be explored.

@cmanallen changed the title from "rfc(decision): Store Multiple Replay Segments in a Single Blob" to "rfc(decision): Batch multiple files together into single large file to improve network throughput" on Aug 29, 2023