From ccaa76dc5ff6321304c87dc57ecd019481f424c0 Mon Sep 17 00:00:00 2001 From: Lillian Date: Sun, 3 Dec 2023 19:25:35 -0500 Subject: [PATCH 1/2] basic implementation of comma-separated json --- crates/corro-agent/src/api/public/mod.rs | 27 +++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs index 874883af..0eabc416 100644 --- a/crates/corro-agent/src/api/public/mod.rs +++ b/crates/corro-agent/src/api/public/mod.rs @@ -16,6 +16,7 @@ use hyper::StatusCode; use itertools::Itertools; use metrics::counter; use rusqlite::{named_params, params_from_iter, ToSql, Transaction}; +use serde::Deserialize; use spawn::spawn_counted; use tokio::{ sync::{ @@ -464,8 +465,14 @@ async fn build_query_rows_response( } } +#[derive(Deserialize)] +pub struct V1QueriesParams { + ndjson: bool, +} + pub async fn api_v1_queries( Extension(agent): Extension, + axum::extract::Query(pms): axum::extract::Query, axum::extract::Json(stmt): axum::extract::Json, ) -> impl IntoResponse { let (mut tx, body) = hyper::Body::channel(); @@ -474,6 +481,13 @@ pub async fn api_v1_queries( let (data_tx, mut data_rx) = channel(512); tokio::spawn(async move { + if !pms.ndjson { + if let Err(e) = tx.send_data(bytes::Bytes::from_static(b"[")).await { + error!("could not send data through body's channel: {e}"); + return; + } + } + let mut buf = BytesMut::new(); while let Some(row_res) = data_rx.recv().await { @@ -493,13 +507,23 @@ pub async fn api_v1_queries( } } - buf.extend_from_slice(b"\n"); + if !matches!(row_res, QueryEvent::EndOfQuery { .. }) { + buf.extend_from_slice(if pms.ndjson { b"\n" } else { b"," }); + } if let Err(e) = tx.send_data(buf.split().freeze()).await { error!("could not send data through body's channel: {e}"); return; } } + + if !pms.ndjson { + if let Err(e) = tx.send_data(bytes::Bytes::from_static(b"]")).await { + error!("could not send data through body's channel: {e}"); + return; + } + } + debug!("query body channel done"); }); @@ -783,6 +807,7 @@ mod tests { let res = api_v1_queries( Extension(agent.clone()), + axum::extract::Query(V1QueriesParams { ndjson: true }), axum::Json(Statement::Simple("select * from tests".into())), ) .await From 7263b32a140c447a63280d443110c00d6fc73f0e Mon Sep 17 00:00:00 2001 From: Lillian Date: Tue, 5 Dec 2023 08:38:20 -0500 Subject: [PATCH 2/2] use accept:application/x-ndjson header --- crates/corro-agent/src/api/public/mod.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs index 0eabc416..565f01db 100644 --- a/crates/corro-agent/src/api/public/mod.rs +++ b/crates/corro-agent/src/api/public/mod.rs @@ -16,7 +16,6 @@ use hyper::StatusCode; use itertools::Itertools; use metrics::counter; use rusqlite::{named_params, params_from_iter, ToSql, Transaction}; -use serde::Deserialize; use spawn::spawn_counted; use tokio::{ sync::{ @@ -465,23 +464,21 @@ async fn build_query_rows_response( } } -#[derive(Deserialize)] -pub struct V1QueriesParams { - ndjson: bool, -} - pub async fn api_v1_queries( Extension(agent): Extension, - axum::extract::Query(pms): axum::extract::Query, + headers: axum::headers::HeaderMap, axum::extract::Json(stmt): axum::extract::Json, ) -> impl IntoResponse { + // https://github.com/ndjson/ndjson-spec#33-mediatype-and-file-extensions + let ndjson = headers.get("accept").map(|a| a == "application/x-ndjson").unwrap_or(false); + let (mut tx, body) = hyper::Body::channel(); // TODO: timeout on data send instead of infinitely waiting for channel space. let (data_tx, mut data_rx) = channel(512); tokio::spawn(async move { - if !pms.ndjson { + if !ndjson { if let Err(e) = tx.send_data(bytes::Bytes::from_static(b"[")).await { error!("could not send data through body's channel: {e}"); return; @@ -508,7 +505,7 @@ pub async fn api_v1_queries( } if !matches!(row_res, QueryEvent::EndOfQuery { .. }) { - buf.extend_from_slice(if pms.ndjson { b"\n" } else { b"," }); + buf.extend_from_slice(if ndjson { b"\n" } else { b"," }); } if let Err(e) = tx.send_data(buf.split().freeze()).await { @@ -517,7 +514,7 @@ pub async fn api_v1_queries( } } - if !pms.ndjson { + if !ndjson { if let Err(e) = tx.send_data(bytes::Bytes::from_static(b"]")).await { error!("could not send data through body's channel: {e}"); return; @@ -637,6 +634,7 @@ pub async fn api_v1_db_schema( #[cfg(test)] mod tests { + use axum::http::HeaderValue; use bytes::Bytes; use corro_types::{api::RowId, config::Config, schema::SqliteType, base::Version}; use futures::Stream; @@ -805,9 +803,12 @@ mod tests { println!("transaction body: {body:?}"); + let mut headers = axum::headers::HeaderMap::new(); + headers.insert("Accept", HeaderValue::from_static("application/x-ndjson")); + let res = api_v1_queries( Extension(agent.clone()), - axum::extract::Query(V1QueriesParams { ndjson: true }), + headers, axum::Json(Statement::Simple("select * from tests".into())), ) .await