Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: export promql service in server #924

Merged
merged 12 commits into from
Feb 3, 2023
4 changes: 4 additions & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prometheus::PrometheusOptions;
use frontend::promql::PromqlOptions;
use frontend::Plugins;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
Expand Down Expand Up @@ -72,6 +73,7 @@ pub struct StandaloneOptions {
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub promql_options: Option<PromqlOptions>,
pub mode: Mode,
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
Expand All @@ -88,6 +90,7 @@ impl Default for StandaloneOptions {
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
promql_options: Some(PromqlOptions::default()),
mode: Mode::Standalone,
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
Expand All @@ -106,6 +109,7 @@ impl StandaloneOptions {
opentsdb_options: self.opentsdb_options,
influxdb_options: self.influxdb_options,
prometheus_options: self.prometheus_options,
promql_options: self.promql_options,
mode: self.mode,
meta_client_opts: None,
}
Expand Down
26 changes: 25 additions & 1 deletion src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
// limitations under the License.

use async_trait::async_trait;
use common_error::prelude::BoxedError;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging::info;
use common_telemetry::timer;
use query::parser::{QueryLanguageParser, QueryStatement};
use servers::error as server_error;
use servers::promql::PromqlHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};
use snafu::prelude::*;
use sql::ast::ObjectName;
use sql::statements::statement::Statement;
Expand Down Expand Up @@ -209,6 +212,16 @@ impl SqlQueryHandler for Instance {
vec![result]
}

async fn do_promql_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<Result<Output>> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
let result = self.execute_promql(query, query_ctx).await;
vec![result]
}

async fn do_statement_query(
&self,
stmt: Statement,
Expand All @@ -227,6 +240,17 @@ impl SqlQueryHandler for Instance {
}
}

#[async_trait]
impl PromqlHandler for Instance {
async fn do_query(&self, query: &str) -> server_error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
self.execute_promql(query, QueryContext::arc())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu { query })
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
pub const METRIC_HANDLE_SQL_ELAPSED: &str = "datanode.handle_sql_elapsed";
pub const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "datanode.handle_scripts_elapsed";
pub const METRIC_RUN_SCRIPT_ELAPSED: &str = "datanode.run_script_elapsed";
pub const METRIC_HANDLE_PROMQL_ELAPSED: &str = "datanode.handle_promql_elapsed";
9 changes: 9 additions & 0 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,14 @@ pub enum Error {
#[snafu(backtrace)]
source: partition::error::Error,
},

// TODO(ruihang): merge all query execution error kinds
#[snafu(display("failed to execute PromQL query {}, source: {}", query, source))]
ExecutePromql {
query: String,
#[snafu(backtrace)]
source: servers::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -351,6 +359,7 @@ impl ErrorExt for Error {
Error::NotSupported { .. } => StatusCode::Unsupported,

Error::RuntimeResource { source, .. } => source.status_code(),
Error::ExecutePromql { source, .. } => source.status_code(),

Error::SqlExecIntercepted { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::mysql::MysqlOptions;
use crate::opentsdb::OpentsdbOptions;
use crate::postgres::PostgresOptions;
use crate::prometheus::PrometheusOptions;
use crate::promql::PromqlOptions;
use crate::server::Services;
use crate::Plugins;

Expand All @@ -41,6 +42,7 @@ pub struct FrontendOptions {
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub promql_options: Option<PromqlOptions>,
pub mode: Mode,
pub meta_client_opts: Option<MetaClientOpts>,
}
Expand All @@ -55,6 +57,7 @@ impl Default for FrontendOptions {
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
promql_options: Some(PromqlOptions::default()),
mode: Mode::Standalone,
meta_client_opts: None,
}
Expand Down
39 changes: 38 additions & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use partition::manager::PartitionRuleManager;
use partition::route::TableRoutes;
use servers::error as server_error;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::promql::{PromqlHandler, PromqlHandlerRef};
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use servers::query_handler::sql::{SqlQueryHandler, SqlQueryHandlerRef};
use servers::query_handler::{
Expand All @@ -57,7 +58,9 @@ use sql::statements::statement::Statement;

use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{self, Error, MissingMetasrvOptsSnafu, Result};
use crate::error::{
self, Error, ExecutePromqlSnafu, MissingMetasrvOptsSnafu, NotSupportedSnafu, Result,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
use crate::instance::standalone::{StandaloneGrpcQueryHandler, StandaloneSqlQueryHandler};
Expand All @@ -71,6 +74,7 @@ pub trait FrontendInstance:
+ InfluxdbLineProtocolHandler
+ PrometheusProtocolHandler
+ ScriptHandler
+ PromqlHandler
+ Send
+ Sync
+ 'static
Expand All @@ -88,6 +92,7 @@ pub struct Instance {
script_handler: Option<ScriptHandlerRef>,
sql_handler: SqlQueryHandlerRef<Error>,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
promql_handler: Option<PromqlHandlerRef>,

create_expr_factory: CreateExprFactoryRef,

Expand Down Expand Up @@ -123,6 +128,7 @@ impl Instance {
create_expr_factory: Arc::new(DefaultCreateExprFactory),
sql_handler: dist_instance.clone(),
grpc_query_handler: dist_instance,
promql_handler: None,
plugins: Default::default(),
})
}
Expand Down Expand Up @@ -164,6 +170,7 @@ impl Instance {
create_expr_factory: Arc::new(DefaultCreateExprFactory),
sql_handler: StandaloneSqlQueryHandler::arc(dn_instance.clone()),
grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()),
promql_handler: Some(dn_instance.clone()),
plugins: Default::default(),
}
}
Expand All @@ -176,6 +183,7 @@ impl Instance {
create_expr_factory: Arc::new(DefaultCreateExprFactory),
sql_handler: dist_instance.clone(),
grpc_query_handler: dist_instance,
promql_handler: None,
plugins: Default::default(),
}
}
Expand Down Expand Up @@ -447,6 +455,21 @@ impl SqlQueryHandler for Instance {
}
}

async fn do_promql_query(&self, query: &str, _: QueryContextRef) -> Vec<Result<Output>> {
if let Some(handler) = &self.promql_handler {
let result = handler
.do_query(query)
.await
.context(ExecutePromqlSnafu { query });
vec![result]
} else {
vec![Err(NotSupportedSnafu {
feat: "PromQL Query",
}
.build())]
}
}

async fn do_statement_query(
&self,
stmt: Statement,
Expand Down Expand Up @@ -496,6 +519,20 @@ impl ScriptHandler for Instance {
}
}

#[async_trait]
impl PromqlHandler for Instance {
async fn do_query(&self, query: &str) -> server_error::Result<Output> {
if let Some(promql_handler) = &self.promql_handler {
promql_handler.do_query(query).await
} else {
server_error::NotSupportedSnafu {
feat: "PromQL query in Frontend",
}
.fail()
}
}
}

#[cfg(test)]
mod tests {
use std::borrow::Cow;
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,14 @@ impl SqlQueryHandler for DistInstance {
self.handle_sql(query, query_ctx).await
}

async fn do_promql_query(
&self,
_: &str,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()
}

async fn do_statement_query(
&self,
stmt: Statement,
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/instance/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ impl SqlQueryHandler for StandaloneSqlQueryHandler {
.collect()
}

async fn do_promql_query(
&self,
_: &str,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()
}

async fn do_statement_query(
&self,
stmt: Statement,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prometheus;
pub mod promql;
mod server;
mod sql;
mod table;
Expand Down
39 changes: 39 additions & 0 deletions src/frontend/src/promql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromqlOptions {
pub addr: String,
}

impl Default for PromqlOptions {
fn default() -> Self {
Self {
addr: "127.0.0.1:4004".to_string(),
}
}
}

#[cfg(test)]
mod tests {
use super::PromqlOptions;

#[test]
fn test_prometheus_options() {
let default = PromqlOptions::default();
assert_eq!(default.addr, "127.0.0.1:4004".to_string());
}
}
19 changes: 17 additions & 2 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use servers::http::HttpServer;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::promql::PromqlServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
Expand Down Expand Up @@ -154,7 +155,7 @@ impl Services {
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
http_options.clone(),
);
if let Some(user_provider) = user_provider {
if let Some(user_provider) = user_provider.clone() {
http_server.set_user_provider(user_provider);
}

Expand All @@ -181,12 +182,26 @@ impl Services {
None
};

let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options {
let promql_addr = parse_addr(&promql_options.addr)?;
waynexia marked this conversation as resolved.
Show resolved Hide resolved

let mut promql_server = PromqlServer::create_server(instance.clone());
if let Some(user_provider) = user_provider {
promql_server.set_user_provider(user_provider);
}

Some((promql_server as _, promql_addr))
} else {
None
};

try_join!(
start_server(http_server_and_addr),
start_server(grpc_server_and_addr),
start_server(mysql_server_and_addr),
start_server(postgres_server_and_addr),
start_server(opentsdb_server_and_addr)
start_server(opentsdb_server_and_addr),
start_server(promql_server_and_addr),
)
.context(error::StartServerSnafu)?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl Server for GrpcServer {
.await
.context(TcpBindSnafu { addr })?;
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
info!("GRPC server is bound to {}", addr);
info!("gRPC server is bound to {}", addr);

*shutdown_tx = Some(tx);

Expand Down
Loading