-
Notifications
You must be signed in to change notification settings - Fork 1
/
parquet_s3_async.rs
61 lines (51 loc) · 2.28 KB
/
parquet_s3_async.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::io::Result;
use std::sync::Arc;
use futures::pin_mut;
use futures::{future::BoxFuture, StreamExt};
use s3::Bucket;
use parquet2::read::{get_page_stream, read_metadata_async};
use range_reader::{RangeOutput, RangedAsyncReader};
#[tokio::test]
async fn main() -> Result<()> {
let bucket_name = "dev-jorgecardleitao";
let region = "eu-central-1".parse().unwrap();
let bucket = Bucket::new_public(bucket_name, region).unwrap();
let path = "benches_65536.parquet".to_string();
let (data, _) = bucket.head_object(&path).await.unwrap();
let length = data.content_length.unwrap() as usize;
println!("total size in bytes: {}", length);
let range_get = Box::new(move |start: u64, length: usize| {
let bucket = bucket.clone();
let path = path.clone();
Box::pin(async move {
let bucket = bucket.clone();
let path = path.clone();
// to get a sense of what is being queried in s3
let (mut data, _) = bucket
// -1 because ranges are inclusive in `get_object_range`
.get_object_range(&path, start, Some(start + length as u64 - 1))
.await
.map_err(|x| std::io::Error::new(std::io::ErrorKind::Other, x.to_string()))?;
data.truncate(length);
Ok(RangeOutput { start, data })
}) as BoxFuture<'static, std::io::Result<RangeOutput>>
});
// at least 4kb per s3 request. Adjust as you like.
let mut reader = RangedAsyncReader::new(length, 4 * 1024, range_get);
let metadata = read_metadata_async(&mut reader)
.await
.map_err(|x| std::io::Error::new(std::io::ErrorKind::Other, x.to_string()))?;
assert_eq!(524288, metadata.num_rows);
// pages of the first row group and first column
let column_metadata = &metadata.row_groups[0].columns()[0];
let pages = get_page_stream(column_metadata, &mut reader, vec![], Arc::new(|_, _| true))
.await
.map_err(|x| std::io::Error::new(std::io::ErrorKind::Other, x.to_string()))?;
pin_mut!(pages);
while let Some(maybe_page) = pages.next().await {
let page = maybe_page
.map_err(|x| std::io::Error::new(std::io::ErrorKind::Other, x.to_string()))?;
assert_eq!(page.num_values(), 524288);
}
Ok(())
}