diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index d91ea42beb1d..42d69d8f0c31 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -20,8 +20,8 @@ use common_config::WalConfig; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig}; -use datanode::instance::InstanceRef; use datanode::region_server::RegionServer; +use datanode::Instance as InstanceRef; use frontend::frontend::FrontendOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use frontend::service_config::{ diff --git a/src/common/greptimedb-telemetry/src/lib.rs b/src/common/greptimedb-telemetry/src/lib.rs index c60e25906963..57d680e40411 100644 --- a/src/common/greptimedb-telemetry/src/lib.rs +++ b/src/common/greptimedb-telemetry/src/lib.rs @@ -18,7 +18,7 @@ use std::path::{Path, PathBuf}; use std::time::Duration; use common_runtime::error::{Error, Result}; -use common_runtime::{BoxedTaskFunction, RepeatedTask, Runtime, TaskFunction}; +use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction}; use common_telemetry::{debug, info}; use reqwest::{Client, Response}; use serde::{Deserialize, Serialize}; @@ -49,11 +49,11 @@ impl GreptimeDBTelemetryTask { GreptimeDBTelemetryTask::Disable } - pub fn start(&self, runtime: Runtime) -> Result<()> { + pub fn start(&self) -> Result<()> { match self { GreptimeDBTelemetryTask::Enable(task) => { print_anonymous_usage_data_disclaimer(); - task.start(runtime) + task.start(common_runtime::bg_runtime()) } GreptimeDBTelemetryTask::Disable => Ok(()), } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 5bbed0a148d7..520ac9bca317 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -23,6 +23,7 @@ use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_config::WalConfig; use common_error::ext::BoxedError; +use common_greptimedb_telemetry::GreptimeDBTelemetryTask; pub use common_procedure::options::ProcedureConfig; use common_runtime::Runtime; use common_telemetry::info; @@ -52,6 +53,7 @@ use tokio::fs; use crate::error::{ CreateDirSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, }; +use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::HeartbeatTask; use crate::region_server::RegionServer; use crate::server::Services; @@ -399,6 +401,7 @@ pub struct Datanode { services: Option, heartbeat_task: Option, region_server: RegionServer, + greptimedb_telemetry_task: Arc, } impl Datanode { @@ -420,7 +423,7 @@ impl Datanode { .context(RuntimeResourceSnafu)?, ); - let mut region_server = RegionServer::new(query_engine, runtime); + let mut region_server = RegionServer::new(query_engine, runtime.clone()); let log_store = Self::build_log_store(&opts).await?; let object_store = store::new_object_store(&opts).await?; let engines = Self::build_store_engines(&opts, log_store, object_store).await?; @@ -439,12 +442,19 @@ impl Datanode { } Mode::Standalone => None, }; + let greptimedb_telemetry_task = get_greptimedb_telemetry_task( + Some(opts.storage.data_home.clone()), + &opts.mode, + opts.enable_telemetry, + ) + .await; Ok(Self { opts, services, heartbeat_task, region_server, + greptimedb_telemetry_task, }) } @@ -453,6 +463,7 @@ impl Datanode { if let Some(task) = &self.heartbeat_task { task.start().await?; } + let _ = self.greptimedb_telemetry_task.start(); self.start_services().await } @@ -476,6 +487,7 @@ impl Datanode { pub async fn shutdown(&self) -> Result<()> { // We must shutdown services first self.shutdown_services().await?; + let _ = self.greptimedb_telemetry_task.stop().await; if let Some(heartbeat_task) = &self.heartbeat_task { heartbeat_task .close() diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 3651419a88f9..fabc04024a03 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -16,7 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, Peer, RegionStat}; +use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role}; +use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, @@ -24,7 +25,8 @@ use common_meta::heartbeat::handler::{ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info, trace, warn}; -use meta_client::client::{HeartbeatSender, MetaClient}; +use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder}; +use meta_client::MetaClientOptions; use snafu::{OptionExt, ResultExt}; use tokio::sync::mpsc; use tokio::time::Instant; @@ -35,7 +37,6 @@ use crate::datanode::DatanodeOptions; use crate::error::{ self, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result, }; -use crate::instance::new_metasrv_client; use crate::region_server::RegionServer; pub(crate) mod handler; @@ -306,6 +307,39 @@ fn resolve_addr(bind_addr: &str, hostname_addr: &Option) -> String { } } +/// Create metasrv client instance and spawn heartbeat loop. +pub async fn new_metasrv_client( + node_id: u64, + meta_config: &MetaClientOptions, +) -> Result { + let cluster_id = 0; // TODO(hl): read from config + let member_id = node_id; + + let config = ChannelConfig::new() + .timeout(Duration::from_millis(meta_config.timeout_millis)) + .connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis)) + .tcp_nodelay(meta_config.tcp_nodelay); + let channel_manager = ChannelManager::with_config(config); + + let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode) + .enable_heartbeat() + .enable_router() + .enable_store() + .channel_manager(channel_manager) + .build(); + meta_client + .start(&meta_config.metasrv_addrs) + .await + .context(MetaClientInitSnafu)?; + + // required only when the heartbeat_client is enabled + meta_client + .ask_leader() + .await + .context(MetaClientInitSnafu)?; + Ok(meta_client) +} + #[cfg(test)] mod tests { #[test] diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs deleted file mode 100644 index 9ce77c91a4d8..000000000000 --- a/src/datanode/src/instance.rs +++ /dev/null @@ -1,419 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; -use std::{fs, path}; - -use api::v1::meta::Role; -use catalog::local::MemoryCatalogManager; -use catalog::CatalogManagerRef; -use common_base::Plugins; -use common_catalog::consts::DEFAULT_CATALOG_NAME; -use common_config::WalConfig; -use common_error::ext::BoxedError; -use common_greptimedb_telemetry::GreptimeDBTelemetryTask; -use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; -use common_meta::heartbeat::handler::HandlerGroupExecutor; -use common_procedure::local::{LocalManager, ManagerConfig}; -use common_procedure::store::state_store::ObjectStateStore; -use common_procedure::ProcedureManagerRef; -use common_telemetry::logging::{debug, info}; -use file_table_engine::engine::immutable::ImmutableFileTableEngine; -use log_store::raft_engine::log_store::RaftEngineLogStore; -use meta_client::client::{MetaClient, MetaClientBuilder}; -use meta_client::MetaClientOptions; -use mito::config::EngineConfig as TableEngineConfig; -use mito::engine::MitoEngine; -use object_store::{util, ObjectStore}; -use query::query_engine::{QueryEngineFactory, QueryEngineRef}; -use servers::Mode; -use session::context::QueryContextBuilder; -use snafu::prelude::*; -use storage::compaction::{CompactionHandler, CompactionSchedulerRef}; -use storage::config::EngineConfig as StorageEngineConfig; -use storage::scheduler::{LocalScheduler, SchedulerConfig}; -use storage::EngineImpl; -use store_api::logstore::LogStore; -use store_api::path_utils::{CLUSTER_DIR, WAL_DIR}; -use table::engine::manager::MemoryTableEngineManager; -use table::engine::{TableEngine, TableEngineProcedureRef}; -use table::requests::FlushTableRequest; -use table::table::TableIdProviderRef; - -use crate::datanode::{DatanodeOptions, ProcedureConfig}; -use crate::error::{ - self, CatalogSnafu, IncorrectInternalStateSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, - MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, - ShutdownInstanceSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu, -}; -use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; -use crate::heartbeat::HeartbeatTask; -use crate::row_inserter::RowInserter; -use crate::sql::{SqlHandler, SqlRequest}; -use crate::store; - -mod grpc; -pub mod sql; - -pub(crate) type DefaultEngine = MitoEngine>; - -// An abstraction to read/write services. -pub struct Instance { - pub(crate) query_engine: QueryEngineRef, - pub(crate) sql_handler: SqlHandler, - pub(crate) catalog_manager: CatalogManagerRef, - pub(crate) table_id_provider: Option, - row_inserter: RowInserter, - procedure_manager: ProcedureManagerRef, - greptimedb_telemetry_task: Arc, -} - -pub type InstanceRef = Arc; - -impl Instance { - pub async fn with_opts( - opts: &DatanodeOptions, - plugins: Arc, - ) -> Result<(InstanceRef, Option)> { - let meta_client = match opts.mode { - Mode::Standalone => None, - Mode::Distributed => { - let meta_client = new_metasrv_client( - opts.node_id.context(MissingNodeIdSnafu)?, - opts.meta_client_options - .as_ref() - .context(MissingMetasrvOptsSnafu)?, - ) - .await?; - Some(Arc::new(meta_client)) - } - }; - - let compaction_scheduler = create_compaction_scheduler(opts); - - Self::new(opts, meta_client, compaction_scheduler, plugins).await - } - - fn build_heartbeat_task( - opts: &DatanodeOptions, - meta_client: Option>, - ) -> Result> { - Ok(match opts.mode { - Mode::Standalone => None, - Mode::Distributed => { - let _node_id = opts.node_id.context(MissingNodeIdSnafu)?; - let _meta_client = meta_client.context(IncorrectInternalStateSnafu { - state: "meta client is not provided when building heartbeat task", - })?; - let _handlers_executor = - HandlerGroupExecutor::new(vec![Arc::new(ParseMailboxMessageHandler)]); - - todo!("remove this method") - } - }) - } - - pub(crate) async fn new( - opts: &DatanodeOptions, - meta_client: Option>, - compaction_scheduler: CompactionSchedulerRef, - plugins: Arc, - ) -> Result<(InstanceRef, Option)> { - let data_home = util::normalize_dir(&opts.storage.data_home); - info!("The working home directory is: {}", data_home); - let object_store = store::new_object_store(opts).await?; - let log_store = Arc::new(create_log_store(&data_home, opts.wal.clone()).await?); - - let mito_engine = Arc::new(DefaultEngine::new( - TableEngineConfig { - compress_manifest: opts.storage.manifest.compress, - }, - EngineImpl::new( - StorageEngineConfig::from(opts), - log_store.clone(), - object_store.clone(), - compaction_scheduler, - ) - .unwrap(), - object_store.clone(), - )); - - let immutable_file_engine = Arc::new(ImmutableFileTableEngine::new( - file_table_engine::config::EngineConfig::default(), - object_store.clone(), - )); - - let engine_procedures = HashMap::from([ - ( - mito_engine.name().to_string(), - mito_engine.clone() as TableEngineProcedureRef, - ), - ( - immutable_file_engine.name().to_string(), - immutable_file_engine.clone() as TableEngineProcedureRef, - ), - ]); - let engine_manager = Arc::new( - MemoryTableEngineManager::with(vec![ - mito_engine.clone(), - immutable_file_engine.clone(), - ]) - .with_engine_procedures(engine_procedures), - ); - - // create remote catalog manager - let (catalog_manager, table_id_provider) = match opts.mode { - Mode::Standalone => { - let catalog = Arc::new( - catalog::local::LocalCatalogManager::try_new(engine_manager.clone()) - .await - .context(CatalogSnafu)?, - ); - - ( - catalog.clone() as CatalogManagerRef, - Some(catalog as TableIdProviderRef), - ) - } - - Mode::Distributed => ( - MemoryCatalogManager::with_default_setup() as CatalogManagerRef, - None, - ), - }; - - let factory = - QueryEngineFactory::new_with_plugins(catalog_manager.clone(), None, false, plugins); - let query_engine = factory.query_engine(); - let procedure_manager = create_procedure_manager( - opts.node_id.unwrap_or(0), - &ProcedureConfig::default(), - object_store, - ) - .await?; - let sql_handler = SqlHandler::new( - engine_manager.clone(), - catalog_manager.clone(), - procedure_manager.clone(), - ); - // Register all procedures. - // Register procedures of the mito engine. - mito_engine.register_procedure_loaders(&*procedure_manager); - // Register procedures of the file table engine. - immutable_file_engine.register_procedure_loaders(&*procedure_manager); - // Register procedures in table-procedure crate. - table_procedure::register_procedure_loaders( - catalog_manager.clone(), - mito_engine.clone(), - mito_engine.clone(), - &*procedure_manager, - ); - let row_inserter = RowInserter::new(catalog_manager.clone()); - let greptimedb_telemetry_task = get_greptimedb_telemetry_task( - Some(opts.storage.data_home.clone()), - &opts.mode, - opts.enable_telemetry, - ) - .await; - - let instance = Arc::new(Self { - query_engine: query_engine.clone(), - sql_handler, - catalog_manager: catalog_manager.clone(), - table_id_provider, - row_inserter, - procedure_manager, - greptimedb_telemetry_task, - }); - - let heartbeat_task = Instance::build_heartbeat_task(opts, meta_client)?; - - Ok((instance, heartbeat_task)) - } - - pub async fn start(&self) -> Result<()> { - self.catalog_manager - .start() - .await - .context(NewCatalogSnafu)?; - - // Recover procedures after the catalog manager is started, so we can - // ensure we can access all tables from the catalog manager. - self.procedure_manager - .recover() - .await - .context(RecoverProcedureSnafu)?; - self.procedure_manager - .start() - .context(StartProcedureManagerSnafu)?; - let _ = self - .greptimedb_telemetry_task - .start(common_runtime::bg_runtime()) - .map_err(|e| { - debug!("Failed to start greptimedb telemetry task: {}", e); - }); - - Ok(()) - } - - pub async fn shutdown(&self) -> Result<()> { - self.procedure_manager - .stop() - .await - .context(StopProcedureManagerSnafu)?; - - self.flush_tables().await?; - - self.sql_handler - .close() - .await - .map_err(BoxedError::new) - .context(ShutdownInstanceSnafu) - } - - pub async fn flush_tables(&self) -> Result<()> { - info!("going to flush all schemas under {DEFAULT_CATALOG_NAME}"); - let schema_list = self - .catalog_manager - .schema_names(DEFAULT_CATALOG_NAME) - .await - .map_err(BoxedError::new) - .context(ShutdownInstanceSnafu)?; - let flush_requests = schema_list - .into_iter() - .map(|schema_name| { - SqlRequest::FlushTable(FlushTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name, - table_name: None, - region_number: None, - wait: Some(true), - }) - }) - .collect::>(); - let flush_result = - futures::future::try_join_all(flush_requests.into_iter().map(|request| { - self.sql_handler - .execute(request, QueryContextBuilder::default().build()) - })) - .await - .map_err(BoxedError::new) - .context(ShutdownInstanceSnafu); - info!("Flushed all tables result: {}", flush_result.is_ok()); - let _ = flush_result?; - - Ok(()) - } - - pub fn sql_handler(&self) -> &SqlHandler { - &self.sql_handler - } - - pub fn catalog_manager(&self) -> &CatalogManagerRef { - &self.catalog_manager - } - - pub fn query_engine(&self) -> QueryEngineRef { - self.query_engine.clone() - } -} - -fn create_compaction_scheduler(opts: &DatanodeOptions) -> CompactionSchedulerRef { - let config = SchedulerConfig::from(opts); - let handler = CompactionHandler::default(); - let scheduler = LocalScheduler::new(config, handler); - Arc::new(scheduler) -} - -/// Create metasrv client instance and spawn heartbeat loop. -pub async fn new_metasrv_client( - node_id: u64, - meta_config: &MetaClientOptions, -) -> Result { - let cluster_id = 0; // TODO(hl): read from config - let member_id = node_id; - - let config = ChannelConfig::new() - .timeout(Duration::from_millis(meta_config.timeout_millis)) - .connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis)) - .tcp_nodelay(meta_config.tcp_nodelay); - let channel_manager = ChannelManager::with_config(config); - - let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode) - .enable_heartbeat() - .enable_router() - .enable_store() - .channel_manager(channel_manager) - .build(); - meta_client - .start(&meta_config.metasrv_addrs) - .await - .context(MetaClientInitSnafu)?; - - // required only when the heartbeat_client is enabled - meta_client - .ask_leader() - .await - .context(MetaClientInitSnafu)?; - Ok(meta_client) -} - -pub(crate) async fn create_log_store( - data_home: &str, - wal_config: WalConfig, -) -> Result { - let wal_dir = format!("{}{WAL_DIR}", data_home); - - // create WAL directory - fs::create_dir_all(path::Path::new(&wal_dir)) - .context(error::CreateDirSnafu { dir: &wal_dir })?; - info!( - "Creating logstore with config: {:?} and storage path: {}", - wal_config, &wal_dir - ); - let logstore = RaftEngineLogStore::try_new(wal_dir, wal_config) - .await - .map_err(Box::new) - .context(OpenLogStoreSnafu)?; - Ok(logstore) -} - -pub(crate) async fn create_procedure_manager( - datanode_id: u64, - procedure_config: &ProcedureConfig, - object_store: ObjectStore, -) -> Result { - info!( - "Creating procedure manager with config: {:?}", - procedure_config - ); - - let state_store = Arc::new(ObjectStateStore::new(object_store)); - - let dn_store_path = format!("{CLUSTER_DIR}dn-{datanode_id}/"); - - info!("The datanode internal storage path is: {}", dn_store_path); - - let manager_config = ManagerConfig { - parent_path: dn_store_path, - max_retry_times: procedure_config.max_retry_times, - retry_delay: procedure_config.retry_delay, - ..Default::default() - }; - - Ok(Arc::new(LocalManager::new(manager_config, state_store))) -} diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 5dda39a6019c..44bf15fb965c 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -225,56 +225,6 @@ impl Instance { } } -// TODO(LFC): Refactor consideration: move this function to some helper mod, -// could be done together or after `TableReference`'s refactoring, when issue #559 is resolved. -/// Converts maybe fully-qualified table name (`..`) to tuple. -pub fn table_idents_to_full_name( - obj_name: &ObjectName, - query_ctx: QueryContextRef, -) -> Result<(String, String, String)> { - match &obj_name.0[..] { - [table] => Ok(( - query_ctx.current_catalog().to_owned(), - query_ctx.current_schema().to_owned(), - table.value.clone(), - )), - [schema, table] => Ok(( - query_ctx.current_catalog().to_owned(), - schema.value.clone(), - table.value.clone(), - )), - [catalog, schema, table] => Ok(( - catalog.value.clone(), - schema.value.clone(), - table.value.clone(), - )), - _ => error::InvalidSqlSnafu { - msg: format!( - "expect table name to be ..
, .
or
, actual: {obj_name}", - ), - }.fail(), - } -} - -pub fn idents_to_full_database_name( - obj_name: &ObjectName, - query_ctx: &QueryContextRef, -) -> Result<(String, String)> { - match &obj_name.0[..] { - [database] => Ok(( - query_ctx.current_catalog().to_owned(), - database.value.clone(), - )), - [catalog, database] => Ok((catalog.value.clone(), database.value.clone())), - _ => error::InvalidSqlSnafu { - msg: format!( - "expect database name to be ., , found: {obj_name}", - ), - } - .fail(), - } -} - #[async_trait] impl SqlStatementExecutor for Instance { async fn execute_sql( diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 9e04bf59bf4b..50e4ff278c52 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -15,20 +15,31 @@ #![feature(assert_matches)] #![feature(trait_upcasting)] +use query::query_engine::SqlStatementExecutor; + pub mod alive_keeper; pub mod datanode; pub mod error; mod greptimedb_telemetry; pub mod heartbeat; -pub mod instance; pub mod metrics; -#[cfg(any(test, feature = "testing"))] -mod mock; pub mod region_server; -mod row_inserter; pub mod server; -pub mod sql; mod store; #[cfg(test)] #[allow(dead_code)] mod tests; + +// TODO(ruihang): remove this +pub struct Instance; + +#[async_trait::async_trait] +impl SqlStatementExecutor for Instance { + async fn execute_sql( + &self, + _stmt: sql::statements::statement::Statement, + _query_ctx: session::context::QueryContextRef, + ) -> query::error::Result { + unreachable!() + } +} diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs deleted file mode 100644 index 4114ae0e688c..000000000000 --- a/src/datanode/src/mock.rs +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use api::v1::meta::Role; -use meta_client::client::{MetaClient, MetaClientBuilder}; -use meta_srv::mocks::MockInfo; -use storage::compaction::noop::NoopCompactionScheduler; - -use crate::datanode::DatanodeOptions; -use crate::error::Result; -use crate::heartbeat::HeartbeatTask; -use crate::instance::{Instance, InstanceRef}; - -impl Instance { - pub async fn with_mock_meta_client( - opts: &DatanodeOptions, - ) -> Result<(InstanceRef, Option)> { - let mock_info = meta_srv::mocks::mock_with_memstore().await; - Self::with_mock_meta_server(opts, mock_info).await - } - - pub async fn with_mock_meta_server( - opts: &DatanodeOptions, - meta_srv: MockInfo, - ) -> Result<(InstanceRef, Option)> { - let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await); - let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); - Instance::new( - opts, - Some(meta_client), - compaction_scheduler, - Default::default(), - ) - .await - } -} - -async fn mock_meta_client(mock_info: MockInfo, node_id: u64) -> MetaClient { - let MockInfo { - server_addr, - channel_manager, - .. - } = mock_info; - - let id = (1000u64, 2000u64); - let mut meta_client = MetaClientBuilder::new(id.0, node_id, Role::Datanode) - .enable_heartbeat() - .enable_router() - .enable_store() - .channel_manager(channel_manager) - .build(); - meta_client.start(&[&server_addr]).await.unwrap(); - // // required only when the heartbeat_client is enabled - meta_client.ask_leader().await.unwrap(); - - meta_client -} diff --git a/src/datanode/src/row_inserter.rs b/src/datanode/src/row_inserter.rs deleted file mode 100644 index 8b2fde825d2c..000000000000 --- a/src/datanode/src/row_inserter.rs +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::helper; -use api::helper::ColumnDataTypeWrapper; -use api::v1::{RowInsertRequest, RowInsertRequests}; -use catalog::CatalogManagerRef; -use common_query::Output; -use datatypes::data_type::{ConcreteDataType, DataType}; -use futures_util::future; -use session::context::QueryContextRef; -use snafu::{ensure, OptionExt, ResultExt}; -use table::requests::InsertRequest; - -use crate::error::{ - CatalogSnafu, ColumnDataTypeSnafu, CreateVectorSnafu, InsertSnafu, InvalidInsertRowLenSnafu, - JoinTaskSnafu, Result, TableNotFoundSnafu, -}; - -pub struct RowInserter { - catalog_manager: CatalogManagerRef, -} - -impl RowInserter { - pub fn new(catalog_manager: CatalogManagerRef) -> Self { - Self { catalog_manager } - } - - pub async fn handle_inserts( - &self, - requests: RowInsertRequests, - ctx: QueryContextRef, - ) -> Result { - let insert_tasks = requests.inserts.into_iter().map(|insert| { - let catalog_manager = self.catalog_manager.clone(); - let catalog_name = ctx.current_catalog().to_owned(); - let schema_name = ctx.current_schema().to_owned(); - let table_name = insert.table_name.clone(); - - let insert_task = async move { - let Some(request) = - convert_to_table_insert_request(&catalog_name, &schema_name, insert)? - else { - // empty data - return Ok(0usize); - }; - - let table = catalog_manager - .table(&catalog_name, &schema_name, &table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: format!("{catalog_name}.{schema_name}.{table_name}"), - })?; - - table.insert(request).await.with_context(|_| InsertSnafu { - table_name: format!("{catalog_name}.{schema_name}.{table_name}"), - }) - }; - - common_runtime::spawn_write(insert_task) - }); - - let results = future::try_join_all(insert_tasks) - .await - .context(JoinTaskSnafu)?; - let affected_rows = results.into_iter().sum::>()?; - - Ok(Output::AffectedRows(affected_rows)) - } -} - -fn convert_to_table_insert_request( - catalog_name: &str, - schema_name: &str, - request: RowInsertRequest, -) -> Result> { - let table_name = request.table_name; - let region_number = request.region_number; - let Some(rows) = request.rows else { - return Ok(None); - }; - let schema = rows.schema; - let rows = rows.rows; - let num_columns = schema.len(); - let num_rows = rows.len(); - - if num_rows == 0 || num_columns == 0 { - return Ok(None); - } - - let mut columns_values = Vec::with_capacity(num_columns); - for column_schema in schema { - let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(column_schema.datatype) - .context(ColumnDataTypeSnafu)? - .into(); - let mutable_vector = datatype.create_mutable_vector(num_rows); - columns_values.push((column_schema.column_name, mutable_vector)); - } - - for row in rows { - ensure!( - row.values.len() == num_columns, - InvalidInsertRowLenSnafu { - table_name: format!("{catalog_name}.{schema_name}.{table_name}"), - expected: num_columns, - actual: row.values.len(), - } - ); - - for ((_, mutable_vector), value) in columns_values.iter_mut().zip(row.values.iter()) { - mutable_vector - .try_push_value_ref(helper::pb_value_to_value_ref(value)) - .context(CreateVectorSnafu)?; - } - } - - let columns_values = columns_values - .into_iter() - .map(|(k, mut v)| (k, v.to_vector())) - .collect(); - - let insert_request = InsertRequest { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name, - columns_values, - region_number, - }; - - Ok(Some(insert_request)) -} diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 35faeb2a877c..41ca6365cc52 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -28,8 +28,6 @@ use crate::error::{ }; use crate::region_server::RegionServer; -pub mod grpc; - /// All rpc services. pub struct Services { grpc_server: GrpcServer, diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs deleted file mode 100644 index 12ddecca14d5..000000000000 --- a/src/datanode/src/server/grpc.rs +++ /dev/null @@ -1,321 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, TruncateTableExpr}; -use common_catalog::consts::IMMUTABLE_FILE_ENGINE; -use common_catalog::format_full_table_name; -use common_grpc_expr::{alter_expr_to_request, create_expr_to_request}; -use common_query::Output; -use common_telemetry::info; -use session::context::QueryContextRef; -use snafu::prelude::*; -use table::requests::{DropTableRequest, TruncateTableRequest}; - -use crate::error::{ - AlterExprToRequestSnafu, BumpTableIdSnafu, CatalogSnafu, CreateExprToRequestSnafu, - IncorrectInternalStateSnafu, Result, TableNotFoundSnafu, -}; -use crate::instance::Instance; -use crate::sql::SqlRequest; - -impl Instance { - /// Handle gRPC create table requests. - pub(crate) async fn handle_create( - &self, - expr: CreateTableExpr, - ctx: QueryContextRef, - ) -> Result { - let table_name = format!( - "{}.{}.{}", - expr.catalog_name, expr.schema_name, expr.table_name - ); - - // TODO(LFC): Revisit table id related feature, add more tests. - // Also merge this mod with mod instance::grpc. - - // Respect CreateExpr's table id and region ids if present, or allocate table id - // from local table id provider and set region id to 0. - let table_id = if let Some(table_id) = &expr.table_id { - info!( - "Creating table {table_name} with table id {} from Frontend", - table_id.id - ); - table_id.id - } else { - let provider = - self.table_id_provider - .as_ref() - .context(IncorrectInternalStateSnafu { - state: "Table id provider absent in standalone mode", - })?; - let table_id = provider.next_table_id().await.context(BumpTableIdSnafu)?; - info!("Creating table {table_name} with table id {table_id} from TableIdProvider"); - table_id - }; - - let require_time_index = expr.engine != IMMUTABLE_FILE_ENGINE; - let request = create_expr_to_request(table_id, expr, require_time_index) - .context(CreateExprToRequestSnafu)?; - - self.sql_handler() - .execute(SqlRequest::CreateTable(request), ctx) - .await - } - - pub(crate) async fn handle_alter( - &self, - expr: AlterExpr, - ctx: QueryContextRef, - ) -> Result { - let table_id = self - .catalog_manager - .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: format_full_table_name( - &expr.catalog_name, - &expr.schema_name, - &expr.table_name, - ), - })? - .table_info() - .table_id(); - - let request = alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?; - self.sql_handler() - .execute(SqlRequest::Alter(request), ctx) - .await - } - - pub(crate) async fn handle_drop_table( - &self, - expr: DropTableExpr, - ctx: QueryContextRef, - ) -> Result { - let table = self - .catalog_manager - .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: format_full_table_name( - &expr.catalog_name, - &expr.schema_name, - &expr.table_name, - ), - })?; - - let req = DropTableRequest { - catalog_name: expr.catalog_name, - schema_name: expr.schema_name, - table_name: expr.table_name, - table_id: table.table_info().ident.table_id, - }; - self.sql_handler() - .execute(SqlRequest::DropTable(req), ctx) - .await - } - - pub(crate) async fn handle_truncate_table( - &self, - expr: TruncateTableExpr, - ctx: QueryContextRef, - ) -> Result { - let table = self - .catalog_manager - .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: format_full_table_name( - &expr.catalog_name, - &expr.schema_name, - &expr.table_name, - ), - })?; - - let req = TruncateTableRequest { - catalog_name: expr.catalog_name, - schema_name: expr.schema_name, - table_name: expr.table_name, - table_id: table.table_info().ident.table_id, - }; - self.sql_handler() - .execute(SqlRequest::TruncateTable(req), ctx) - .await - } -} - -#[cfg(test)] -mod tests { - use api::v1::{column_def, ColumnDataType, ColumnDef, SemanticType, TableId}; - use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE}; - use common_grpc_expr::create_table_schema; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema}; - use datatypes::value::Value; - - use super::*; - - #[tokio::test(flavor = "multi_thread")] - async fn test_create_expr_to_request() { - common_telemetry::init_default_ut_logging(); - let expr = testing_create_expr(); - let request = create_expr_to_request(1024, expr, true).unwrap(); - assert_eq!(request.id, MIN_USER_TABLE_ID); - assert_eq!(request.catalog_name, "greptime".to_string()); - assert_eq!(request.schema_name, "public".to_string()); - assert_eq!(request.table_name, "my-metrics"); - assert_eq!(request.desc, Some("blabla little magic fairy".to_string())); - assert_eq!(request.schema, expected_table_schema()); - assert_eq!(request.primary_key_indices, vec![1, 0]); - assert!(request.create_if_not_exists); - - let mut expr = testing_create_expr(); - expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()]; - let result = create_expr_to_request(1025, expr, true); - let err_msg = result.unwrap_err().to_string(); - assert!( - err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"), - "{}", - err_msg - ); - } - - #[test] - fn test_create_table_schema() { - let mut expr = testing_create_expr(); - let schema = create_table_schema(&expr, true).unwrap(); - assert_eq!(schema, expected_table_schema()); - - expr.time_index = "not-exist-column".to_string(); - let result = create_table_schema(&expr, true); - let err_msg = result.unwrap_err().to_string(); - assert!( - err_msg.contains("Missing timestamp column"), - "actual: {err_msg}", - ); - } - - #[test] - fn test_create_column_schema() { - let column_def = ColumnDef { - name: "a".to_string(), - data_type: 1024, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Tag as i32, - }; - let result = column_def::try_as_column_schema(&column_def); - assert!(matches!( - result.unwrap_err(), - api::error::Error::UnknownColumnDataType { .. } - )); - - let column_def = ColumnDef { - name: "a".to_string(), - data_type: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Tag as i32, - }; - let column_schema = column_def::try_as_column_schema(&column_def).unwrap(); - assert_eq!(column_schema.name, "a"); - assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); - assert!(column_schema.is_nullable()); - - let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value")); - let column_def = ColumnDef { - name: "a".to_string(), - data_type: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: default_constraint.clone().try_into().unwrap(), - semantic_type: SemanticType::Tag as i32, - }; - let column_schema = column_def::try_as_column_schema(&column_def).unwrap(); - assert_eq!(column_schema.name, "a"); - assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); - assert!(column_schema.is_nullable()); - assert_eq!( - default_constraint, - *column_schema.default_constraint().unwrap() - ); - } - - fn testing_create_expr() -> CreateTableExpr { - let column_defs = vec![ - ColumnDef { - name: "host".to_string(), - data_type: ColumnDataType::String as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Tag as i32, - }, - ColumnDef { - name: "ts".to_string(), - data_type: ColumnDataType::TimestampMillisecond as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Timestamp as i32, - }, - ColumnDef { - name: "cpu".to_string(), - data_type: ColumnDataType::Float32 as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - }, - ColumnDef { - name: "memory".to_string(), - data_type: ColumnDataType::Float64 as i32, - is_nullable: true, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - }, - ]; - CreateTableExpr { - catalog_name: "".to_string(), - schema_name: "".to_string(), - table_name: "my-metrics".to_string(), - desc: "blabla little magic fairy".to_string(), - column_defs, - time_index: "ts".to_string(), - primary_keys: vec!["ts".to_string(), "host".to_string()], - create_if_not_exists: true, - table_options: Default::default(), - table_id: Some(TableId { - id: MIN_USER_TABLE_ID, - }), - region_numbers: vec![0], - engine: MITO_ENGINE.to_string(), - } - } - - fn expected_table_schema() -> RawSchema { - let column_schemas = vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ]; - - RawSchema::new(column_schemas) - } -} diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs deleted file mode 100644 index 0f01d3b3cb16..000000000000 --- a/src/datanode/src/sql.rs +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use catalog::CatalogManagerRef; -use common_error::ext::BoxedError; -use common_procedure::ProcedureManagerRef; -use common_query::Output; -use common_telemetry::error; -use session::context::QueryContextRef; -use snafu::{OptionExt, ResultExt}; -use table::engine::manager::TableEngineManagerRef; -use table::engine::{TableEngineProcedureRef, TableEngineRef, TableReference}; -use table::requests::*; -use table::TableRef; - -use crate::error::{ - self, CloseTableEngineSnafu, EngineProcedureNotFoundSnafu, Result, TableEngineNotFoundSnafu, - TableNotFoundSnafu, -}; -use crate::instance::sql::table_idents_to_full_name; - -mod alter; -mod compact_table; -mod create; -mod create_external; -mod drop_table; -mod flush_table; -pub(crate) mod insert; -mod truncate_table; - -#[derive(Debug)] -pub enum SqlRequest { - CreateTable(CreateTableRequest), - CreateDatabase(CreateDatabaseRequest), - Alter(AlterTableRequest), - DropTable(DropTableRequest), - FlushTable(FlushTableRequest), - CompactTable(CompactTableRequest), - TruncateTable(TruncateTableRequest), -} - -// Handler to execute SQL except query -#[derive(Clone)] -pub struct SqlHandler { - table_engine_manager: TableEngineManagerRef, - catalog_manager: CatalogManagerRef, - procedure_manager: ProcedureManagerRef, -} - -impl SqlHandler { - pub fn new( - table_engine_manager: TableEngineManagerRef, - catalog_manager: CatalogManagerRef, - procedure_manager: ProcedureManagerRef, - ) -> Self { - Self { - table_engine_manager, - catalog_manager, - procedure_manager, - } - } - - pub async fn execute(&self, request: SqlRequest, query_ctx: QueryContextRef) -> Result { - let result = match request { - SqlRequest::CreateTable(req) => self.create_table(req).await, - SqlRequest::CreateDatabase(req) => self.create_database(req, query_ctx.clone()).await, - SqlRequest::Alter(req) => self.alter_table(req).await, - SqlRequest::DropTable(req) => self.drop_table(req).await, - SqlRequest::FlushTable(req) => self.flush_table(req).await, - SqlRequest::CompactTable(req) => self.compact_table(req).await, - SqlRequest::TruncateTable(req) => self.truncate_table(req).await, - }; - if let Err(e) = &result { - error!(e; "{query_ctx}"); - } - result - } - - pub async fn get_table(&self, table_ref: &TableReference<'_>) -> Result { - let TableReference { - catalog, - schema, - table, - } = table_ref; - let table = self - .catalog_manager - .table(catalog, schema, table) - .await - .context(error::CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: table_ref.to_string(), - })?; - Ok(table) - } - - pub fn table_engine_manager(&self) -> &TableEngineManagerRef { - &self.table_engine_manager - } - - pub fn catalog_manager(&self) -> &CatalogManagerRef { - &self.catalog_manager - } - - pub fn table_engine(&self, table: TableRef) -> Result { - let engine_name = &table.table_info().meta.engine; - let engine = self - .table_engine_manager - .engine(engine_name) - .context(TableEngineNotFoundSnafu { engine_name })?; - Ok(engine) - } - - pub fn engine_procedure(&self, table: TableRef) -> Result { - let engine_name = &table.table_info().meta.engine; - let engine = self - .table_engine_manager - .engine_procedure(engine_name) - .context(EngineProcedureNotFoundSnafu { engine_name })?; - Ok(engine) - } - - pub async fn close(&self) -> Result<()> { - self.table_engine_manager - .close() - .await - .map_err(BoxedError::new) - .context(CloseTableEngineSnafu) - } -} diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 5969afb32049..3f22900842ed 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -15,10 +15,7 @@ use std::any::Any; use std::sync::Arc; -use api::v1::greptime_request::Request as GrpcRequest; use api::v1::meta::HeartbeatResponse; -use api::v1::query_request::Query; -use api::v1::QueryRequest; use async_trait::async_trait; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::FunctionRef; @@ -31,43 +28,33 @@ use common_meta::instruction::{Instruction, InstructionReply, RegionIdent}; use common_query::prelude::ScalarUdf; use common_query::Output; use common_runtime::Runtime; -use datatypes::prelude::ConcreteDataType; use query::dataframe::DataFrame; use query::plan::LogicalPlan; use query::planner::LogicalPlanner; use query::query_engine::DescribeResult; use query::QueryEngine; -use servers::query_handler::grpc::GrpcQueryHandler; -use session::context::{QueryContext, QueryContextRef}; -use table::engine::manager::TableEngineManagerRef; +use session::context::QueryContextRef; use table::TableRef; -use test_util::MockInstance; use tokio::sync::mpsc::{self, Receiver}; -use crate::instance::Instance; use crate::region_server::RegionServer; - -pub(crate) mod test_util; +use crate::Instance; struct HandlerTestGuard { - instance: MockInstance, + instance: Instance, mailbox: Arc, rx: Receiver<(MessageMeta, InstructionReply)>, - engine_manager_ref: TableEngineManagerRef, } -async fn prepare_handler_test(name: &str) -> HandlerTestGuard { - let mock_instance = MockInstance::new(name).await; - let instance = mock_instance.inner(); - let engine_manager = instance.sql_handler().table_engine_manager().clone(); +async fn prepare_handler_test(_name: &str) -> HandlerTestGuard { + let instance = Instance; let (tx, rx) = mpsc::channel(8); let mailbox = Arc::new(HeartbeatMailbox::new(tx)); HandlerTestGuard { - instance: mock_instance, + instance, mailbox, rx, - engine_manager_ref: engine_manager, } } @@ -122,43 +109,6 @@ fn open_region_instruction() -> Instruction { }) } -async fn prepare_table(instance: &Instance) -> TableRef { - test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype()) - .await - .unwrap() -} - -async fn assert_test_table_not_found(instance: &Instance) { - let query = GrpcRequest::Query(QueryRequest { - query: Some(Query::Sql( - "INSERT INTO demo(host, cpu, memory, ts) VALUES \ - ('host1', 66.6, 1024, 1672201025000),\ - ('host2', 88.8, 333.3, 1672201026000)" - .to_string(), - )), - }); - let output = instance - .do_query(query, QueryContext::arc()) - .await - .unwrap_err(); - - assert_eq!(output.to_string(), "Failed to execute sql, source: Failure during query execution, source: Table not found: greptime.public.demo"); -} - -async fn assert_test_table_found(instance: &Instance) { - let query = GrpcRequest::Query(QueryRequest { - query: Some(Query::Sql( - "INSERT INTO demo(host, cpu, memory, ts) VALUES \ - ('host1', 66.6, 1024, 1672201025000),\ - ('host2', 88.8, 333.3, 1672201026000)" - .to_string(), - )), - }); - let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); - - assert!(matches!(output, Output::AffectedRows(2))); -} - pub struct MockQueryEngine; #[async_trait] diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs deleted file mode 100644 index 28b597721bed..000000000000 --- a/src/datanode/src/tests/test_util.rs +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use catalog::RegisterTableRequest; -use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE, -}; -use common_config::WalConfig; -use common_test_util::temp_dir::{create_temp_dir, TempDir}; -use datatypes::data_type::ConcreteDataType; -use datatypes::schema::{ColumnSchema, RawSchema}; -use servers::Mode; -use snafu::ResultExt; -use table::engine::{EngineContext, TableEngineRef}; -use table::requests::{CreateTableRequest, TableOptions}; -use table::TableRef; - -use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, StorageConfig}; -use crate::error::{CreateTableSnafu, Result}; -use crate::heartbeat::HeartbeatTask; -use crate::instance::{Instance, InstanceRef}; - -pub(crate) struct MockInstance { - instance: InstanceRef, - _heartbeat: Option, - _guard: TestGuard, -} - -impl MockInstance { - pub(crate) async fn new(name: &str) -> Self { - let (opts, _guard) = create_tmp_dir_and_datanode_opts(name); - - let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); - instance.start().await.unwrap(); - if let Some(task) = heartbeat.as_ref() { - task.start().await.unwrap(); - } - - MockInstance { - instance, - _guard, - _heartbeat: heartbeat, - } - } - - pub(crate) fn inner(&self) -> &Instance { - &self.instance - } -} - -struct TestGuard { - _wal_tmp_dir: TempDir, - _data_tmp_dir: TempDir, -} - -fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) { - let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}")); - let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); - let opts = DatanodeOptions { - wal: WalConfig::default(), - storage: StorageConfig { - data_home: data_tmp_dir.path().to_str().unwrap().to_string(), - store: ObjectStoreConfig::File(FileConfig {}), - ..Default::default() - }, - mode: Mode::Standalone, - ..Default::default() - }; - ( - opts, - TestGuard { - _wal_tmp_dir: wal_tmp_dir, - _data_tmp_dir: data_tmp_dir, - }, - ) -} - -pub(crate) async fn create_test_table( - instance: &Instance, - ts_type: ConcreteDataType, -) -> Result { - let column_schemas = vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), true), - ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ts_type, true).with_time_index(true), - ]; - - let table_name = "demo"; - let table_engine: TableEngineRef = instance - .sql_handler() - .table_engine_manager() - .engine(MITO_ENGINE) - .unwrap(); - let table = table_engine - .create_table( - &EngineContext::default(), - CreateTableRequest { - id: MIN_USER_TABLE_ID, - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_name.to_string(), - desc: Some(" a test table".to_string()), - schema: RawSchema::new(column_schemas), - create_if_not_exists: true, - primary_key_indices: vec![0], // "host" is in primary keys - table_options: TableOptions::default(), - region_numbers: vec![0], - engine: MITO_ENGINE.to_string(), - }, - ) - .await - .context(CreateTableSnafu { table_name })?; - - let req = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - table_id: table.table_info().ident.table_id, - table: table.clone(), - }; - let _ = instance.catalog_manager.register_table(req).await.unwrap(); - Ok(table) -} diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index 65bad120dc99..e74d91ff9fa8 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -22,7 +22,6 @@ use api::v1::{ }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; -use datanode::instance::sql::table_idents_to_full_name; use datatypes::schema::ColumnSchema; use file_table_engine::table::immutable::ImmutableFileTableOptions; use query::sql::prepare_immutable_file_table_files_and_schema; @@ -41,6 +40,7 @@ use crate::error::{ EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, PrepareImmutableTableSnafu, Result, UnrecognizedTableOptionSnafu, }; +use crate::table::table_idents_to_full_name; #[derive(Debug, Copy, Clone)] pub struct CreateExprFactory; diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 41f118a5ff3d..065c97756a62 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -42,9 +42,8 @@ use common_meta::key::TableMetadataManager; use common_query::Output; use common_telemetry::logging::info; use common_telemetry::{error, timer}; -use datanode::instance::sql::table_idents_to_full_name; -use datanode::instance::InstanceRef as DnInstanceRef; use datanode::region_server::RegionServer; +use datanode::Instance as DnInstanceRef; use distributed::DistInstance; use meta_client::client::{MetaClient, MetaClientBuilder}; use partition::manager::PartitionRuleManager; @@ -75,7 +74,6 @@ use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; use self::distributed::DistRegionRequestHandler; -use self::standalone::StandaloneRegionRequestHandler; use crate::catalog::FrontendCatalogManager; use crate::delete::Deleter; use crate::error::{ @@ -87,11 +85,11 @@ use crate::frontend::FrontendOptions; use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use crate::heartbeat::HeartbeatTask; use crate::insert::Inserter; -use crate::instance::standalone::StandaloneGrpcQueryHandler; use crate::metrics; use crate::script::ScriptExecutor; use crate::server::{start_server, ServerHandlers, Services}; use crate::statement::StatementExecutor; +use crate::table::table_idents_to_full_name; #[async_trait] pub trait FrontendInstance: @@ -259,37 +257,38 @@ impl Instance { } pub async fn try_new_standalone( - dn_instance: DnInstanceRef, - region_server: RegionServer, + _dn_instance: DnInstanceRef, + _region_server: RegionServer, ) -> Result { - let catalog_manager = dn_instance.catalog_manager(); - let query_engine = dn_instance.query_engine(); - let script_executor = - Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); - - let region_request_handler = StandaloneRegionRequestHandler::arc(region_server); - let statement_executor = Arc::new(StatementExecutor::new( - catalog_manager.clone(), - query_engine.clone(), - dn_instance.clone(), - region_request_handler.clone(), - )); - - let create_expr_factory = CreateExprFactory; - let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone()); - - Ok(Instance { - catalog_manager: catalog_manager.clone(), - script_executor, - create_expr_factory, - statement_executor, - query_engine, - grpc_query_handler, - region_request_handler, - plugins: Default::default(), - servers: Arc::new(HashMap::new()), - heartbeat_task: None, - }) + todo!() + // let catalog_manager = dn_instance.catalog_manager(); + // let query_engine = dn_instance.query_engine(); + // let script_executor = + // Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); + + // let region_request_handler = StandaloneRegionRequestHandler::arc(region_server); + // let statement_executor = Arc::new(StatementExecutor::new( + // catalog_manager.clone(), + // query_engine.clone(), + // dn_instance.clone(), + // region_request_handler.clone(), + // )); + + // let create_expr_factory = CreateExprFactory; + // let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone()); + + // Ok(Instance { + // catalog_manager: catalog_manager.clone(), + // script_executor, + // create_expr_factory, + // statement_executor, + // query_engine, + // grpc_query_handler, + // region_request_handler, + // plugins: Default::default(), + // servers: Arc::new(HashMap::new()), + // heartbeat_task: None, + // }) } pub async fn build_servers(&mut self, opts: &FrontendOptions) -> Result<()> { diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index f4452d5447ec..3e49dae2b189 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -42,7 +42,6 @@ use common_meta::table_name::TableName; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::info; -use datanode::instance::sql::table_idents_to_full_name; use datatypes::prelude::ConcreteDataType; use datatypes::schema::RawSchema; use partition::manager::PartitionInfo; @@ -72,7 +71,7 @@ use crate::error::{ use crate::expr_factory; use crate::instance::distributed::deleter::DistDeleter; use crate::instance::distributed::inserter::DistInserter; -use crate::table::DistTable; +use crate::table::{table_idents_to_full_name, DistTable}; const MAX_VALUE: &str = "MAXVALUE"; diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 6d151b75f49d..4710d1c651e8 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -35,12 +35,6 @@ use crate::error::{Error, InvokeDatanodeSnafu, InvokeRegionServerSnafu, Result}; pub(crate) struct StandaloneGrpcQueryHandler(GrpcQueryHandlerRef); -impl StandaloneGrpcQueryHandler { - pub(crate) fn arc(handler: GrpcQueryHandlerRef) -> Arc { - Arc::new(Self(handler)) - } -} - #[async_trait] impl GrpcQueryHandler for StandaloneGrpcQueryHandler { type Error = Error; @@ -58,6 +52,7 @@ pub(crate) struct StandaloneRegionRequestHandler { } impl StandaloneRegionRequestHandler { + #[allow(dead_code)] pub fn arc(region_server: RegionServer) -> Arc { Arc::new(Self { region_server }) } diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs index a98244af5b1a..f45f79bdba07 100644 --- a/src/frontend/src/statement.rs +++ b/src/frontend/src/statement.rs @@ -30,7 +30,6 @@ use common_error::ext::BoxedError; use common_query::Output; use common_time::range::TimestampRange; use common_time::Timestamp; -use datanode::instance::sql::{idents_to_full_database_name, table_idents_to_full_name}; use query::parser::QueryStatement; use query::plan::LogicalPlan; use query::query_engine::SqlStatementExecutorRef; @@ -39,6 +38,7 @@ use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument}; use sql::statements::statement::Statement; +use sqlparser::ast::ObjectName; use table::engine::TableReference; use table::requests::{ CopyDatabaseRequest, CopyDirection, CopyTableRequest, DeleteRequest, InsertRequest, @@ -47,10 +47,11 @@ use table::TableRef; use crate::error::{ self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, - PlanStatementSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu, + InvalidSqlSnafu, PlanStatementSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu, }; use crate::req_convert::{delete, insert}; use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; +use crate::table::table_idents_to_full_name; #[derive(Clone)] pub struct StatementExecutor { @@ -289,3 +290,22 @@ fn extract_timestamp(map: &HashMap, key: &str) -> Result Result<(String, String)> { + match &obj_name.0[..] { + [database] => Ok(( + query_ctx.current_catalog().to_owned(), + database.value.clone(), + )), + [catalog, database] => Ok((catalog.value.clone(), database.value.clone())), + _ => InvalidSqlSnafu { + err_msg: format!( + "expect database name to be ., , found: {obj_name}", + ), + } + .fail(), + } +} diff --git a/src/frontend/src/statement/describe.rs b/src/frontend/src/statement/describe.rs index 7ad3ccbecdf6..0c19dc15b57c 100644 --- a/src/frontend/src/statement/describe.rs +++ b/src/frontend/src/statement/describe.rs @@ -14,7 +14,6 @@ use common_error::ext::BoxedError; use common_query::Output; -use datanode::instance::sql::table_idents_to_full_name; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use sql::statements::describe::DescribeTable; @@ -23,6 +22,7 @@ use crate::error::{ CatalogSnafu, DescribeStatementSnafu, ExternalSnafu, Result, TableNotFoundSnafu, }; use crate::statement::StatementExecutor; +use crate::table::table_idents_to_full_name; impl StatementExecutor { pub(super) async fn describe_table( diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 478cea71236a..c1c18d7c812f 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -16,13 +16,15 @@ use std::sync::Arc; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; +use session::context::QueryContextRef; +use sqlparser::ast::ObjectName; use store_api::data_source::DataSource; use store_api::storage::ScanRequest; use table::metadata::{FilterPushDownType, TableInfoRef}; use table::thin_table::{ThinTable, ThinTableAdapter}; use table::TableRef; -use crate::error::NotSupportedSnafu; +use crate::error::{InvalidSqlSnafu, NotSupportedSnafu, Result}; #[derive(Clone)] pub struct DistTable; @@ -38,7 +40,10 @@ impl DistTable { pub struct DummyDataSource; impl DataSource for DummyDataSource { - fn get_stream(&self, _request: ScanRequest) -> Result { + fn get_stream( + &self, + _request: ScanRequest, + ) -> std::result::Result { NotSupportedSnafu { feat: "get stream from a distributed table", } @@ -47,6 +52,37 @@ impl DataSource for DummyDataSource { } } +// TODO(LFC): Refactor consideration: move this function to some helper mod, +// could be done together or after `TableReference`'s refactoring, when issue #559 is resolved. +/// Converts maybe fully-qualified table name (`..
`) to tuple. +pub fn table_idents_to_full_name( + obj_name: &ObjectName, + query_ctx: QueryContextRef, +) -> Result<(String, String, String)> { + match &obj_name.0[..] { + [table] => Ok(( + query_ctx.current_catalog().to_owned(), + query_ctx.current_schema().to_owned(), + table.value.clone(), + )), + [schema, table] => Ok(( + query_ctx.current_catalog().to_owned(), + schema.value.clone(), + table.value.clone(), + )), + [catalog, schema, table] => Ok(( + catalog.value.clone(), + schema.value.clone(), + table.value.clone(), + )), + _ => InvalidSqlSnafu { + err_msg: format!( + "expect table name to be ..
, .
or
, actual: {obj_name}", + ), + }.fail(), + } +} + #[cfg(test)] pub(crate) mod test { use std::collections::BTreeMap; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index da1a46fd3997..9b53f8a26384 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -239,9 +239,10 @@ impl MetaSrv { if let Err(e) = procedure_manager.recover().await { error!("Failed to recover procedures, error: {e}"); } - let _ = task_handler.start(common_runtime::bg_runtime()) - .map_err(|e| { - debug!("Failed to start greptimedb telemetry task, error: {e}"); + let _ = task_handler.start().map_err(|e| { + debug!( + "Failed to start greptimedb telemetry task, error: {e}" + ); }); } LeaderChangeMessage::StepDown(leader) => {