-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP Query
.fetch_raw
#182
base: main
Are you sure you want to change the base?
WIP Query
.fetch_raw
#182
Changes from 2 commits
bad2ea3
b85f17e
697e719
cb22c7d
0b01dc3
569519d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
use clickhouse::Client; | ||
use tokio::fs::File; | ||
use tokio::io::AsyncWriteExt; | ||
|
||
/// An example of streaming the result of a query in an arbitrary format to a file. | ||
|
||
#[tokio::main] | ||
async fn main() -> clickhouse::error::Result<()> { | ||
let client = Client::default().with_url("http://localhost:8123"); | ||
|
||
let mut raw_cursor = client | ||
.query( | ||
" | ||
SELECT number, randomPrintableASCII(20) | ||
FROM system.numbers | ||
LIMIT 100000 | ||
FORMAT CSV | ||
", | ||
) | ||
.fetch_raw()?; | ||
|
||
let mut file = File::create("output_async.txt").await.unwrap(); | ||
|
||
loop { | ||
match raw_cursor.next().await { | ||
Ok(None) => break, | ||
Err(err) => return Err(err), | ||
Ok(Some(bytes)) => { | ||
println!("Bytes read: {}", bytes.len()); | ||
file.write_all(&bytes).await.unwrap() | ||
} | ||
} | ||
} | ||
|
||
println!("Bytes written to output_async.txt"); | ||
Ok(()) | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,54 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
use std::str::from_utf8; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
use serde::Deserialize; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
use clickhouse::Client; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
/// An example of streaming raw data row-by-row in an arbitrary format. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
/// In this case, it's JSONEachRow. Similarly, it can be used with CSV, TSV, and others; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
/// the only difference will be in how the data is parsed. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
#[tokio::main] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
async fn main() -> clickhouse::error::Result<()> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
let client = Client::default().with_url("http://localhost:8123"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut raw_cursor_newline = client | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.query( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
SELECT number, hex(randomPrintableASCII(20)) AS hex_str | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
FROM system.numbers | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
LIMIT 10 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
FORMAT JSONEachRow | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
// By default, ClickHouse quotes (U)Int64 in JSON* family formats; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
// disable it to simplify this example. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
because |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
.with_option("output_format_json_quote_64bit_integers", "0") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.fetch_raw()? | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
.newline(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe like this?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because specifying Ideally, I would prefer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is possible after a few tweaks to the SQLBuilder. See this commit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's leave the raw API as-is. The user will quickly find an invalid Same thread #182 (comment) |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
loop { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
match raw_cursor_newline.next().await { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Ok(None) => break, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Err(err) => return Err(err), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Ok(Some(row_bytes)) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
let json_str = from_utf8(row_bytes).unwrap(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
let parsed_json = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
serde_json::from_str::<MyRowInJSONEachRowFormat>(json_str).unwrap(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
println!("Number: {}", parsed_json.number); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
println!("HexStr: {}", parsed_json.hex_str); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
println!("================================================="); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
println!("Done!"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
// NB: there is no `Row` derive here | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
#[derive(Debug, Deserialize)] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
struct MyRowInJSONEachRowFormat { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest avoiding any use cases where using non-raw API is preferred. Here we can use https://docs.rs/serde_json/latest/serde_json/enum.Value.html |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
number: i64, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
hex_str: String, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
use std::marker::PhantomData; | ||
|
||
use bstr::ByteSlice; | ||
use bytes::Bytes; | ||
use futures::TryStreamExt; | ||
use serde::Deserialize; | ||
|
@@ -13,7 +14,7 @@ use crate::{ | |
|
||
// === RawCursor === | ||
|
||
struct RawCursor(RawCursorInner); | ||
pub struct RawCursor(RawCursorInner); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docs (preparing for |
||
|
||
enum RawCursorInner { | ||
Waiting(ResponseFuture), | ||
|
@@ -27,11 +28,18 @@ struct RawCursorLoading { | |
} | ||
|
||
impl RawCursor { | ||
fn new(response: Response) -> Self { | ||
pub(crate) fn new(response: Response) -> Self { | ||
Self(RawCursorInner::Waiting(response.into_future())) | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably, we should also add This is de facto the standard for streams (with built-in combinators, e.g. concat) and should be used for owned types (like Alternatively, we can directly implement |
||
async fn next(&mut self) -> Result<Option<Bytes>> { | ||
pub fn newline(self) -> RawCursorNewline { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The It seems we can implement the |
||
RawCursorNewline { | ||
raw: self, | ||
bytes: BytesExt::default(), | ||
} | ||
} | ||
|
||
pub async fn next(&mut self) -> Result<Option<Bytes>> { | ||
if matches!(self.0, RawCursorInner::Waiting(_)) { | ||
self.resolve().await?; | ||
} | ||
|
@@ -86,6 +94,40 @@ fn workaround_51132<'a, T: ?Sized>(ptr: &T) -> &'a T { | |
unsafe { &*(ptr as *const T) } | ||
} | ||
|
||
/// Similar to [`RawCursor`], but emits chunks of data split by the `\n` (`0x0a`) character. | ||
/// See [`RawCursorNewline::next`] for more details. | ||
pub struct RawCursorNewline { | ||
raw: RawCursor, | ||
bytes: BytesExt, | ||
} | ||
|
||
impl RawCursorNewline { | ||
/// Emits a chunk of data before the next `\n` (`0x0a`) character. | ||
/// | ||
/// With stream-friendly formats such as `CSV`, `JSONEachRow`, and similar, | ||
/// each iteration will produce the entire row (excluding the newline character itself). | ||
/// | ||
/// The result is unspecified if it's called after `Err` is returned. | ||
pub async fn next<'a>(&mut self) -> Result<Option<&'a [u8]>> { | ||
loop { | ||
if self.bytes.remaining() > 0 { | ||
let slice = workaround_51132(self.bytes.slice()); | ||
let newline_pos = slice.find_byte(b'\n'); | ||
if let Some(pos) = newline_pos { | ||
let (row, rest) = slice.split_at(pos); | ||
self.bytes.set_remaining(rest.len() - 1); // skip the newline character | ||
return Ok(Some(row)); | ||
} | ||
} | ||
|
||
match self.raw.next().await? { | ||
Some(chunk) => self.bytes.extend(chunk), | ||
None => return Ok(None), | ||
} | ||
} | ||
} | ||
} | ||
|
||
// === RowCursor === | ||
|
||
/// A cursor that emits rows. | ||
|
@@ -107,7 +149,7 @@ impl<T> RowCursor<T> { | |
|
||
/// Emits the next row. | ||
/// | ||
/// An result is unspecified if it's called after `Err` is returned. | ||
/// The result is unspecified if it's called after `Err` is returned. | ||
pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<T>> | ||
where | ||
T: Deserialize<'b>, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another real use case is to collect everything into one
Bytes
(orVec<u8>
). It's nice to have such an example, too!