feat: compress backups with gzip #6111

Draft · wants to merge 1 commit into base: main
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -42,6 +42,7 @@ anyhow = { workspace = true }
async-broadcast = "0.7.1"
async-channel = { workspace = true }
async-imap = { version = "0.10.2", default-features = false, features = ["runtime-tokio", "compress"] }
+async-compression = { version = "0.4.15", default-features = false, features = ["tokio", "gzip"] }
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] }
async_zip = { version = "0.0.17", default-features = false, features = ["deflate", "tokio-fs"] }
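
The new async-compression dependency provides tokio-compatible gzip adapters. As a rough sketch of the API surface this PR leans on (illustrative only, not code from the PR), a gzip round trip looks like this:

    use async_compression::tokio::{bufread::GzipDecoder, write::GzipEncoder};
    use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};

    async fn gzip_roundtrip(data: &[u8]) -> std::io::Result<Vec<u8>> {
        // Compress: the encoder wraps any AsyncWrite; shutdown() flushes
        // the final deflate block and the gzip trailer.
        let mut encoder = GzipEncoder::new(Vec::new());
        encoder.write_all(data).await?;
        encoder.shutdown().await?;
        let compressed = encoder.into_inner();

        // Decompress: the bufread decoder wraps an AsyncBufRead,
        // hence the BufReader.
        let mut decoder = GzipDecoder::new(BufReader::new(&compressed[..]));
        let mut out = Vec::new();
        decoder.read_to_end(&mut out).await?;
        Ok(out)
    }

With default-features = false, only the "tokio" and "gzip" features are pulled in, keeping the other codecs out of the build.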
2 changes: 1 addition & 1 deletion deltachat-rpc-client/tests/test_securejoin.py
@@ -61,7 +61,7 @@ def test_qr_securejoin(acfactory, protect, tmp_path):
# Setup second device for Alice
# to test observing securejoin protocol.
alice.export_backup(tmp_path)
-    files = list(tmp_path.glob("*.tar"))
+    files = list(tmp_path.glob("*.tar.gz"))
alice2 = acfactory.get_unconfigured_account()
alice2.import_backup(files[0])

4 changes: 2 additions & 2 deletions deltachat-rpc-client/tests/test_something.py
@@ -379,7 +379,7 @@ def test_import_export_backup(acfactory, tmp_path) -> None:
alice = acfactory.new_configured_account()
alice.export_backup(tmp_path)

-    files = list(tmp_path.glob("*.tar"))
+    files = list(tmp_path.glob("*.tar.gz"))
alice2 = acfactory.get_unconfigured_account()
alice2.import_backup(files[0])

@@ -630,7 +630,7 @@ def test_markseen_contact_request(acfactory, tmp_path):

# Bob sets up a second device.
bob.export_backup(tmp_path)
-    files = list(tmp_path.glob("*.tar"))
+    files = list(tmp_path.glob("*.tar.gz"))
bob2 = acfactory.get_unconfigured_account()
bob2.import_backup(files[0])
bob2.start_io()
72 changes: 43 additions & 29 deletions src/imex.rs
@@ -11,7 +11,7 @@ use futures_lite::FutureExt;
use pin_project::pin_project;

use tokio::fs::{self, File};
-use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_tar::Archive;

use crate::blob::BlobDirContents;
Expand Down Expand Up @@ -123,7 +123,7 @@ pub async fn has_backup(_context: &Context, dir_name: &Path) -> Result<String> {
let name = dirent.file_name();
let name: String = name.to_string_lossy().into();
if name.starts_with("delta-chat")
-            && name.ends_with(".tar")
+            && (name.ends_with(".tar") || name.ends_with(".tar.gz"))
&& (newest_backup_name.is_empty() || name > newest_backup_name)
{
// We just use string comparison to determine which backup is newer.
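
Both extensions are accepted here so that backups produced by earlier versions remain importable. A condensed sketch of the selection logic (hypothetical helper; the filename shape is an assumption based on get_next_backup_path below):

    // Picks the newest matching backup name. Plain string comparison works
    // because the generated names embed a zero-padded index that sorts
    // lexicographically.
    fn pick_newest_backup(names: &[&str]) -> Option<String> {
        names
            .iter()
            .filter(|n| {
                n.starts_with("delta-chat")
                    && (n.ends_with(".tar") || n.ends_with(".tar.gz"))
            })
            .max() // lexicographic max, same as the `name > newest_backup_name` check
            .map(|n| n.to_string())
    }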
@@ -269,30 +269,24 @@ async fn import_backup(
context.get_dbfile().display()
);

-    import_backup_stream(context, backup_file, file_size, passphrase).await?;
+    let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
+    if backup_to_import.extension() == Some(OsStr::new("gz")) {
+        let backup_file = tokio::io::BufReader::new(backup_file);
+        let backup_file = async_compression::tokio::bufread::GzipDecoder::new(backup_file);
+        import_backup_stream(context, backup_file, passphrase).await?;
+    } else {
+        import_backup_stream(context, backup_file, passphrase).await?;
+    }
Ok(())
}

/// Imports backup by reading a tar file from a stream.
-///
-/// `file_size` is used to calculate the progress
-/// and emit progress events.
-/// Ideally it is the sum of the entry
-/// sizes without the header overhead,
-/// but can be estimated as tar file size
-/// in which case the progress is underestimated
-/// and may not reach 99.9% by the end of import.
-/// Underestimating is better than
-/// overestimating because the progress
-/// jumps to 100% instead of getting stuck at 99.9%
-/// for some time.
pub(crate) async fn import_backup_stream<R: tokio::io::AsyncRead + Unpin>(
context: &Context,
backup_file: R,
-    file_size: u64,
passphrase: String,
) -> Result<()> {
-    import_backup_stream_inner(context, backup_file, file_size, passphrase)
+    import_backup_stream_inner(context, backup_file, passphrase)
.await
.0
}
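
Note the reader stack this builds for compressed backups: file, then ProgressReader, then BufReader, then GzipDecoder, and finally the tar Archive. The if/else duplicates the import_backup_stream call because the two branches produce different concrete reader types. A hypothetical standalone sketch of the same dispatch (passphrase handling elided):

    use async_compression::tokio::bufread::GzipDecoder;
    use std::path::Path;
    use tokio::fs::File;
    use tokio::io::BufReader;
    use tokio_tar::Archive;

    async fn open_backup_archive(path: &Path) -> std::io::Result<()> {
        let file = File::open(path).await?;
        if path.extension().is_some_and(|ext| ext == "gz") {
            // Compressed backup: decode gzip on the fly while untarring.
            let mut archive = Archive::new(GzipDecoder::new(BufReader::new(file)));
            let _entries = archive.entries()?;
        } else {
            // Legacy uncompressed backup: untar the file directly.
            let mut archive = Archive::new(file);
            let _entries = archive.entries()?;
        }
        Ok(())
    }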
@@ -319,6 +313,19 @@ struct ProgressReader<R> {
}

impl<R> ProgressReader<R> {
+    /// Creates a new `ProgressReader`.
+    ///
+    /// `file_size` is used to calculate the progress
+    /// and emit progress events.
+    /// Ideally it is the sum of the entry
+    /// sizes without the header overhead,
+    /// but it can be estimated as the tar file size,
+    /// in which case the progress is underestimated
+    /// and may not reach 99.9% by the end of import.
+    /// Underestimating is better than
+    /// overestimating because the progress
+    /// then jumps to 100% instead of getting stuck at 99.9%
+    /// for some time.
fn new(r: R, context: Context, file_size: u64) -> Self {
Self {
inner: r,
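
The underestimation argument in the doc comment boils down to clamping: report at most 99.9% until the stream is actually done. A sketch of the arithmetic this implies (an assumption, not copied from the PR; Delta Chat progress events are assumed to use a 0..=1000 permille scale):

    // Progress in permille, clamped so that 1000 is only ever reported
    // by the caller once the import has truly finished.
    fn progress_permille(bytes_read: u64, file_size: u64) -> u64 {
        if file_size == 0 {
            return 0; // unknown size: report no progress rather than lie
        }
        std::cmp::min(bytes_read.saturating_mul(1000) / file_size, 999)
    }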
@@ -358,10 +365,8 @@ where
async fn import_backup_stream_inner<R: tokio::io::AsyncRead + Unpin>(
context: &Context,
backup_file: R,
-    file_size: u64,
passphrase: String,
) -> (Result<()>,) {
-    let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
let mut archive = Archive::new(backup_file);

let mut entries = match archive.entries() {
@@ -461,10 +466,10 @@ fn get_next_backup_path(
tempdbfile.push(format!("{stem}-{i:02}-{addr}.db"));

let mut tempfile = folder.clone();
-        tempfile.push(format!("{stem}-{i:02}-{addr}.tar.part"));
+        tempfile.push(format!("{stem}-{i:02}-{addr}.tar.gz.part"));

let mut destfile = folder.clone();
-        destfile.push(format!("{stem}-{i:02}-{addr}.tar"));
+        destfile.push(format!("{stem}-{i:02}-{addr}.tar.gz"));

if !tempdbfile.exists() && !tempfile.exists() && !destfile.exists() {
return Ok((tempdbfile, tempfile, destfile));
@@ -504,9 +509,13 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
file_size += blob.to_abs_path().metadata()?.len()
}

-    export_backup_stream(context, &temp_db_path, blobdir, file, file_size)
-        .await
-        .context("Exporting backup to file failed")?;
+    let gzip_encoder = async_compression::tokio::write::GzipEncoder::new(file);
+    let mut gzip_encoder =
+        export_backup_stream(context, &temp_db_path, blobdir, gzip_encoder, file_size)
+            .await
+            .context("Exporting backup to file failed")?;
+    gzip_encoder.shutdown().await?;

fs::rename(temp_path, &dest_path).await?;
context.emit_event(EventType::ImexFileWritten(dest_path));
Ok(())
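
Ordering matters here: GzipEncoder only writes the final deflate block and gzip trailer on shutdown(), so the encoder must be shut down before the temp file is renamed to its final name; otherwise a reader could pick up a truncated archive. A minimal illustration of that ordering (hypothetical helper, not PR code):

    use async_compression::tokio::write::GzipEncoder;
    use std::path::Path;
    use tokio::fs::{self, File};
    use tokio::io::AsyncWriteExt;

    async fn write_backup_atomically(tmp: &Path, dest: &Path, payload: &[u8]) -> std::io::Result<()> {
        let mut encoder = GzipEncoder::new(File::create(tmp).await?);
        encoder.write_all(payload).await?;
        // Flush the remaining compressed data and the gzip trailer.
        encoder.shutdown().await?;
        // Only now expose the finished .tar.gz under its final name.
        fs::rename(tmp, dest).await?;
        Ok(())
    }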
@@ -543,6 +552,10 @@ impl<W> ProgressWriter<W> {
context,
}
}

+    fn into_inner(self) -> W {
+        self.inner
+    }
}

impl<W> AsyncWrite for ProgressWriter<W>
@@ -590,12 +603,12 @@ pub(crate) async fn export_backup_stream<'a, W>(
blobdir: BlobDirContents<'a>,
writer: W,
file_size: u64,
-) -> Result<()>
+) -> Result<W>
where
W: tokio::io::AsyncWrite + tokio::io::AsyncWriteExt + Unpin + Send + 'static,
{
-    let writer = ProgressWriter::new(writer, context.clone(), file_size);
-    let mut builder = tokio_tar::Builder::new(writer);
+    let progress_writer = ProgressWriter::new(writer, context.clone(), file_size);
+    let mut builder = tokio_tar::Builder::new(progress_writer);

builder
.append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME)
@@ -607,8 +620,9 @@ where
builder.append_file(path_in_archive, &mut file).await?;
}

-    builder.finish().await?;
-    Ok(())
+    // Convert tar builder back into the underlying stream.
+    let progress_writer = builder.into_inner().await?;
+    Ok(progress_writer.into_inner())
}
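
Returning the writer instead of () is the key signature change: tokio_tar::Builder owns its writer, and builder.finish() would leave the gzip encoder (or QUIC stream) buried inside it with no way to shut it down. Roughly, the hand-back pattern looks like this (illustrative sketch under those assumptions):

    use tokio::io::AsyncWrite;
    use tokio_tar::Builder;

    async fn write_tar<W>(writer: W) -> std::io::Result<W>
    where
        W: AsyncWrite + Unpin + Send + 'static,
    {
        let mut builder = Builder::new(writer);
        // ... append the database file and blobs here ...

        // into_inner() finishes the archive (writing the terminating
        // zero blocks) and hands the wrapped writer back to the caller,
        // who is still responsible for shutting the stream down.
        builder.into_inner().await
    }

This is also why export_backup above and the transfer code below now call shutdown() themselves.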

/// Imports secret key from a file.
14 changes: 9 additions & 5 deletions src/imex/transfer.rs
@@ -36,12 +36,13 @@ use futures_lite::FutureExt;
use iroh_net::relay::RelayMode;
use iroh_net::Endpoint;
use tokio::fs;
+use tokio::io::AsyncWriteExt;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crate::chat::add_device_msg;
use crate::context::Context;
-use crate::imex::BlobDirContents;
+use crate::imex::{BlobDirContents, ProgressReader};
use crate::message::{Message, Viewtype};
use crate::qr::Qr;
use crate::stock_str::backup_transfer_msg_body;
@@ -190,9 +191,11 @@ impl BackupProvider {

send_stream.write_all(&file_size.to_be_bytes()).await?;

-        export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
-            .await
-            .context("Failed to write backup into QUIC stream")?;
+        let mut send_stream =
+            export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
+                .await
+                .context("Failed to write backup into QUIC stream")?;
+        send_stream.shutdown().await?;
info!(context, "Finished writing backup into QUIC stream.");
let mut buf = [0u8; 1];
info!(context, "Waiting for acknowledgment.");
@@ -310,7 +313,8 @@ pub async fn get_backup2(
let mut file_size_buf = [0u8; 8];
recv_stream.read_exact(&mut file_size_buf).await?;
let file_size = u64::from_be_bytes(file_size_buf);
-    import_backup_stream(context, recv_stream, file_size, passphrase)
+    let recv_stream = ProgressReader::new(recv_stream, context.clone(), file_size);
+    import_backup_stream(context, recv_stream, passphrase)
.await
.context("Failed to import backup from QUIC stream")?;
info!(context, "Finished importing backup from the stream.");
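
For device-to-device transfer, the tar stream is written to the QUIC stream uncompressed; this PR only gzips file backups. The hunks above sketch a small wire protocol: an 8-byte big-endian size header (used for progress reporting), the tar stream, a stream shutdown, then a one-byte acknowledgment. A generic duplex-stream illustration (assumptions only: plain tokio traits, not the iroh API):

    use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

    async fn send_with_size_header<S>(stream: &mut S, payload: &[u8]) -> std::io::Result<()>
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        // 8-byte big-endian length prefix; the receiver uses it only to
        // drive progress events, not to delimit the stream.
        stream.write_all(&(payload.len() as u64).to_be_bytes()).await?;
        stream.write_all(payload).await?;
        stream.shutdown().await?; // signal end-of-stream to the receiver
        let mut ack = [0u8; 1];
        stream.read_exact(&mut ack).await?; // wait until the import finished
        Ok(())
    }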