From 336df2bd5f47ede6630f094debec28b27dfee181 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 30 Nov 2024 17:03:44 -0500 Subject: [PATCH] Add Puffin crate and CompressionCodec --- Cargo.toml | 2 + crates/puffin/Cargo.toml | 36 ++++++++++ crates/puffin/src/compression.rs | 120 +++++++++++++++++++++++++++++++ crates/puffin/src/lib.rs | 25 +++++++ 4 files changed, 183 insertions(+) create mode 100644 crates/puffin/Cargo.toml create mode 100644 crates/puffin/src/compression.rs create mode 100644 crates/puffin/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 71809fdb7..296cb5d42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "crates/iceberg", "crates/integration_tests", "crates/integrations/*", + "crates/puffin", "crates/test_utils", ] exclude = ["bindings/python"] @@ -98,3 +99,4 @@ uuid = { version = "1.6.1", features = ["v7"] } volo-thrift = "0.10" hive_metastore = "0.1" tera = "1" +zstd = "0.13.2" diff --git a/crates/puffin/Cargo.toml b/crates/puffin/Cargo.toml new file mode 100644 index 000000000..fc62088c8 --- /dev/null +++ b/crates/puffin/Cargo.toml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iceberg-puffin" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Puffin" +repository = { workspace = true } +license = { workspace = true } +keywords = ["iceberg", "puffin"] + +[dependencies] +iceberg = { workspace = true } +zstd = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } diff --git a/crates/puffin/src/compression.rs b/crates/puffin/src/compression.rs new file mode 100644 index 000000000..1cb07bee8 --- /dev/null +++ b/crates/puffin/src/compression.rs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use iceberg::{Error, ErrorKind, Result};
+
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
+/// Data compression formats
+pub enum CompressionCodec {
+    #[default]
+    /// No compression
+    None,
+    /// LZ4 single compression frame with content size present
+    Lz4,
+    /// Zstandard single compression frame with content size present
+    Zstd,
+}
+
+impl CompressionCodec {
+    pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
+        match self {
+            CompressionCodec::None => Ok(bytes),
+            CompressionCodec::Lz4 => Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "LZ4 decompression is not supported currently",
+            )),
+            CompressionCodec::Zstd => {
+                let decompressed = zstd::stream::decode_all(&bytes[..])?;
+                Ok(decompressed)
+            }
+        }
+    }
+
+    pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
+        match self {
+            CompressionCodec::None => Ok(bytes),
+            CompressionCodec::Lz4 => Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "LZ4 compression is not supported currently",
+            )),
+            CompressionCodec::Zstd => {
+                let writer = Vec::<u8>::new();
+                let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
+                encoder.include_checksum(true)?;
+                encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
+                std::io::copy(&mut &bytes[..], &mut encoder)?;
+                let compressed = encoder.finish()?;
+                Ok(compressed)
+            }
+        }
+    }
+
+    pub(crate) fn is_none(&self) -> bool {
+        matches!(self, CompressionCodec::None)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::compression::CompressionCodec;
+
+    #[tokio::test]
+    async fn test_compression_codec_none() {
+        let compression_codec = CompressionCodec::None;
+        let bytes_vec = [0_u8; 100].to_vec();
+
+        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
+        assert_eq!(bytes_vec, compressed);
+
+        let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
+        assert_eq!(compressed, decompressed)
+    }
+
+    #[tokio::test]
+    async fn test_compression_codec_lz4() {
+        let compression_codec = CompressionCodec::Lz4;
+        let bytes_vec = [0_u8; 100].to_vec();
+
+        assert_eq!(
+            compression_codec
+                .compress(bytes_vec.clone())
+                .unwrap_err()
+                .to_string(),
+            "FeatureUnsupported => LZ4 compression is not supported currently",
+        );
+
+        assert_eq!(
+            compression_codec
+                .decompress(bytes_vec.clone())
+                .unwrap_err()
+                .to_string(),
+            "FeatureUnsupported => LZ4 decompression is not supported currently",
+        )
+    }
+
+    #[tokio::test]
+    async fn test_compression_codec_zstd() {
+        let compression_codec = CompressionCodec::Zstd;
+        let bytes_vec = [0_u8; 100].to_vec();
+
+        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
+        assert!(compressed.len() < bytes_vec.len());
+
+        let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
+        assert_eq!(decompressed, bytes_vec)
+    }
+}
diff --git a/crates/puffin/src/lib.rs b/crates/puffin/src/lib.rs
new file mode 100644
index 000000000..a76b6a8ab
--- /dev/null
+++ b/crates/puffin/src/lib.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Iceberg Puffin implementation.
+
+#![deny(missing_docs)]
+// Temporarily allowing this while crate is under active development
+#![allow(dead_code)]
+
+mod compression;
+pub use compression::CompressionCodec;