Skip to content

Commit

Permalink
Add Puffin crate and CompressionCodec
Browse files Browse the repository at this point in the history
  • Loading branch information
fqaiser94 committed Nov 30, 2024
1 parent f3a571d commit 8f1ae48
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"crates/iceberg",
"crates/integration_tests",
"crates/integrations/*",
"crates/puffin",
"crates/test_utils",
]
exclude = ["bindings/python"]
Expand Down Expand Up @@ -98,3 +99,4 @@ uuid = { version = "1.6.1", features = ["v7"] }
volo-thrift = "0.10"
hive_metastore = "0.1"
tera = "1"
zstd = "0.13.2"
38 changes: 38 additions & 0 deletions crates/puffin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "iceberg-puffin"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
rust-version = { workspace = true }

categories = ["database"]
description = "Apache Iceberg Puffin"
repository = { workspace = true }
license = { workspace = true }
keywords = ["iceberg", "puffin"]

[dependencies]
iceberg = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
zstd = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }
123 changes: 123 additions & 0 deletions crates/puffin/src/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use iceberg::{Error, ErrorKind, Result};
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Copy)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
/// Data compression formats
pub enum CompressionCodec {
#[default]
/// No compression
None,
/// LZ4 single compression frame with content size present
Lz4,
/// Zstandard single compression frame with content size present
Zstd,
}

impl CompressionCodec {
pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
match self {
CompressionCodec::None => Ok(bytes),
CompressionCodec::Lz4 => Err(Error::new(
ErrorKind::FeatureUnsupported,
"LZ4 decompression is not supported currently",
)),
CompressionCodec::Zstd => {
let decompressed = zstd::stream::decode_all(&bytes[..])?;
Ok(decompressed)
}
}
}

pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
match self {
CompressionCodec::None => Ok(bytes),
CompressionCodec::Lz4 => Err(Error::new(
ErrorKind::FeatureUnsupported,
"LZ4 compression is not supported currently",
)),
CompressionCodec::Zstd => {
let writer = Vec::<u8>::new();
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
encoder.include_checksum(true)?;
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
std::io::copy(&mut &bytes[..], &mut encoder)?;
let compressed = encoder.finish()?;
Ok(compressed)
}
}
}

pub(crate) fn is_none(&self) -> bool {
matches!(self, CompressionCodec::None)
}
}

#[cfg(test)]
mod tests {
use crate::compression::CompressionCodec;

#[tokio::test]
async fn test_compression_codec_none() {
let compression_codec = CompressionCodec::None;
let bytes_vec = [0_u8; 100].to_vec();

let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
assert_eq!(bytes_vec, compressed);

let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
assert_eq!(compressed, decompressed)
}

#[tokio::test]
async fn test_compression_codec_lz4() {
let compression_codec = CompressionCodec::Lz4;
let bytes_vec = [0_u8; 100].to_vec();

assert_eq!(
compression_codec
.compress(bytes_vec.clone())
.unwrap_err()
.to_string(),
"FeatureUnsupported => LZ4 compression is not supported currently",
);

assert_eq!(
compression_codec
.decompress(bytes_vec.clone())
.unwrap_err()
.to_string(),
"FeatureUnsupported => LZ4 decompression is not supported currently",
)
}

#[tokio::test]
async fn test_compression_codec_zstd() {
let compression_codec = CompressionCodec::Zstd;
let bytes_vec = [0_u8; 100].to_vec();

let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
assert!(compressed.len() < bytes_vec.len());

let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
assert_eq!(decompressed, bytes_vec)
}
}
25 changes: 25 additions & 0 deletions crates/puffin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Iceberg Puffin implementation.

#![deny(missing_docs)]
// Temporarily allowing this while crate is under active development
#![allow(dead_code)]

mod compression;
pub use compression::CompressionCodec;

0 comments on commit 8f1ae48

Please sign in to comment.