diff --git a/velox/benchmarks/CMakeLists.txt b/velox/benchmarks/CMakeLists.txt index 8ad8465ea4d25..cfd6650d6bd42 100644 --- a/velox/benchmarks/CMakeLists.txt +++ b/velox/benchmarks/CMakeLists.txt @@ -39,3 +39,27 @@ if(${VELOX_ENABLE_BENCHMARKS}) add_subdirectory(tpch) add_subdirectory(filesystem) endif() + +add_library(velox_query_benchmark QueryBenchmarkBase.cpp) +target_link_libraries( + velox_query_benchmark + velox_aggregates + velox_exec + velox_exec_test_lib + velox_dwio_common + velox_dwio_common_exception + velox_dwio_parquet_reader + velox_dwio_common_test_utils + velox_hive_connector + velox_exception + velox_memory + velox_process + velox_serialization + velox_encode + velox_type + velox_type_fbhive + velox_caching + velox_vector_test_lib + ${FOLLY_BENCHMARK} + Folly::folly + fmt::fmt) diff --git a/velox/benchmarks/QueryBenchmarkBase.cpp b/velox/benchmarks/QueryBenchmarkBase.cpp new file mode 100644 index 0000000000000..6274a1bd94108 --- /dev/null +++ b/velox/benchmarks/QueryBenchmarkBase.cpp @@ -0,0 +1,391 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/benchmarks/QueryBenchmarkBase.h" + +DEFINE_string(data_format, "parquet", "Data format"); + +DEFINE_validator( + data_format, + &facebook::velox::QueryBenchmarkBase::validateDataFormat); + +DEFINE_bool( + include_custom_stats, + false, + "Include custom statistics along with execution statistics"); +DEFINE_bool(include_results, false, "Include results in the output"); +DEFINE_int32(num_drivers, 4, "Number of drivers"); + +DEFINE_int32(num_splits_per_file, 10, "Number of splits per file"); +DEFINE_int32( + cache_gb, + 0, + "GB of process memory for cache and query.. if " + "non-0, uses mmap to allocator and in-process data cache."); +DEFINE_int32(num_repeats, 1, "Number of times to run each query"); +DEFINE_int32(num_io_threads, 8, "Threads for speculative IO"); +DEFINE_string( + test_flags_file, + "", + "Path to a file containing gflafs and " + "values to try. Produces results for each flag combination " + "sorted on performance"); +DEFINE_bool( + full_sorted_stats, + true, + "Add full stats to the report on --test_flags_file"); + +DEFINE_string(ssd_path, "", "Directory for local SSD cache"); +DEFINE_int32(ssd_cache_gb, 0, "Size of local SSD cache in GB"); +DEFINE_int32( + ssd_checkpoint_interval_gb, + 8, + "Checkpoint every n " + "GB new data in cache"); +DEFINE_bool( + clear_ram_cache, + false, + "Clear RAM cache before each query." + "Flushes in process and OS file system cache (if root on Linux)"); +DEFINE_bool( + clear_ssd_cache, + false, + "Clears SSD cache before " + "each query"); + +DEFINE_bool( + warmup_after_clear, + false, + "Runs one warmup of the query before " + "measured run. Use to run warm after clearing caches."); + +DEFINE_int64( + max_coalesced_bytes, + 128 << 20, + "Maximum size of single coalesced IO"); + +DEFINE_int32( + max_coalesced_distance_bytes, + 512 << 10, + "Maximum distance in bytes in which coalesce will combine requests"); + +DEFINE_int32( + parquet_prefetch_rowgroups, + 1, + "Number of next row groups to " + "prefetch. 1 means prefetch the next row group before decoding " + "the current one"); + +DEFINE_int32(split_preload_per_driver, 2, "Prefetch split metadata"); + +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::dwio::common; + +namespace facebook::velox { + +// static +bool QueryBenchmarkBase::validateDataFormat( + const char* flagname, + const std::string& value) { + if ((value.compare("parquet") == 0) || (value.compare("dwrf") == 0)) { + return true; + } + std::cout + << fmt::format( + "Invalid value for --{}: {}. Allowed values are [\"parquet\", \"dwrf\"]", + flagname, + value) + << std::endl; + return false; +} + +// static +void QueryBenchmarkBase::ensureTaskCompletion(exec::Task* task) { + // ASSERT_TRUE requires a function with return type void. + ASSERT_TRUE(exec::test::waitForTaskCompletion(task)); +} + +// static +void QueryBenchmarkBase::printResults( + const std::vector& results, + std::ostream& out) { + out << "Results:" << std::endl; + bool printType = true; + for (const auto& vector : results) { + // Print RowType only once. + if (printType) { + out << vector->type()->asRow().toString() << std::endl; + printType = false; + } + for (vector_size_t i = 0; i < vector->size(); ++i) { + out << vector->toString(i) << std::endl; + } + } +} + +void QueryBenchmarkBase::initialize() { + if (FLAGS_cache_gb) { + memory::MemoryManagerOptions options; + int64_t memoryBytes = FLAGS_cache_gb * (1LL << 30); + options.useMmapAllocator = true; + options.allocatorCapacity = memoryBytes; + options.useMmapArena = true; + options.mmapArenaCapacityRatio = 1; + memory::MemoryManager::testingSetInstance(options); + std::unique_ptr ssdCache; + if (FLAGS_ssd_cache_gb) { + constexpr int32_t kNumSsdShards = 16; + cacheExecutor_ = + std::make_unique(kNumSsdShards); + const cache::SsdCache::Config config( + FLAGS_ssd_path, + static_cast(FLAGS_ssd_cache_gb) << 30, + kNumSsdShards, + cacheExecutor_.get(), + static_cast(FLAGS_ssd_checkpoint_interval_gb) << 30); + ssdCache = std::make_unique(config); + } + + cache_ = cache::AsyncDataCache::create( + memory::memoryManager()->allocator(), std::move(ssdCache)); + cache::AsyncDataCache::setInstance(cache_.get()); + } else { + memory::MemoryManager::testingSetInstance({}); + } + functions::prestosql::registerAllScalarFunctions(); + aggregate::prestosql::registerAllAggregateFunctions(); + parse::registerTypeResolver(); + filesystems::registerLocalFileSystem(); + + ioExecutor_ = + std::make_unique(FLAGS_num_io_threads); + + // Add new values into the hive configuration... + auto configurationValues = std::unordered_map(); + configurationValues[connector::hive::HiveConfig::kMaxCoalescedBytes] = + std::to_string(FLAGS_max_coalesced_bytes); + configurationValues[connector::hive::HiveConfig::kMaxCoalescedDistanceBytes] = + std::to_string(FLAGS_max_coalesced_distance_bytes); + configurationValues[connector::hive::HiveConfig::kPrefetchRowGroups] = + std::to_string(FLAGS_parquet_prefetch_rowgroups); + auto properties = + std::make_shared(configurationValues); + + // Create hive connector with config... + auto hiveConnector = + connector::getConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName) + ->newConnector(kHiveConnectorId, properties, ioExecutor_.get()); + connector::registerConnector(hiveConnector); +} + +std::vector> +QueryBenchmarkBase::listSplits( + const std::string& path, + int32_t numSplitsPerFile, + const exec::test::TpchPlan& plan) { + std::vector> result; + auto temp = HiveConnectorTestBase::makeHiveConnectorSplits( + path, numSplitsPerFile, plan.dataFileFormat); + for (auto& i : temp) { + result.push_back(i); + } + return result; +} + +void QueryBenchmarkBase::shutdown() { + if (cache_) { + cache_->shutdown(); + } +} + +std::pair, std::vector> +QueryBenchmarkBase::run(const TpchPlan& tpchPlan) { + int32_t repeat = 0; + try { + for (;;) { + CursorParameters params; + params.maxDrivers = FLAGS_num_drivers; + params.planNode = tpchPlan.plan; + params.queryConfigs[core::QueryConfig::kMaxSplitPreloadPerDriver] = + std::to_string(FLAGS_split_preload_per_driver); + const int numSplitsPerFile = FLAGS_num_splits_per_file; + + bool noMoreSplits = false; + auto addSplits = [&](exec::Task* task) { + if (!noMoreSplits) { + for (const auto& entry : tpchPlan.dataFiles) { + for (const auto& path : entry.second) { + auto splits = listSplits(path, numSplitsPerFile, tpchPlan); + for (auto split : splits) { + task->addSplit(entry.first, exec::Split(std::move(split))); + } + } + task->noMoreSplits(entry.first); + } + } + noMoreSplits = true; + }; + auto result = readCursor(params, addSplits); + ensureTaskCompletion(result.first->task().get()); + if (++repeat >= FLAGS_num_repeats) { + return result; + } + } + } catch (const std::exception& e) { + LOG(ERROR) << "Query terminated with: " << e.what(); + return {nullptr, std::vector()}; + } +} + +void QueryBenchmarkBase::readCombinations() { + std::ifstream file(FLAGS_test_flags_file); + std::string line; + while (std::getline(file, line)) { + ParameterDim dim; + int32_t previous = 0; + for (auto i = 0; i < line.size(); ++i) { + if (line[i] == ':') { + dim.flag = line.substr(0, i); + previous = i + 1; + } else if (line[i] == ',') { + dim.values.push_back(line.substr(previous, i - previous)); + previous = i + 1; + } + } + if (previous < line.size()) { + dim.values.push_back(line.substr(previous, line.size() - previous)); + } + if (!dim.flag.empty() && !dim.values.empty()) { + parameters_.push_back(dim); + } + } +} + +void QueryBenchmarkBase::runCombinations(int32_t level) { + if (level == parameters_.size()) { + if (FLAGS_clear_ram_cache) { +#ifdef linux + // system("echo 3 >/proc/sys/vm/drop_caches"); + bool success = false; + auto fd = open("/proc//sys/vm/drop_caches", O_WRONLY); + if (fd > 0) { + success = write(fd, "3", 1) == 1; + close(fd); + } + if (!success) { + LOG(ERROR) << "Failed to clear OS disk cache: errno=" << errno; + } +#endif + + if (cache_) { + cache_->clear(); + } + } + if (FLAGS_clear_ssd_cache) { + if (cache_) { + auto ssdCache = cache_->ssdCache(); + if (ssdCache) { + ssdCache->clear(); + } + } + } + if (FLAGS_warmup_after_clear) { + std::stringstream result; + RunStats ignore; + runMain(result, ignore); + } + RunStats stats; + std::stringstream result; + uint64_t micros = 0; + { + struct rusage start; + getrusage(RUSAGE_SELF, &start); + MicrosecondTimer timer(µs); + runMain(result, stats); + struct rusage final; + getrusage(RUSAGE_SELF, &final); + auto tvNanos = [](struct timeval tv) { + return tv.tv_sec * 1000000000 + tv.tv_usec * 1000; + }; + stats.userNanos = tvNanos(final.ru_utime) - tvNanos(start.ru_utime); + stats.systemNanos = tvNanos(final.ru_stime) - tvNanos(start.ru_stime); + } + stats.micros = micros; + stats.output = result.str(); + for (auto i = 0; i < parameters_.size(); ++i) { + std::string name; + gflags::GetCommandLineOption(parameters_[i].flag.c_str(), &name); + stats.flags[parameters_[i].flag] = name; + } + runStats_.push_back(std::move(stats)); + } else { + auto& flag = parameters_[level].flag; + for (auto& value : parameters_[level].values) { + std::string result = + gflags::SetCommandLineOption(flag.c_str(), value.c_str()); + if (result.empty()) { + LOG(ERROR) << "Failed to set " << flag << "=" << value; + } + std::cout << result << std::endl; + runCombinations(level + 1); + } + } +} + +void QueryBenchmarkBase::runOne(std::ostream& out, RunStats& stats) { + std::stringstream result; + uint64_t micros = 0; + { + struct rusage start; + getrusage(RUSAGE_SELF, &start); + MicrosecondTimer timer(µs); + runMain(out, stats); + struct rusage final; + getrusage(RUSAGE_SELF, &final); + auto tvNanos = [](struct timeval tv) { + return tv.tv_sec * 1000000000 + tv.tv_usec * 1000; + }; + stats.userNanos = tvNanos(final.ru_utime) - tvNanos(start.ru_utime); + stats.systemNanos = tvNanos(final.ru_stime) - tvNanos(start.ru_stime); + } + stats.micros = micros; + stats.output = result.str(); + out << result.str(); +} + +void QueryBenchmarkBase::runAllCombinations() { + readCombinations(); + runCombinations(0); + std::sort( + runStats_.begin(), + runStats_.end(), + [](const RunStats& left, const RunStats& right) { + return left.micros < right.micros; + }); + for (auto& stats : runStats_) { + std::cout << stats.toString(false); + } + if (FLAGS_full_sorted_stats) { + std::cout << "Detail for stats:" << std::endl; + for (auto& stats : runStats_) { + std::cout << stats.toString(true); + } + } +} + +} // namespace facebook::velox diff --git a/velox/benchmarks/QueryBenchmarkBase.h b/velox/benchmarks/QueryBenchmarkBase.h new file mode 100644 index 0000000000000..d3577fe53cf25 --- /dev/null +++ b/velox/benchmarks/QueryBenchmarkBase.h @@ -0,0 +1,127 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "velox/common/base/SuccinctPrinter.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/memory/MmapAllocator.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/dwio/common/Options.h" +#include "velox/exec/PlanNodeStats.h" +#include "velox/exec/Split.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/TpchQueryBuilder.h" +#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" +#include "velox/parse/TypeResolver.h" + +DECLARE_string(test_flags_file); +DECLARE_bool(include_results); +DECLARE_bool(include_custom_stats); +DECLARE_string(data_format); + +namespace facebook::velox { + +struct RunStats { + std::map flags; + int64_t micros{0}; + int64_t rawInputBytes{0}; + int64_t userNanos{0}; + int64_t systemNanos{0}; + std::string output; + + std::string toString(bool detail) { + std::stringstream out; + out << succinctNanos(micros * 1000) << " " + << succinctBytes(rawInputBytes / (micros / 1000000.0)) << "/s raw, " + << succinctNanos(userNanos) << " user " << succinctNanos(systemNanos) + << " system (" << (100 * (userNanos + systemNanos) / (micros * 1000)) + << "%), flags: "; + for (auto& pair : flags) { + out << pair.first << "=" << pair.second << " "; + } + out << std::endl << "======" << std::endl; + if (detail) { + out << std::endl << output << std::endl; + } + return out.str(); + } +}; + +struct ParameterDim { + std::string flag; + std::vector values; +}; + +class QueryBenchmarkBase { + public: + virtual ~QueryBenchmarkBase() = default; + virtual void initialize(); + void shutdown(); + std::pair, std::vector> + run(const exec::test::TpchPlan& tpchPlan); + + virtual std::vector> listSplits( + const std::string& path, + int32_t numSplitsPerFile, + const exec::test::TpchPlan& plan); + + static void ensureTaskCompletion(exec::Task* task); + + static bool validateDataFormat( + const char* flagname, + const std::string& value); + + static void printResults( + const std::vector& results, + std::ostream& out); + + void readCombinations(); + + /// Entry point invoked with different settings to run the benchmark. + virtual void runMain(std::ostream& out, RunStats& runStats) = 0; + + void runOne(std::ostream& outtt, RunStats& stats); + + void runCombinations(int32_t level); + + void runAllCombinations(); + + protected: + std::unique_ptr ioExecutor_; + std::unique_ptr cacheExecutor_; + std::shared_ptr allocator_; + std::shared_ptr cache_; + // Parameter combinations to try. Each element specifies a flag and possible + // values. All permutations are tried. + std::vector parameters_; + + std::vector runStats_; +}; +} // namespace facebook::velox diff --git a/velox/benchmarks/tpch/CMakeLists.txt b/velox/benchmarks/tpch/CMakeLists.txt index ba7491f7e6037..3cbe9a0a6a6ba 100644 --- a/velox/benchmarks/tpch/CMakeLists.txt +++ b/velox/benchmarks/tpch/CMakeLists.txt @@ -16,6 +16,7 @@ add_library(velox_tpch_benchmark_lib TpchBenchmark.cpp) target_link_libraries( velox_tpch_benchmark_lib + velox_query_benchmark velox_aggregates velox_exec velox_exec_test_lib diff --git a/velox/benchmarks/tpch/TpchBenchmark.cpp b/velox/benchmarks/tpch/TpchBenchmark.cpp index 413bd82e4a5d7..25d02224fc58d 100644 --- a/velox/benchmarks/tpch/TpchBenchmark.cpp +++ b/velox/benchmarks/tpch/TpchBenchmark.cpp @@ -14,75 +14,13 @@ * limitations under the License. */ -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "velox/common/base/SuccinctPrinter.h" -#include "velox/common/file/FileSystems.h" -#include "velox/common/memory/MmapAllocator.h" -#include "velox/connectors/hive/HiveConfig.h" -#include "velox/connectors/hive/HiveConnector.h" -#include "velox/dwio/common/Options.h" -#include "velox/exec/PlanNodeStats.h" -#include "velox/exec/Split.h" -#include "velox/exec/tests/utils/HiveConnectorTestBase.h" -#include "velox/exec/tests/utils/TpchQueryBuilder.h" -#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" -#include "velox/functions/prestosql/registration/RegistrationFunctions.h" -#include "velox/parse/TypeResolver.h" +#include "velox/benchmarks/QueryBenchmarkBase.h" using namespace facebook::velox; using namespace facebook::velox::exec; using namespace facebook::velox::exec::test; using namespace facebook::velox::dwio::common; -namespace { -static bool notEmpty(const char* /*flagName*/, const std::string& value) { - return !value.empty(); -} - -static bool validateDataFormat(const char* flagname, const std::string& value) { - if ((value.compare("parquet") == 0) || (value.compare("dwrf") == 0)) { - return true; - } - std::cout - << fmt::format( - "Invalid value for --{}: {}. Allowed values are [\"parquet\", \"dwrf\"]", - flagname, - value) - << std::endl; - return false; -} - -void ensureTaskCompletion(exec::Task* task) { - // ASSERT_TRUE requires a function with return type void. - ASSERT_TRUE(waitForTaskCompletion(task)); -} - -void printResults(const std::vector& results, std::ostream& out) { - out << "Results:" << std::endl; - bool printType = true; - for (const auto& vector : results) { - // Print RowType only once. - if (printType) { - out << vector->type()->asRow().toString() << std::endl; - printType = false; - } - for (vector_size_t i = 0; i < vector->size(); ++i) { - out << vector->toString(i) << std::endl; - } - } -} -} // namespace - DEFINE_string( data_path, "", @@ -100,6 +38,13 @@ DEFINE_string( "each table. If they are files, they contain a file system path for each " "data file, one per line. This allows running against cloud storage or " "HDFS"); +namespace { +static bool notEmpty(const char* /*flagName*/, const std::string& value) { + return !value.empty(); +} +} // namespace + +DEFINE_validator(data_path, ¬Empty); DEFINE_int32( run_query_verbose, @@ -112,216 +57,11 @@ DEFINE_int32( "include in IO meter query. The columns are sorted by name and the n% first " "are scanned"); -DEFINE_bool( - include_custom_stats, - false, - "Include custom statistics along with execution statistics"); -DEFINE_bool(include_results, false, "Include results in the output"); -DEFINE_int32(num_drivers, 4, "Number of drivers"); -DEFINE_string(data_format, "parquet", "Data format"); -DEFINE_int32(num_splits_per_file, 10, "Number of splits per file"); -DEFINE_int32( - cache_gb, - 0, - "GB of process memory for cache and query.. if " - "non-0, uses mmap to allocator and in-process data cache."); -DEFINE_int32(num_repeats, 1, "Number of times to run each query"); -DEFINE_int32(num_io_threads, 8, "Threads for speculative IO"); -DEFINE_string( - test_flags_file, - "", - "Path to a file containing gflafs and " - "values to try. Produces results for each flag combination " - "sorted on performance"); -DEFINE_bool( - full_sorted_stats, - true, - "Add full stats to the report on --test_flags_file"); - -DEFINE_string(ssd_path, "", "Directory for local SSD cache"); -DEFINE_int32(ssd_cache_gb, 0, "Size of local SSD cache in GB"); -DEFINE_int32( - ssd_checkpoint_interval_gb, - 8, - "Checkpoint every n " - "GB new data in cache"); -DEFINE_bool( - clear_ram_cache, - false, - "Clear RAM cache before each query." - "Flushes in process and OS file system cache (if root on Linux)"); -DEFINE_bool( - clear_ssd_cache, - false, - "Clears SSD cache before " - "each query"); - -DEFINE_bool( - warmup_after_clear, - false, - "Runs one warmup of the query before " - "measured run. Use to run warm after clearing caches."); - -DEFINE_validator(data_path, ¬Empty); -DEFINE_validator(data_format, &validateDataFormat); - -DEFINE_int64( - max_coalesced_bytes, - 128 << 20, - "Maximum size of single coalesced IO"); - -DEFINE_int32( - max_coalesced_distance_bytes, - 512 << 10, - "Maximum distance in bytes in which coalesce will combine requests"); - -DEFINE_int32( - parquet_prefetch_rowgroups, - 1, - "Number of next row groups to " - "prefetch. 1 means prefetch the next row group before decoding " - "the current one"); - -DEFINE_int32(split_preload_per_driver, 2, "Prefetch split metadata"); - -struct RunStats { - std::map flags; - int64_t micros{0}; - int64_t rawInputBytes{0}; - int64_t userNanos{0}; - int64_t systemNanos{0}; - std::string output; - - std::string toString(bool detail) { - std::stringstream out; - out << succinctNanos(micros * 1000) << " " - << succinctBytes(rawInputBytes / (micros / 1000000.0)) << "/s raw, " - << succinctNanos(userNanos) << " user " << succinctNanos(systemNanos) - << " system (" << (100 * (userNanos + systemNanos) / (micros * 1000)) - << "%), flags: "; - for (auto& pair : flags) { - out << pair.first << "=" << pair.second << " "; - } - out << std::endl << "======" << std::endl; - if (detail) { - out << std::endl << output << std::endl; - } - return out.str(); - } -}; - -struct ParameterDim { - std::string flag; - std::vector values; -}; - std::shared_ptr queryBuilder; -class TpchBenchmark { +class TpchBenchmark : public QueryBenchmarkBase { public: - void initialize() { - if (FLAGS_cache_gb) { - memory::MemoryManagerOptions options; - int64_t memoryBytes = FLAGS_cache_gb * (1LL << 30); - options.useMmapAllocator = true; - options.allocatorCapacity = memoryBytes; - options.useMmapArena = true; - options.mmapArenaCapacityRatio = 1; - memory::MemoryManager::testingSetInstance(options); - std::unique_ptr ssdCache; - if (FLAGS_ssd_cache_gb) { - constexpr int32_t kNumSsdShards = 16; - cacheExecutor_ = - std::make_unique(kNumSsdShards); - const cache::SsdCache::Config config( - FLAGS_ssd_path, - static_cast(FLAGS_ssd_cache_gb) << 30, - kNumSsdShards, - cacheExecutor_.get(), - static_cast(FLAGS_ssd_checkpoint_interval_gb) << 30); - ssdCache = std::make_unique(config); - } - - cache_ = cache::AsyncDataCache::create( - memory::memoryManager()->allocator(), std::move(ssdCache)); - cache::AsyncDataCache::setInstance(cache_.get()); - } else { - memory::MemoryManager::testingSetInstance({}); - } - functions::prestosql::registerAllScalarFunctions(); - aggregate::prestosql::registerAllAggregateFunctions(); - parse::registerTypeResolver(); - filesystems::registerLocalFileSystem(); - - ioExecutor_ = - std::make_unique(FLAGS_num_io_threads); - - // Add new values into the hive configuration... - auto configurationValues = std::unordered_map(); - configurationValues[connector::hive::HiveConfig::kMaxCoalescedBytes] = - std::to_string(FLAGS_max_coalesced_bytes); - configurationValues - [connector::hive::HiveConfig::kMaxCoalescedDistanceBytes] = - std::to_string(FLAGS_max_coalesced_distance_bytes); - configurationValues[connector::hive::HiveConfig::kPrefetchRowGroups] = - std::to_string(FLAGS_parquet_prefetch_rowgroups); - auto properties = - std::make_shared(configurationValues); - - // Create hive connector with config... - auto hiveConnector = - connector::getConnectorFactory( - connector::hive::HiveConnectorFactory::kHiveConnectorName) - ->newConnector(kHiveConnectorId, properties, ioExecutor_.get()); - connector::registerConnector(hiveConnector); - } - - void shutdown() { - cache_->shutdown(); - } - - std::pair, std::vector> run( - const TpchPlan& tpchPlan) { - int32_t repeat = 0; - try { - for (;;) { - CursorParameters params; - params.maxDrivers = FLAGS_num_drivers; - params.planNode = tpchPlan.plan; - params.queryConfigs[core::QueryConfig::kMaxSplitPreloadPerDriver] = - std::to_string(FLAGS_split_preload_per_driver); - const int numSplitsPerFile = FLAGS_num_splits_per_file; - - bool noMoreSplits = false; - auto addSplits = [&](exec::Task* task) { - if (!noMoreSplits) { - for (const auto& entry : tpchPlan.dataFiles) { - for (const auto& path : entry.second) { - auto const splits = - HiveConnectorTestBase::makeHiveConnectorSplits( - path, numSplitsPerFile, tpchPlan.dataFileFormat); - for (const auto& split : splits) { - task->addSplit(entry.first, exec::Split(split)); - } - } - task->noMoreSplits(entry.first); - } - } - noMoreSplits = true; - }; - auto result = readCursor(params, addSplits); - ensureTaskCompletion(result.first->task().get()); - if (++repeat >= FLAGS_num_repeats) { - return result; - } - } - } catch (const std::exception& e) { - LOG(ERROR) << "Query terminated with: " << e.what(); - return {nullptr, std::vector()}; - } - } - - void runMain(std::ostream& out, RunStats& runStats) { + void runMain(std::ostream& out, RunStats& runStats) override { if (FLAGS_run_query_verbose == -1 && FLAGS_io_meter_column_pct == 0) { folly::runBenchmarks(); } else { @@ -363,130 +103,6 @@ class TpchBenchmark { << std::endl; } } - - void readCombinations() { - std::ifstream file(FLAGS_test_flags_file); - std::string line; - while (std::getline(file, line)) { - ParameterDim dim; - int32_t previous = 0; - for (auto i = 0; i < line.size(); ++i) { - if (line[i] == ':') { - dim.flag = line.substr(0, i); - previous = i + 1; - } else if (line[i] == ',') { - dim.values.push_back(line.substr(previous, i - previous)); - previous = i + 1; - } - } - if (previous < line.size()) { - dim.values.push_back(line.substr(previous, line.size() - previous)); - } - - parameters_.push_back(dim); - } - } - - void runCombinations(int32_t level) { - if (level == parameters_.size()) { - if (FLAGS_clear_ram_cache) { -#ifdef linux - // system("echo 3 >/proc/sys/vm/drop_caches"); - bool success = false; - auto fd = open("/proc//sys/vm/drop_caches", O_WRONLY); - if (fd > 0) { - success = write(fd, "3", 1) == 1; - close(fd); - } - if (!success) { - LOG(ERROR) << "Failed to clear OS disk cache: errno=" << errno; - } -#endif - - if (cache_) { - cache_->clear(); - } - } - if (FLAGS_clear_ssd_cache) { - if (cache_) { - auto ssdCache = cache_->ssdCache(); - if (ssdCache) { - ssdCache->clear(); - } - } - } - if (FLAGS_warmup_after_clear) { - std::stringstream result; - RunStats ignore; - runMain(result, ignore); - } - RunStats stats; - std::stringstream result; - uint64_t micros = 0; - { - struct rusage start; - getrusage(RUSAGE_SELF, &start); - MicrosecondTimer timer(µs); - runMain(result, stats); - struct rusage final; - getrusage(RUSAGE_SELF, &final); - auto tvNanos = [](struct timeval tv) { - return tv.tv_sec * 1000000000 + tv.tv_usec * 1000; - }; - stats.userNanos = tvNanos(final.ru_utime) - tvNanos(start.ru_utime); - stats.systemNanos = tvNanos(final.ru_stime) - tvNanos(start.ru_stime); - } - stats.micros = micros; - stats.output = result.str(); - for (auto i = 0; i < parameters_.size(); ++i) { - std::string name; - gflags::GetCommandLineOption(parameters_[i].flag.c_str(), &name); - stats.flags[parameters_[i].flag] = name; - } - runStats_.push_back(std::move(stats)); - } else { - auto& flag = parameters_[level].flag; - for (auto& value : parameters_[level].values) { - std::string result = - gflags::SetCommandLineOption(flag.c_str(), value.c_str()); - if (result.empty()) { - LOG(ERROR) << "Failed to set " << flag << "=" << value; - } - std::cout << result << std::endl; - runCombinations(level + 1); - } - } - } - - void runAllCombinations() { - readCombinations(); - runCombinations(0); - std::sort( - runStats_.begin(), - runStats_.end(), - [](const RunStats& left, const RunStats& right) { - return left.micros < right.micros; - }); - for (auto& stats : runStats_) { - std::cout << stats.toString(false); - } - if (FLAGS_full_sorted_stats) { - std::cout << "Detail for stats:" << std::endl; - for (auto& stats : runStats_) { - std::cout << stats.toString(true); - } - } - } - - std::unique_ptr ioExecutor_; - std::unique_ptr cacheExecutor_; - std::shared_ptr allocator_; - std::shared_ptr cache_; - // Parameter combinations to try. Each element specifies a flag and possible - // values. All permutations are tried. - std::vector parameters_; - - std::vector runStats_; }; TpchBenchmark benchmark; diff --git a/velox/experimental/wave/common/GpuArena.cpp b/velox/experimental/wave/common/GpuArena.cpp index 5605b6428617c..770e28532b43c 100644 --- a/velox/experimental/wave/common/GpuArena.cpp +++ b/velox/experimental/wave/common/GpuArena.cpp @@ -32,7 +32,11 @@ uint64_t GpuSlab::roundBytes(uint64_t bytes) { if (FLAGS_wave_buffer_end_guard) { bytes += sizeof(int64_t); } - return bits::nextPowerOfTwo(std::max(16, bytes)); + if (bytes > 32 << 10) { + return bits::roundUp(bytes, 32 << 10); + } else { + return bits::nextPowerOfTwo(std::max(16, bytes)); + } } GpuSlab::GpuSlab(void* ptr, size_t capacityBytes, GpuAllocator* allocator) @@ -285,12 +289,18 @@ GpuArena::Buffers::Buffers() { } } -GpuArena::GpuArena(uint64_t singleArenaCapacity, GpuAllocator* allocator) - : singleArenaCapacity_(singleArenaCapacity), allocator_(allocator) { +GpuArena::GpuArena( + uint64_t singleArenaCapacity, + GpuAllocator* allocator, + uint64_t standbyCapacity) + : singleArenaCapacity_(singleArenaCapacity), + standbyCapacity_(standbyCapacity), + allocator_(allocator) { auto arena = std::make_shared( allocator_->allocate(singleArenaCapacity), singleArenaCapacity, allocator_); + capacity_ += arena->byteSize(); arenas_.emplace(reinterpret_cast(arena->address()), arena); currentArena_ = arena; } @@ -342,6 +352,7 @@ WaveBufferPtr GpuArena::allocateBytes(uint64_t bytes) { auto newArena = std::make_shared( allocator_->allocate(arenaBytes), arenaBytes, allocator_); arenas_.emplace(reinterpret_cast(newArena->address()), newArena); + capacity_ += newArena->byteSize(); currentArena_ = newArena; result = currentArena_->allocate(bytes); if (result) { @@ -365,7 +376,8 @@ void GpuArena::free(Buffer* buffer) { iter->first + singleArenaCapacity_, addressU64 + buffer->size_); } iter->second->free(buffer->ptr_, buffer->size_); - if (iter->second->empty() && iter->second != currentArena_) { + if (iter->second->empty() && iter->second != currentArena_ && + capacity_ - iter->second->byteSize() >= standbyCapacity_) { arenas_.erase(iter); } buffer->ptr_ = firstFreeBuffer_; @@ -405,4 +417,12 @@ ArenaStatus GpuArena::checkBuffers() { return status; } +std::string GpuArena::toString() const { + std::stringstream out; + for (auto& pair : arenas_) { + out << pair.second->toString() << std::endl; + } + return out.str(); +} + } // namespace facebook::velox::wave diff --git a/velox/experimental/wave/common/GpuArena.h b/velox/experimental/wave/common/GpuArena.h index 1b03ce84a7ac3..982138ead6983 100644 --- a/velox/experimental/wave/common/GpuArena.h +++ b/velox/experimental/wave/common/GpuArena.h @@ -130,7 +130,10 @@ struct ArenaStatus { /// fragmentation happens. class GpuArena { public: - GpuArena(uint64_t singleArenaCapacity, GpuAllocator* allocator); + GpuArena( + uint64_t singleArenaCapacity, + GpuAllocator* allocator, + uint64_t standbyCapacity = 0); WaveBufferPtr allocateBytes(uint64_t bytes); @@ -156,6 +159,8 @@ class GpuArena { /// sizes are padded to larger. ArenaStatus checkBuffers(); + std::string toString() const; + private: // A preallocated array of Buffer handles for memory of 'this'. struct Buffers { @@ -178,9 +183,15 @@ class GpuArena { // Head of Buffer free list. Buffer* firstFreeBuffer_{nullptr}; + // Total capacity in all arenas. + uint64_t capacity_{0}; + // Capacity in bytes for a single GpuSlab managed by this. const uint64_t singleArenaCapacity_; + // Lower bound of capacity to keep around even if usage is below this. + uint64_t standbyCapacity_{0}; + GpuAllocator* const allocator_; // A sorted list of GpuSlab by its initial address diff --git a/velox/experimental/wave/common/ResultStaging.cpp b/velox/experimental/wave/common/ResultStaging.cpp index 77209c8f2c241..0f63e79cfe469 100644 --- a/velox/experimental/wave/common/ResultStaging.cpp +++ b/velox/experimental/wave/common/ResultStaging.cpp @@ -32,7 +32,7 @@ void ResultStaging::registerPointerInternal( bool clear) { VELOX_CHECK_LT(id, offsets_.size()); VELOX_CHECK_NOT_NULL(pointer); -#ifndef NDEBUG +#if 0 // ndef NDEBUG for (auto& pair : patch_) { VELOX_CHECK( pair.second != pointer, "Must not register the same pointer twice"); diff --git a/velox/experimental/wave/common/tests/CudaTest.cpp b/velox/experimental/wave/common/tests/CudaTest.cpp index c9cafab29511d..9f924d962f8e7 100644 --- a/velox/experimental/wave/common/tests/CudaTest.cpp +++ b/velox/experimental/wave/common/tests/CudaTest.cpp @@ -1464,7 +1464,7 @@ TEST_F(CudaTest, reduceMatrix) { return; } - std::vector modes = {/*"unified", "device",*/ "devicecoalesced"}; + std::vector modes = {"unified", "device", "devicecoalesced"}; std::vector batchMBValues = {30, 100}; std::vector numThreadsValues = {1, 2, 3}; std::vector workPerThreadValues = {2, 4}; diff --git a/velox/experimental/wave/dwio/ColumnReader.h b/velox/experimental/wave/dwio/ColumnReader.h index 7848409cc3b28..7da99d960e5a9 100644 --- a/velox/experimental/wave/dwio/ColumnReader.h +++ b/velox/experimental/wave/dwio/ColumnReader.h @@ -16,6 +16,7 @@ #pragma once +#include "velox/common/io/IoStatistics.h" #include "velox/dwio/common/TypeWithId.h" #include "velox/experimental/wave/dwio/FormatData.h" #include "velox/experimental/wave/exec/Wave.h" @@ -113,6 +114,8 @@ class ReadStream : public Executable { ReadStream( StructColumnReader* columnReader, WaveStream& waveStream, + velox::io::IoStatistics* ioStats, + FileInfo& fileInfo, const OperandSet* firstColumns = nullptr); void setNullable(const AbstractOperand& op, bool nullable) { @@ -160,6 +163,8 @@ class ReadStream : public Executable { void prepareRead(); void makeControl(); + io::IoStatistics* const ioStats_; + // Makes steps to align values from non-last filters to the selection of the // last filter. void makeCompact(bool isSerial); @@ -211,6 +216,7 @@ class ReadStream : public Executable { // Set to true when after first griddize() and akeOps(). bool inited_{false}; + FileInfo fileInfo_; }; } // namespace facebook::velox::wave diff --git a/velox/experimental/wave/dwio/FormatData.cpp b/velox/experimental/wave/dwio/FormatData.cpp index aa588ed5b2d3a..6eb116d922fe9 100644 --- a/velox/experimental/wave/dwio/FormatData.cpp +++ b/velox/experimental/wave/dwio/FormatData.cpp @@ -16,9 +16,15 @@ #include "velox/experimental/wave/dwio/FormatData.h" #include "velox/experimental/wave/dwio/ColumnReader.h" +#include "velox/experimental/wave/exec/Wave.h" DECLARE_int32(wave_reader_rows_per_tb); +DEFINE_int32( + staging_bytes_per_thread, + 300000, + "Make a parallel memcpy shard per this many bytes"); + namespace facebook::velox::wave { BufferId SplitStaging::add(Staging& staging) { @@ -38,7 +44,7 @@ void SplitStaging::registerPointerInternal( if (clear) { *ptr = nullptr; } -#ifndef NDEBUG +#if 0 // ndef NDEBUG for (auto& pair : patch_) { VELOX_CHECK(pair.second != ptr, "Must not register the same pointer twice"); } @@ -46,29 +52,110 @@ void SplitStaging::registerPointerInternal( patch_.push_back(std::make_pair(id, ptr)); } +void SplitStaging::copyColumns( + int32_t begin, + int32_t end, + char* destination, + bool release) { + for (auto i = begin; i < end; ++i) { + memcpy(destination, staging_[i].hostData, staging_[i].size); + destination += staging_[i].size; + } + if (release) { + sem_.release(); + } +} + +// Shared pool of 1-2GB of pinned host memory for staging. May +// transiently exceed 2GB but settles to 2GB after the peak. +GpuArena& getTransferArena() { + static std::unique_ptr arena = std::make_unique( + 1UL << 30, getHostAllocator(nullptr), 2UL << 30); + return *arena; +} + // Starts the transfers registered with add(). 'stream' is set to a stream // where operations depending on the transfer may be queued. void SplitStaging::transfer( WaveStream& waveStream, Stream& stream, - bool recordEvent) { + bool recordEvent, + std::function asyncTail) { if (fill_ == 0 || deviceBuffer_ != nullptr) { + if (recordEvent) { + event_ = std::make_unique(); + event_->record(stream); + } + if (asyncTail) { + asyncTail(waveStream, stream); + } return; } - deviceBuffer_ = waveStream.arena().allocate(fill_); - auto universal = deviceBuffer_->as(); - for (auto i = 0; i < offsets_.size(); ++i) { - memcpy(universal + offsets_[i], staging_[i].hostData, staging_[i].size); + WaveTime startTime = WaveTime::now(); + deviceBuffer_ = waveStream.deviceArena().allocate(fill_); + hostBuffer_ = getTransferArena().allocate(fill_); + auto transferBuffer = hostBuffer_->as(); + int firstToCopy = 0; + int32_t numCopies = staging_.size(); + int64_t copySize = 0; + auto targetCopySize = FLAGS_staging_bytes_per_thread; + int32_t numThreads = 0; + if (fill_ > 2000000) { + for (auto i = 0; i < staging_.size(); ++i) { + auto columnSize = staging_[i].size; + copySize += columnSize; + if (copySize >= targetCopySize && i < staging_.size() - 1) { + ++numThreads; + WaveStream::copyExecutor()->add( + [i, firstToCopy, transferBuffer, this]() { + copyColumns(firstToCopy, i + 1, transferBuffer, true); + }); + transferBuffer += copySize; + copySize = 0; + firstToCopy = i + 1; + } + } } - stream.prefetch( - getDevice(), deviceBuffer_->as(), deviceBuffer_->size()); + auto deviceData = deviceBuffer_->as(); for (auto& pair : patch_) { *reinterpret_cast(pair.second) += - reinterpret_cast(universal) + offsets_[pair.first]; + reinterpret_cast(deviceData) + offsets_[pair.first]; } - if (recordEvent) { - event_ = std::make_unique(); - event_->record(stream); + + if (asyncTail) { + WaveStream::syncExecutor()->add([firstToCopy, + numThreads, + transferBuffer, + asyncTail, + &waveStream, + &stream, + recordEvent, + startTime, + this]() { + copyColumns(firstToCopy, staging_.size(), transferBuffer, false); + for (auto i = 0; i < numThreads; ++i) { + sem_.acquire(); + } + stream.hostToDeviceAsync( + deviceBuffer_->as(), hostBuffer_->as(), fill_); + waveStream.stats().stagingTime += WaveTime::now() - startTime; + if (recordEvent) { + event_ = std::make_unique(); + event_->record(stream); + } + }); + } else { + copyColumns(firstToCopy, staging_.size(), transferBuffer, false); + for (auto i = 0; i < numThreads; ++i) { + sem_.acquire(); + } + stream.hostToDeviceAsync( + deviceBuffer_->as(), hostBuffer_->as(), fill_); + waveStream.stats().stagingTime += WaveTime::now() - startTime; + if (recordEvent) { + event_ = std::make_unique(); + event_->record(stream); + } } } diff --git a/velox/experimental/wave/dwio/FormatData.h b/velox/experimental/wave/dwio/FormatData.h index a30a479cea97b..5a10a5335132b 100644 --- a/velox/experimental/wave/dwio/FormatData.h +++ b/velox/experimental/wave/dwio/FormatData.h @@ -16,14 +16,16 @@ #pragma once +#include +#include "velox/common/base/Semaphore.h" +#include "velox/common/caching/AsyncDataCache.h" +#include "velox/common/file/Region.h" #include "velox/dwio/common/ScanSpec.h" #include "velox/dwio/common/Statistics.h" #include "velox/dwio/common/TypeWithId.h" #include "velox/experimental/wave/dwio/decode/DecodeStep.h" #include "velox/experimental/wave/vector/WaveVector.h" -#include - namespace facebook::velox::wave { class ReadStream; @@ -32,8 +34,10 @@ class WaveStream; // Describes how a column is staged on GPU, for example, copy from host RAM, // direct read, already on device etc. struct Staging { - Staging(const void* hostData, int32_t size) - : hostData(hostData), size(size) {} + Staging(const void* hostData, int32_t size, const common::Region& region) + : hostData(hostData), + size(hostData ? size : region.length), + fileOffset(region.offset) {} // Pointer to data in pageable host memory, if applicable. const void* hostData{nullptr}; @@ -41,7 +45,15 @@ struct Staging { // Size in bytes. size_t size; - // Add members here to describe locations in storage for GPU direct transfer. + /// If 'hostData' is nullptr, this is the start offset for 'size' + /// bytes in 'fileInfo_' of containing SplitStaging. + int64_t fileOffset{0}; +}; + +struct FileInfo { + ReadFile* file{nullptr}; + StringIdLease* fileId{nullptr}; + cache::AsyncDataCache* cache{nullptr}; }; /// Describes how columns to be read together are staged on device. This is @@ -49,6 +61,8 @@ struct Staging { /// data already on device. class SplitStaging { public: + SplitStaging(FileInfo& fileInfo) : fileInfo_(fileInfo) {} + /// Adds a transfer described by 'staging'. Returns an id of the /// device side buffer. The id will be mapped to an actual buffer /// when the transfers are queud. At this time, pointers that @@ -72,12 +86,20 @@ class SplitStaging { int64_t bytesToDevice() const { return fill_; } - // Starts the transfers registered with add( on 'stream'). Does nothing after - // first call or if no pointers are registered. If 'recordEvent' is true, - // records an event that is completed after the transfer arrives. Use event() - // to access the event. - void - transfer(WaveStream& waveStream, Stream& stream, bool recordEvent = false); + // Starts the transfers registered with add( on 'stream'). Does + // nothing after first call or if no pointers are registered. If + // 'recordEvent' is true, records an event that is completed after + // the transfer arrives. Use event() to access the event. If + // 'asyncTail' is non-nullptr, it is called after the data transfer + // is enqueued. The call is on an executor and transfer() returns as + // soon as the work is enqueud. If asyncTail is not given, + // transfer() returns after the transfer is enqueued on + // 'stream'. event() is not set until the transfer is enqueued. + void transfer( + WaveStream& waveStream, + Stream& stream, + bool recordEvent = false, + std::function asyncTail = nullptr); Event* event() const { return event_.get(); @@ -86,6 +108,8 @@ class SplitStaging { private: void registerPointerInternal(BufferId id, void** ptr, bool clear); + void copyColumns(int32_t begin, int32_t end, char* destination, bool release); + // Pinned host memory for transfer to device. May be nullptr if using unified // memory. WaveBufferPtr hostBuffer_; @@ -107,6 +131,11 @@ class SplitStaging { // Optional event recorded after end of transfer. Use to sync dependent // kernels on other streams. std::unique_ptr event_; + + // Synchronizes arrival of multithreaded memcpy + Semaphore sem_{0}; + + FileInfo& fileInfo_; }; using RowSet = folly::Range; diff --git a/velox/experimental/wave/dwio/ReadStream.cpp b/velox/experimental/wave/dwio/ReadStream.cpp index e961f1929d6a6..9f192307b3fb1 100644 --- a/velox/experimental/wave/dwio/ReadStream.cpp +++ b/velox/experimental/wave/dwio/ReadStream.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "velox/common/process/TraceContext.h" #include "velox/experimental/wave/dwio/ColumnReader.h" #include "velox/experimental/wave/dwio/StructColumnReader.h" @@ -49,13 +50,15 @@ void allOperands( ReadStream::ReadStream( StructColumnReader* columnReader, WaveStream& _waveStream, + io::IoStatistics* ioStats, + FileInfo& fileInfo, const OperandSet* firstColumns) - : Executable() { + : Executable(), ioStats_(ioStats), fileInfo_(fileInfo) { waveStream = &_waveStream; allOperands(columnReader, outputOperands, &abstractOperands_); output.resize(outputOperands.size()); reader_ = columnReader; - reader_->splitStaging().push_back(std::make_unique()); + reader_->splitStaging().push_back(std::make_unique(fileInfo_)); currentStaging_ = reader_->splitStaging().back().get(); } @@ -99,7 +102,9 @@ void ReadStream::makeGrid(Stream* stream) { } if (!programs_.programs.empty()) { WaveStats& stats = waveStream->stats(); - stats.bytesToDevice += currentStaging_->bytesToDevice(); + auto bytes = currentStaging_->bytesToDevice(); + ioStats_->incRawBytesRead(bytes); + stats.bytesToDevice += bytes; ++stats.numKernels; stats.numPrograms += programs_.programs.size(); stats.numThreads += @@ -108,12 +113,16 @@ void ReadStream::makeGrid(Stream* stream) { deviceStaging_.makeDeviceBuffer(waveStream->arena()); currentStaging_->transfer(*waveStream, *stream); WaveBufferPtr extra; - launchDecode(programs_, &waveStream->arena(), extra, stream); + { + PrintTime l("grid"); + launchDecode(programs_, &waveStream->arena(), extra, stream); + } reader_->recordGriddize(*stream); if (extra) { commands_.push_back(std::move(extra)); } - reader_->splitStaging().push_back(std::make_unique()); + reader_->splitStaging().push_back( + std::make_unique(fileInfo_)); currentStaging_ = reader_->splitStaging().back().get(); } } @@ -321,7 +330,9 @@ void ReadStream::launch( readStream->prepareRead(); for (;;) { bool done = readStream->makePrograms(needSync); - stats.bytesToDevice += readStream->currentStaging_->bytesToDevice(); + auto bytes = readStream->currentStaging_->bytesToDevice(); + readStream->ioStats_->incRawBytesRead(bytes); + stats.bytesToDevice += bytes; ++stats.numKernels; stats.numPrograms += readStream->programs_.programs.size(); stats.numThreads += readStream->programs_.programs.size() * @@ -342,13 +353,16 @@ void ReadStream::launch( } } firstLaunch = false; - launchDecode( - readStream->programs(), &waveStream->arena(), extra, stream); + { + PrintTime l("decode"); + launchDecode( + readStream->programs(), &waveStream->arena(), extra, stream); + } if (extra) { readStream->commands_.push_back(std::move(extra)); } readStream->reader_->splitStaging().push_back( - std::make_unique()); + std::make_unique(readStream->fileInfo_)); readStream->currentStaging_ = readStream->reader_->splitStaging().back().get(); if (needSync) { @@ -363,11 +377,14 @@ void ReadStream::launch( readStream->setBlockStatusAndTemp(); readStream->deviceStaging_.makeDeviceBuffer(waveStream->arena()); WaveBufferPtr extra; - launchDecode( - readStream->programs(), - &readStream->waveStream->arena(), - extra, - stream); + { + PrintTime l("decode-f"); + launchDecode( + readStream->programs(), + &readStream->waveStream->arena(), + extra, + stream); + } if (extra) { readStream->commands_.push_back(std::move(extra)); } diff --git a/velox/experimental/wave/dwio/decode/GpuDecoder-inl.cuh b/velox/experimental/wave/dwio/decode/GpuDecoder-inl.cuh index c830e026256e3..2d218b9813b4e 100644 --- a/velox/experimental/wave/dwio/decode/GpuDecoder-inl.cuh +++ b/velox/experimental/wave/dwio/decode/GpuDecoder-inl.cuh @@ -22,7 +22,6 @@ namespace facebook::velox::wave { namespace detail { - template __device__ void decodeTrivial(GpuDecode::Trivial& op) { auto address = reinterpret_cast(op.input); @@ -90,6 +89,7 @@ __device__ void storeResult( } } +#if 1 template __device__ void decodeDictionaryOnBitpack(GpuDecode::DictionaryOnBitpack& op) { int32_t i = op.begin + threadIdx.x; @@ -109,19 +109,111 @@ __device__ void decodeDictionaryOnBitpack(GpuDecode::DictionaryOnBitpack& op) { int32_t wordIndex = bitIndex >> 6; int32_t bit = bitIndex & 63; uint64_t word = words[wordIndex]; - uint64_t low = word >> bit; - if (bitWidth + bit <= 64) { - uint64_t index = low & mask; - storeResult(i, index, baseline, dict, scatter, result); - } else { + uint64_t index = word >> bit; + if (bitWidth + bit > 64) { + uint64_t nextWord = words[wordIndex + 1]; + index |= nextWord << (64 - bit); + } + index &= mask; + storeResult(i, index, baseline, dict, scatter, result); + } +} +#elif 0 + +template +__device__ void decodeDictionaryOnBitpack(GpuDecode::DictionaryOnBitpack& op) { + int32_t i = op.begin + threadIdx.x; + __shared__ int32_t end; + __shared__ uint64_t address; + __shared__ int32_t alignOffset; + __shared__ uint64_t* words; + __shared__ const T* dict; + __shared__ const int32_t* scatter; + __shared__ int64_t baseline; + __shared__ uint8_t bitWidth; + __shared__ uint64_t mask; + __shared__ T* result; + + if (threadIdx.x == 0) { + end = op.end; + address = reinterpret_cast(op.indices); + alignOffset = (address & 7) * 8; + address &= ~7UL; + words = reinterpret_cast(address); + dict = reinterpret_cast(op.alphabet); + scatter = op.scatter; + baseline = op.baseline; + bitWidth = op.bitWidth; + mask = (1LU << bitWidth) - 1; + result = reinterpret_cast(op.result); + } + __syncthreads(); + for (; i < end; i += blockDim.x) { + int32_t bitIndex = i * bitWidth + alignOffset; + int32_t wordIndex = bitIndex >> 6; + int32_t bit = bitIndex & 63; + uint64_t word = words[wordIndex]; + uint64_t index = word >> bit; + if (bitWidth + bit > 64) { + uint64_t nextWord = words[wordIndex + 1]; + index |= nextWord << (64 - bit); + } + index &= mask; + storeResult(i, index, baseline, dict, scatter, result); + } +} +#elif 0 + +template +__device__ void decodeDictionaryOnBitpack(GpuDecode::DictionaryOnBitpack& op) { + int32_t i = op.begin + threadIdx.x; + auto address = reinterpret_cast(op.indices); + int32_t alignOffset = (address & 7) * 8; + address &= ~7UL; + auto words = reinterpret_cast(address); + const T* dict = reinterpret_cast(op.alphabet); + auto scatter = op.scatter; + auto baseline = op.baseline; + auto bitWidth = op.bitWidth; + uint64_t mask = (1LU << bitWidth) - 1; + for (; i < op.end; i += blockDim.x) { + int32_t bitIndex = i * op.bitWidth + alignOffset; + int32_t wordIndex = bitIndex >> 6; + int32_t bit = bitIndex & 63; + uint64_t word = words[wordIndex]; + uint64_t index = word >> bit; + if (bitWidth + bit > 64) { uint64_t nextWord = words[wordIndex + 1]; - int32_t bitsFromNext = bitWidth - (64 - bit); - uint64_t index = - low | ((nextWord & ((1LU << bitsFromNext) - 1)) << (64 - bit)); - storeResult(i, index, baseline, dict, scatter, result); + index |= nextWord << (64 - bit); } + index &= mask; + storeResult( + i, index, baseline, dict, scatter, reinterpret_cast(op.result)); } } +#elif 0 +template +__device__ void decodeDictionaryOnBitpack(GpuDecode::DictionaryOnBitpack& op) { + int32_t i = op.begin + threadIdx.x; + auto scatter = op.scatter; + auto baseline = op.baseline; + for (; i < op.end; i += blockDim.x) { + uint64_t index; + if (op.bitWidth <= 32) { + index = loadBits32(op.indices, i * op.bitWidth, op.bitWidth); + } else { + index = loadBits64(op.indices, i * op.bitWidth, op.bitWidth); + } + storeResult( + i, + index, + baseline, + reinterpret_cast(op.alphabet), + scatter, + reinterpret_cast(op.result)); + } +} +#endif template __device__ void decodeDictionaryOnBitpack(GpuDecode::DictionaryOnBitpack& op) { @@ -509,9 +601,9 @@ __device__ void makeScatterIndices(GpuDecode::MakeScatterIndices& op) { } } -template +template inline __device__ T randomAccessDecode(const GpuDecode* op, int32_t idx) { - switch (op->encoding) { + switch (kEncoding) { case DecodeStep::kDictionaryOnBitpack: { const auto& d = op->data.dictionaryOnBitpack; auto width = d.bitWidth; @@ -536,7 +628,12 @@ __device__ bool testFilter(const GpuDecode* op, T data) { } } -template +template < + typename T, + int32_t kBlockSize, + bool kHasFilter, + bool kHasResult, + bool kHasNulls> __device__ void makeResult( const GpuDecode* op, T data, @@ -546,7 +643,7 @@ __device__ void makeResult( uint8_t nullFlag, int32_t* temp) { auto base = nthLoop * kBlockSize; - if (op->filterKind != WaveFilterKind::kAlwaysTrue) { + if (kHasFilter) { int32_t resultIdx = exclusiveSum( static_cast(filterPass), nullptr, @@ -560,9 +657,9 @@ __device__ void makeResult( if (filterPass) { resultIdx += base; op->resultRows[resultIdx] = row; - if (op->result) { + if (kHasResult) { reinterpret_cast(op->result)[resultIdx] = data; - if (op->resultNulls) { + if (kHasNulls) { op->resultNulls[resultIdx] = nullFlag; } } @@ -575,31 +672,66 @@ __device__ void makeResult( } auto resultIdx = base + threadIdx.x; reinterpret_cast(op->result)[resultIdx] = data; - if (op->resultNulls) { + if (kHasNulls && op->resultNulls) { op->resultNulls[resultIdx] = nullFlag; } } } -template +template < + typename T, + int32_t kBlockSize, + DecodeStep kEncoding, + WaveFilterKind kFilterKind, + bool kHasResult> __device__ void decodeSelective(GpuDecode* op) { - int32_t dataIdx; - int32_t nthLoop = 0; switch (op->nullMode) { case NullMode::kDenseNonNull: { +#if 0 + // No-filter case with everything inlined. +auto base = op->baseRow; + auto i = threadIdx.x; + auto& d = op->data.dictionaryOnBitpack; + auto end = op->maxRow - op->baseRow; + auto address = reinterpret_cast(d.indices); + int32_t alignOffset = (address & 7) * 8; + address &= ~7UL; + auto words = reinterpret_cast(address); + auto baseline = d.baseline; + auto bitWidth = d.bitWidth; + uint64_t mask = (1LU << bitWidth) - 1; + auto* result = reinterpret_cast(op->result); + for (; i < end; i += blockDim.x) { + int32_t bitIndex = (i + base) * bitWidth + alignOffset; + int32_t wordIndex = bitIndex >> 6; + int32_t bit = bitIndex & 63; + uint64_t word = words[wordIndex]; + uint64_t index = word >> bit; + if (bitWidth + bit > 64) { + uint64_t nextWord = words[wordIndex + 1]; + index |= nextWord << (64 - bit); + } + index &= mask; + result[i] = index + baseline; + } +#else do { int32_t row = threadIdx.x + op->baseRow + nthLoop * kBlockSize; bool filterPass = false; T data{}; if (row < op->maxRow) { - data = randomAccessDecode(op, row); - filterPass = - testFilter(op, data); + data = randomAccessDecode(op, row); + filterPass = testFilter(op, data); } - makeResult( - op, data, row, filterPass, nthLoop, kNotNull, op->temp); + makeResult< + T, + kBlockSize, + kFilterKind != WaveFilterKind::kAlwaysTrue, + kHasResult, + false>(op, data, row, filterPass, nthLoop, kNotNull, op->temp); } while (++nthLoop < op->numRowsPerThread); +#endif break; } case NullMode::kSparseNonNull: @@ -610,12 +742,15 @@ __device__ void decodeSelective(GpuDecode* op) { int32_t row = 0; if (threadIdx.x < numRows) { row = op->rows[threadIdx.x + nthLoop * kBlockSize]; - data = randomAccessDecode(op, row); - filterPass = - testFilter(op, data); + data = randomAccessDecode(op, row); + filterPass = testFilter(op, data); } - makeResult( - op, data, row, filterPass, nthLoop, kNotNull, op->temp); + makeResult< + T, + kBlockSize, + kFilterKind != WaveFilterKind::kAlwaysTrue, + kHasResult, + false>(op, data, row, filterPass, nthLoop, kNotNull, op->temp); } while (++nthLoop < op->numRowsPerThread); break; case NullMode::kDenseNullable: { @@ -644,13 +779,17 @@ __device__ void decodeSelective(GpuDecode* op) { filterPass = false; } } else { - data = randomAccessDecode(op, dataIdx); - filterPass = - testFilter(op, data); + data = randomAccessDecode(op, dataIdx); + filterPass = testFilter(op, data); } } } - makeResult( + makeResult< + T, + kBlockSize, + kFilterKind != WaveFilterKind::kAlwaysTrue, + kHasResult, + true>( op, data, base + threadIdx.x, @@ -677,7 +816,7 @@ __device__ void decodeSelective(GpuDecode* op) { } else { bool filterPass = true; T data{}; - dataIdx = + int32_t dataIdx = nonNullIndex256Sparse(op->nulls, op->rows + base, numRows, state); filterPass = threadIdx.x < numRows; if (filterPass) { @@ -686,12 +825,17 @@ __device__ void decodeSelective(GpuDecode* op) { filterPass = false; } } else { - data = randomAccessDecode(op, dataIdx); + data = randomAccessDecode(op, dataIdx); filterPass = testFilter(op, data); } } - makeResult( + makeResult< + T, + kBlockSize, + kFilterKind != WaveFilterKind::kAlwaysTrue, + kHasResult, + true>( op, data, op->rows[base + threadIdx.x], @@ -859,14 +1003,47 @@ __device__ void countBits(GpuDecode& step) { } } +template +__device__ void selectiveFilter(GpuDecode* op) { + switch (op->filterKind) { + case WaveFilterKind::kAlwaysTrue: + decodeSelective< + T, + kBlockSize, + kEncoding, + WaveFilterKind::kAlwaysTrue, + true>(op); + break; + case WaveFilterKind::kBigintRange: + decodeSelective< + T, + kBlockSize, + kEncoding, + WaveFilterKind::kBigintRange, + true>(op); + break; + default: + assert(false); + } +} + +template +__device__ void selectiveSwitch(GpuDecode* op) { + if (op->encoding == DecodeStep::kDictionaryOnBitpack) { + selectiveFilter(op); + } else { + assert(false); + } +} + template __device__ void decodeSwitch(GpuDecode& op) { switch (op.step) { case DecodeStep::kSelective32: - detail::decodeSelective(&op); + selectiveSwitch(&op); break; case DecodeStep::kSelective64: - detail::decodeSelective(&op); + selectiveSwitch(&op); break; case DecodeStep::kCompact64: detail::compactValues(&op); @@ -903,14 +1080,17 @@ __device__ void decodeSwitch(GpuDecode& op) { break; default: if (threadIdx.x == 0) { - printf("ERROR: Unsupported DecodeStep (with shared memory)\n"); + printf( + "ERROR: Unsupported DecodeStep %d\n", + static_cast(op.step)); } } } template __global__ void decodeGlobal(GpuDecode* plan) { - decodeSwitch(plan[blockIdx.x]); + detail::decodeSwitch(plan[blockIdx.x]); + __syncthreads(); } template diff --git a/velox/experimental/wave/dwio/decode/GpuDecoder.cu b/velox/experimental/wave/dwio/decode/GpuDecoder.cu index b955d7d574e53..2e832053b517e 100644 --- a/velox/experimental/wave/dwio/decode/GpuDecoder.cu +++ b/velox/experimental/wave/dwio/decode/GpuDecoder.cu @@ -54,13 +54,18 @@ struct alignas(16) GpuDecodeParams { void __global__ __launch_bounds__(1024) decodeKernel(GpuDecodeParams inlineParams) { - // asm volatile (".maxnregs 40;"); - GpuDecodeParams* params = - inlineParams.external ? inlineParams.external : &inlineParams; - int32_t programStart = blockIdx.x == 0 ? 0 : params->ends[blockIdx.x - 1]; - int32_t programEnd = params->ends[blockIdx.x]; - GpuDecode* ops = - reinterpret_cast(¶ms->ends[0] + roundUp(gridDim.x, 4)); + __shared__ GpuDecodeParams* params; + __shared__ int32_t programStart; + __shared__ int32_t programEnd; + __shared__ GpuDecode* ops; + if (threadIdx.x == 0) { + params = inlineParams.external ? inlineParams.external : &inlineParams; + programStart = blockIdx.x == 0 ? 0 : params->ends[blockIdx.x - 1]; + programEnd = params->ends[blockIdx.x]; + ops = + reinterpret_cast(¶ms->ends[0] + roundUp(gridDim.x, 4)); + } + __syncthreads(); for (auto i = programStart; i < programEnd; ++i) { detail::decodeSwitch(ops[i]); } @@ -74,9 +79,15 @@ void launchDecode( Stream* stream) { int32_t numBlocks = programs.programs.size(); int32_t numOps = 0; + bool allSingle = true; int32_t shared = 0; for (auto& program : programs.programs) { - numOps += program.size(); + int numSteps = program.size(); + ; + if (numSteps != 1) { + allSingle = false; + } + numOps += numSteps; for (auto& step : program) { shared = std::max( shared, detail::sharedMemorySizeForDecode(step->step)); @@ -87,7 +98,8 @@ void launchDecode( } GpuDecodeParams localParams; GpuDecodeParams* params = &localParams; - if (numOps > GpuDecodeParams::kMaxInlineOps) { + + if (numOps > GpuDecodeParams::kMaxInlineOps || allSingle) { extra = arena->allocate( (numOps + 1) * (sizeof(GpuDecode) + sizeof(int32_t)) + 16); uintptr_t aligned = @@ -105,6 +117,14 @@ void launchDecode( decodes[fill++] = *op; } } + if (allSingle) { + stream->prefetch(getDevice(), extra->as(), extra->size()); + detail::decodeGlobal + <<stream()->stream>>>(decodes); + CUDA_CHECK(cudaGetLastError()); + programs.result.transfer(*stream); + return; + } if (extra) { localParams.external = params; stream->prefetch(getDevice(), extra->as(), extra->size()); @@ -117,5 +137,10 @@ void launchDecode( } REGISTER_KERNEL("decode", decodeKernel); +namespace { +static bool decSingles_reg = registerKernel( + "decodeSingle", + reinterpret_cast(detail::decodeGlobal)); +} } // namespace facebook::velox::wave diff --git a/velox/experimental/wave/dwio/decode/tests/GpuDecoderTest.cu b/velox/experimental/wave/dwio/decode/tests/GpuDecoderTest.cu index 5bd5eb3f28d0e..d5c3afb27aa24 100644 --- a/velox/experimental/wave/dwio/decode/tests/GpuDecoderTest.cu +++ b/velox/experimental/wave/dwio/decode/tests/GpuDecoderTest.cu @@ -29,6 +29,9 @@ namespace { using namespace facebook::velox; +// define to use the flexible call path wiht multiple ops per TB +#define USE_PROGRAM_API + // Returns the number of bytes the "values" will occupy after varint encoding. uint64_t bulkVarintSize(const uint64_t* values, int count) { constexpr uint8_t kLookupSizeTable64[64] = { @@ -84,6 +87,10 @@ inline const T* addBytes(const T* ptr, int bytes) { return reinterpret_cast(reinterpret_cast(ptr) + bytes); } +void prefetchToDevice(void* ptr, size_t size) { + CUDA_CHECK_FATAL(cudaMemPrefetchAsync(ptr, size, FLAGS_device_id, nullptr)); +} + template void makeBitpackDict( int32_t bitWidth, @@ -92,8 +99,9 @@ void makeBitpackDict( T*& dict, uint64_t*& bits, T*& result, - int32_t** scatter) { - int64_t dictBytes = sizeof(T) << bitWidth; + int32_t** scatter, + bool bitsOnly) { + int64_t dictBytes = bitsOnly ? 0 : sizeof(T) << bitWidth; int64_t bitBytes = (roundUp(numValues * bitWidth, 128) / 8) + 16; int64_t resultBytes = numValues * sizeof(T); int scatterBytes = @@ -104,7 +112,7 @@ void makeBitpackDict( cudaPtr = allocate(dictBytes + bitBytes + scatterBytes + resultBytes); T* memory = (T*)cudaPtr.get(); - dict = memory; + dict = bitsOnly ? nullptr : memory; static int sequence = 1; ++sequence; @@ -126,6 +134,7 @@ void makeBitpackDict( } result = addBytes( reinterpret_cast(memory), dictBytes + bitBytes + scatterBytes); + prefetchToDevice(memory, dictBytes + bitBytes + scatterBytes + resultBytes); } class GpuDecoderTest : public ::testing::Test { @@ -220,7 +229,8 @@ class GpuDecoderTest : public ::testing::Test { int32_t bitWidth, int64_t numValues, int numBlocks, - bool useScatter) { + bool useScatter, + bool bitsOnly = false) { gpu::CudaPtr ptr; T* dict; uint64_t* bits; @@ -233,7 +243,8 @@ class GpuDecoderTest : public ::testing::Test { dict, bits, result, - useScatter ? &scatter : nullptr); + useScatter ? &scatter : nullptr, + bitsOnly); result[numValues] = 0xdeadbeef; int valuesPerOp = roundUp(numValues / numBlocks, kBlockSize); int numOps = roundUp(numValues, valuesPerOp) / valuesPerOp; @@ -254,21 +265,29 @@ class GpuDecoderTest : public ::testing::Test { } testCase( fmt::format( - "bitpack dictplan {} numValues={} useScatter={}", + "bitpack dictplan {} -> {} numValues={} useScatter={}", + bitWidth, sizeof(T) * 8, numValues, useScatter), - [&] { decodeGlobal(ops.get(), numOps); }, + [&] { +#ifdef USE_PROGRAM_API + callViaPrograms(ops.get(), numOps); +#else + decodeGlobal(ops.get(), numOps); +#endif + }, numValues * sizeof(T), 10); if (!scatter) { EXPECT_EQ(0xdeadbeef, result[numValues]); } - auto mask = (1u << bitWidth) - 1; + auto mask = (1uL << bitWidth) - 1; for (auto i = 0; i < numValues; ++i) { int32_t bit = i * bitWidth; uint64_t word = *addBytes(bits, bit / 8); - T expected = dict[(word >> (bit & 7)) & mask]; + uint64_t index = (word >> (bit & 7)) & mask; + T expected = bitsOnly ? index : dict[index]; ASSERT_EQ(result[scatter ? scatter[i] : i], expected) << i; } } @@ -512,6 +531,20 @@ class GpuDecoderTest : public ::testing::Test { } } + void callViaPrograms(GpuDecode* ops, int32_t numOps) { + auto stream = std::make_unique(); + WaveBufferPtr extra; + DecodePrograms programs; + for (int i = 0; i < numOps; ++i) { + programs.programs.emplace_back(); + programs.programs.back().push_back(std::make_unique()); + auto opPtr = programs.programs.back().front().get(); + *opPtr = ops[i]; + } + launchDecode(programs, arena_.get(), extra, stream.get()); + stream->wait(); + } + void testMakeScatterIndicesStream(int numValues, int numBlocks) { auto bits = allocate((numValues * numBlocks + 7) / 8); fillRandomBits(bits.get(), 0.5, numValues * numBlocks); @@ -601,6 +634,15 @@ TEST_F(GpuDecoderTest, dictionaryOnBitpack) { dictTestPlan(11, 40'000'003, 1024, true); } +TEST_F(GpuDecoderTest, bitpack) { + dictTestPlan(27, 4000001, 1024, false, true); + dictTestPlan(28, 4'000'037, 1024, false, true); + dictTestPlan(26, 40'000'003, 1024, false, true); + dictTestPlan(30, 40'000'003, 1024, false, true); + dictTestPlan(47, 40'000'003, 1024, false, true); + dictTestPlan(22, 40'000'003, 1024, true, true); +} + TEST_F(GpuDecoderTest, sparseBool) { testSparseBool<256>(40013, 1024); } @@ -676,5 +718,8 @@ int main(int argc, char** argv) { printFuncAttrs("decode blocksize 512", attrs); CUDA_CHECK_FATAL(cudaFuncGetAttributes(&attrs, detail::decodeGlobal<1024>)); printFuncAttrs("decode blocksize 1024", attrs); + printFuncAttrs("decode2", attrs); + + printKernels(); return RUN_ALL_TESTS(); } diff --git a/velox/experimental/wave/exec/Instruction.h b/velox/experimental/wave/exec/Instruction.h index a93e62c7ed76e..0419d24ef662a 100644 --- a/velox/experimental/wave/exec/Instruction.h +++ b/velox/experimental/wave/exec/Instruction.h @@ -96,13 +96,13 @@ struct AdvanceResult { return numRows == 0 && !isRetry; } - /// Max umber of result rows. + /// Max number of result rows. int32_t numRows{0}; /// The sequence number of kernel launch that needs continue. int32_t nthLaunch{0}; - /// The ordinal of the program i the launch. + /// The ordinal of the program in the launch. int32_t programIdx{0}; /// The instruction where to pick up. If not 0, must have 'isRetry' true. diff --git a/velox/experimental/wave/exec/Project.cpp b/velox/experimental/wave/exec/Project.cpp index fb109fe5bf203..3f3f9bec9f66e 100644 --- a/velox/experimental/wave/exec/Project.cpp +++ b/velox/experimental/wave/exec/Project.cpp @@ -15,6 +15,7 @@ */ #include "velox/experimental/wave/exec/Project.h" +#include "velox/common/process/TraceContext.h" #include "velox/experimental/wave/exec/ToWave.h" #include "velox/experimental/wave/exec/Wave.h" #include "velox/experimental/wave/exec/WaveDriver.h" @@ -118,11 +119,14 @@ void Project::schedule(WaveStream& stream, int32_t maxRows) { inputControl, out); stream.setState(WaveStream::State::kParallel); - reinterpret_cast(out)->call( - out, - exes.size() * blocksPerExe, - control->sharedMemorySize, - control->params); + { + PrintTime c("expr"); + reinterpret_cast(out)->call( + out, + exes.size() * blocksPerExe, + control->sharedMemorySize, + control->params); + } // A sink at the end has no output params but need to wait for host // return event before reusing the stream. if (exes.size() == 1 && exes[0]->programShared->isSink()) { diff --git a/velox/experimental/wave/exec/TableScan.cpp b/velox/experimental/wave/exec/TableScan.cpp index eecccacd78b1f..f2c5189494749 100644 --- a/velox/experimental/wave/exec/TableScan.cpp +++ b/velox/experimental/wave/exec/TableScan.cpp @@ -18,6 +18,7 @@ #include "velox/common/time/Timer.h" #include "velox/exec/Task.h" #include "velox/experimental/wave/exec/WaveDriver.h" +#include "velox/experimental/wave/exec/WaveSplitReader.h" #include "velox/expression/Expr.h" namespace facebook::velox::wave { @@ -53,10 +54,36 @@ void TableScan::schedule(WaveStream& stream, int32_t maxRows) { waveDataSource_->schedule(stream, maxRows); nextAvailableRows_ = waveDataSource_->canAdvance(stream); if (nextAvailableRows_ == 0) { + updateStats( + waveDataSource_->splitReader()->runtimeStats(), + waveDataSource_->splitReader().get()); needNewSplit_ = true; } } +void TableScan::updateStats( + std::unordered_map connectorStats, + WaveSplitReader* splitReader) { + auto lockedStats = stats().wlock(); + if (splitReader) { + lockedStats->rawInputPositions = splitReader->getCompletedRows(); + lockedStats->rawInputBytes = splitReader->getCompletedBytes(); + } + for (const auto& [name, counter] : connectorStats) { + if (name == "ioWaitNanos") { + ioWaitNanos_ += counter.value - lastIoWaitNanos_; + lastIoWaitNanos_ = counter.value; + } + if (UNLIKELY(lockedStats->runtimeStats.count(name) == 0)) { + lockedStats->runtimeStats.insert( + std::make_pair(name, RuntimeMetric(counter.unit))); + } else { + VELOX_CHECK_EQ(lockedStats->runtimeStats.at(name).unit, counter.unit); + } + lockedStats->runtimeStats.at(name).addValue(counter.value); + } +} + BlockingReason TableScan::nextSplit(ContinueFuture* future) { exec::Split split; blockingReason_ = driverCtx_->task->getSplitOrFuture( @@ -73,21 +100,7 @@ BlockingReason TableScan::nextSplit(ContinueFuture* future) { if (!split.hasConnectorSplit()) { noMoreSplits_ = true; if (dataSource_) { - auto connectorStats = dataSource_->runtimeStats(); - auto lockedStats = stats().wlock(); - for (const auto& [name, counter] : connectorStats) { - if (name == "ioWaitNanos") { - ioWaitNanos_ += counter.value - lastIoWaitNanos_; - lastIoWaitNanos_ = counter.value; - } - if (UNLIKELY(lockedStats->runtimeStats.count(name) == 0)) { - lockedStats->runtimeStats.insert( - std::make_pair(name, RuntimeMetric(counter.unit))); - } else { - VELOX_CHECK_EQ(lockedStats->runtimeStats.at(name).unit, counter.unit); - } - lockedStats->runtimeStats.at(name).addValue(counter.value); - } + updateStats(dataSource_->runtimeStats()); } return BlockingReason::kNotBlocked; } @@ -129,6 +142,8 @@ BlockingReason TableScan::nextSplit(ContinueFuture* future) { waveDataSource_->addSplit(connectorSplit); } } + ++stats().wlock()->numSplits; + for (const auto& entry : pendingDynamicFilters_) { waveDataSource_->addDynamicFilter(entry.first, entry.second); } diff --git a/velox/experimental/wave/exec/TableScan.h b/velox/experimental/wave/exec/TableScan.h index a569b03eb7d9f..cb3767c51263b 100644 --- a/velox/experimental/wave/exec/TableScan.h +++ b/velox/experimental/wave/exec/TableScan.h @@ -98,6 +98,15 @@ class TableScan : public WaveSourceOperator { // needed before prepare is done, it will be made when needed. void preload(std::shared_ptr split); + // Adds 'stats' to operator stats of the containing WaveDriver. Some + // stats come from DataSource, others from SplitReader. If + // 'splitReader' is given, the completed rows/bytes from + // 'splitReader' are added. These do not come in the runtimeStats() + // map. + void updateStats( + std::unordered_map stats, + WaveSplitReader* splitReader = nullptr); + // Process-wide IO wait time. static std::atomic ioWaitNanos_; diff --git a/velox/experimental/wave/exec/ToWave.cpp b/velox/experimental/wave/exec/ToWave.cpp index c66ce86cdc9e7..aed2effcd41aa 100644 --- a/velox/experimental/wave/exec/ToWave.cpp +++ b/velox/experimental/wave/exec/ToWave.cpp @@ -464,6 +464,7 @@ void CompileState::makeAggregateAccumulate(const core::AggregationNode* node) { folly::F14FastSet programs; std::vector allArgs; std::vector aggregates; + int numPrograms = allPrograms_.size(); for (auto& key : node->groupingKeys()) { auto arg = findCurrentValue(key); allArgs.push_back(arg); @@ -520,7 +521,6 @@ void CompileState::makeAggregateAccumulate(const core::AggregationNode* node) { sourceList.push_back(s); } } - int numPrograms = allPrograms_.size(); auto aggInstruction = instruction.get(); addInstruction(std::move(instruction), nullptr, sourceList); if (allPrograms_.size() > numPrograms) { diff --git a/velox/experimental/wave/exec/Wave.cpp b/velox/experimental/wave/exec/Wave.cpp index a7f02f43ecc17..74b3de9631e82 100644 --- a/velox/experimental/wave/exec/Wave.cpp +++ b/velox/experimental/wave/exec/Wave.cpp @@ -15,10 +15,27 @@ */ #include "velox/experimental/wave/exec/Wave.h" +#include #include "velox/experimental/wave/exec/Vectors.h" +DEFINE_bool(wave_timing, true, "Enable Wave perf timers"); +DEFINE_bool( + wave_print_time, + false, + "Enables printing times inside PrinTime guard."); + namespace facebook::velox::wave { +PrintTime::PrintTime(const char* title) + : title_(title), + start_(FLAGS_wave_print_time ? getCurrentTimeMicro() : 0) {} + +PrintTime::~PrintTime() { + if (FLAGS_wave_print_time) { + std::cout << title_ << "=" << getCurrentTimeMicro() - start_; + } +} + std::string WaveTime::toString() const { if (micros < 20) { return fmt::format("{} ({} clocks)", succinctNanos(micros * 1000), clocks); @@ -38,6 +55,7 @@ void WaveStats::add(const WaveStats& other) { hostOnlyTime += other.hostOnlyTime; hostParallelTime += other.hostParallelTime; waitTime += other.waitTime; + stagingTime += other.stagingTime; } void WaveStats::clear() { @@ -178,6 +196,28 @@ std::mutex WaveStream::reserveMutex_; std::vector> WaveStream::streamsForReuse_; std::vector> WaveStream::eventsForReuse_; bool WaveStream::exitInited_{false}; +std::unique_ptr WaveStream::copyExecutor_; +std::unique_ptr WaveStream::syncExecutor_; + +folly::CPUThreadPoolExecutor* WaveStream::copyExecutor() { + return getExecutor(copyExecutor_); +} + +folly::CPUThreadPoolExecutor* WaveStream::syncExecutor() { + return getExecutor(syncExecutor_); +} + +folly::CPUThreadPoolExecutor* WaveStream::getExecutor( + std::unique_ptr& ptr) { + if (ptr) { + return ptr.get(); + } + std::lock_guard l(reserveMutex_); + if (!ptr) { + ptr = std::make_unique(32); + } + return ptr.get(); +} Stream* WaveStream::newStream() { auto stream = streamFromReserve(); @@ -429,9 +469,12 @@ bool WaveStream::isArrived( int32_t timeoutMicro) { OperandSet waitSet; if (hostReturnEvent_) { + hostReturnEvent_->wait(); bool done = hostReturnEvent_->query(); if (done) { releaseStreamsAndEvents(); + } else { + printf("bing\n"); } return done; } @@ -455,6 +498,11 @@ bool WaveStream::isArrived( releaseStreamsAndEvents(); return true; } + if (1) { + waitSet.forEach([&](int32_t id) { streams_[id]->wait(); }); + releaseStreamsAndEvents(); + return true; + } if (sleepMicro == -1) { return false; } @@ -893,6 +941,9 @@ AdvanceResult Program::canAdvance( if (stateId.has_value()) { state = stream.operatorState(stateId.value()); } + if (state == nullptr) { + return {}; + } return source->canAdvance(stream, control, state, programIdx); } diff --git a/velox/experimental/wave/exec/Wave.h b/velox/experimental/wave/exec/Wave.h index 19d2b197f170b..2737f02f1f97f 100644 --- a/velox/experimental/wave/exec/Wave.h +++ b/velox/experimental/wave/exec/Wave.h @@ -26,20 +26,42 @@ #include "velox/experimental/wave/exec/ExprKernel.h" #include "velox/experimental/wave/vector/WaveVector.h" +#include + +DECLARE_bool(wave_timing); + namespace facebook::velox::wave { +/// Scoped guard, prints the time spent inside if +class PrintTime { + public: + PrintTime(const char* title); + ~PrintTime(); + + private: + const char* title_; + uint64_t start_; +}; + /// A host side time point for measuring wait and launch prepare latency. Counts /// both wall microseconds and clocks. struct WaveTime { size_t micros{0}; uint64_t clocks{0}; + static uint64_t getMicro() { + return FLAGS_wave_timing ? getCurrentTimeMicro() : 0; + } + static WaveTime now() { + if (!FLAGS_wave_timing) { + return {0, 0}; + } return {getCurrentTimeMicro(), folly::hardware_timestamp()}; } WaveTime operator-(const WaveTime right) const { - return {right.micros - micros, right.clocks - clocks}; + return {micros - right.micros, clocks - right.clocks}; } WaveTime operator+(const WaveTime right) const { @@ -99,6 +121,12 @@ struct WaveStats { /// Time a host thread waits for device. WaveTime waitTime; + /// Time a host thread is synchronously staging data to device. This is either + /// the wall time of multithreaded memcpy to pinned host or the wall time of + /// multithreaded GPU Direct NVME read. This does not include the time of + /// hostToDeviceAsync. + WaveTime stagingTime; + void clear(); void add(const WaveStats& other); }; @@ -557,11 +585,11 @@ class WaveStream { WaveStream( GpuArena& arena, - GpuArena& hostArena, + GpuArena& deviceArena, const std::vector>* operands, OperatorStateMap* stateMap) : arena_(arena), - hostArena_(hostArena), + deviceArena_(deviceArena), operands_(operands), taskStateMap_(stateMap) { operandNullable_.resize(operands_->size(), true); @@ -580,6 +608,10 @@ class WaveStream { return arena_; } + GpuArena& deviceArena() { + return deviceArena_; + } + /// Sets nullability of a source column. This is runtime, since may depend on /// the actual presence of nulls in the source, e.g. file. Nullability /// defaults to nullable. @@ -805,6 +837,9 @@ class WaveStream { hasError_ = true; } + static folly::CPUThreadPoolExecutor* copyExecutor(); + static folly::CPUThreadPoolExecutor* syncExecutor(); + std::string toString() const; private: @@ -821,11 +856,20 @@ class WaveStream { static std::vector> eventsForReuse_; static std::vector> streamsForReuse_; static bool exitInited_; + static std::unique_ptr copyExecutor_; + static std::unique_ptr syncExecutor_; static void clearReusable(); + static folly::CPUThreadPoolExecutor* getExecutor( + std::unique_ptr& ptr); + + // Unified memory. GpuArena& arena_; - GpuArena& hostArena_; + + // Device memory. + GpuArena& deviceArena_; + const std::vector>* const operands_; // True at '[i]' if in this stream 'operands_[i]' should have null flags. std::vector operandNullable_; diff --git a/velox/experimental/wave/exec/WaveDriver.cpp b/velox/experimental/wave/exec/WaveDriver.cpp index 6a9a07c0006ff..558f7281401c9 100644 --- a/velox/experimental/wave/exec/WaveDriver.cpp +++ b/velox/experimental/wave/exec/WaveDriver.cpp @@ -15,6 +15,8 @@ */ #include "velox/experimental/wave/exec/WaveDriver.h" +#include +#include "velox/common/process/TraceContext.h" #include "velox/common/testutil/TestValue.h" #include "velox/exec/Task.h" #include "velox/experimental/wave/exec/Instruction.h" @@ -51,8 +53,8 @@ WaveDriver::WaveDriver( states_(std::move(states)) { VELOX_CHECK(!waveOperators.empty()); auto returnBatchSize = 10000 * outputType_->size() * 10; - hostArena_ = std::make_unique( - returnBatchSize * 10, getHostAllocator(getDevice())); + deviceArena_ = + std::make_unique(100000000, getDeviceAllocator(getDevice())); pipelines_.emplace_back(); for (auto& op : waveOperators) { op->setDriver(this); @@ -92,6 +94,7 @@ RowVectorPtr WaveDriver::getOutput() { auto status = advance(i); switch (status) { case Advance::kBlocked: + updateStats(); return nullptr; case Advance::kResult: if (i == last) { @@ -116,6 +119,7 @@ RowVectorPtr WaveDriver::getOutput() { } else { // Last finished. finished_ = true; + updateStats(); return nullptr; } } @@ -123,10 +127,12 @@ RowVectorPtr WaveDriver::getOutput() { } } } catch (const std::exception& e) { + updateStats(); setError(); throw; } finished_ = true; + updateStats(); return nullptr; } @@ -206,12 +212,13 @@ void WaveDriver::waitForArrival(Pipeline& pipeline) { incStats((pipeline.running[i]->stats())); pipeline.running[i]->setState(WaveStream::State::kNotRunning); moveTo(pipeline.running, i, pipeline.arrived); - totalWaitLoops += waitLoops; } ++waitLoops; } } + totalWaitLoops += waitLoops; } + namespace { bool shouldStop(exec::StopReason taskStopReason) { return taskStopReason != exec::StopReason::kNone && @@ -222,6 +229,13 @@ bool shouldStop(exec::StopReason taskStopReason) { Advance WaveDriver::advance(int pipelineIdx) { auto& pipeline = pipelines_[pipelineIdx]; int64_t waitLoops = 0; + // Set to true when any stream is seen not ready, false when any stream is + // seen ready. + bool isWaiting = false; + // Time when a stream was first seen not ready. + int64_t waitingSince = 0; + // Total wait time. Incremented when isWaiting is set to false from true. + int64_t waitUs = 0; for (;;) { const exec::StopReason taskStopReason = operatorCtx()->driverCtx()->task->shouldStop(); @@ -232,6 +246,9 @@ Advance WaveDriver::advance(int pipelineIdx) { // A point for test code injection. common::testutil::TestValue::adjust( "facebook::velox::wave::WaveDriver::getOutput::yield", this); + totalWaitLoops += waitLoops; + waveStats_.waitTime.micros += waitUs; + return Advance::kBlocked; } @@ -254,15 +271,24 @@ Advance WaveDriver::advance(int pipelineIdx) { auto& op = *pipeline.operators.back(); auto& lastSet = op.syncSet(); for (auto i = 0; i < pipeline.running.size(); ++i) { - if (pipeline.running[i]->isArrived(lastSet)) { + bool isArrived; + int64_t start = WaveTime::getMicro(); + isArrived = pipeline.running[i]->isArrived(lastSet); + waveStats_.waitTime.micros += WaveTime::getMicro() - start; + if (isArrived) { auto arrived = pipeline.running[i].get(); arrived->setState(WaveStream::State::kNotRunning); incStats(arrived->stats()); + if (isWaiting) { + waitUs += WaveTime::getMicro() - waitingSince; + isWaiting = false; + } moveTo(pipeline.running, i, pipeline.arrived); if (pipeline.makesHostResult) { result_ = makeResult(*arrived, lastSet); if (result_ && result_->size() != 0) { totalWaitLoops += waitLoops; + waveStats_.waitTime.micros += waitUs; return Advance::kResult; } --i; @@ -270,16 +296,21 @@ Advance WaveDriver::advance(int pipelineIdx) { pipeline.sinkFull = true; waitForArrival(pipeline); totalWaitLoops += waitLoops; + waveStats_.waitTime.micros += waitUs; return Advance::kResult; } + } else if (!isWaiting) { + waitingSince = WaveTime::getMicro(); + isWaiting = true; + } else { + ++waitLoops; } - ++waitLoops; } if (pipeline.finished.empty() && pipeline.running.size() + pipeline.arrived.size() < FLAGS_max_streams_per_driver) { auto stream = std::make_unique( - *arena_, *hostArena_, &operands(), &stateMap_); + *arena_, *deviceArena_, &operands(), &stateMap_); ++stream->stats().numWaves; stream->setState(WaveStream::State::kHost); pipeline.arrived.push_back(std::move(stream)); @@ -368,18 +399,22 @@ void WaveDriver::updateStats() { "wave.bytesToHost", RuntimeCounter(waveStats_.bytesToHost, RuntimeCounter::Unit::kBytes)); lockedStats->addRuntimeStat( - "wave.hostOnlyTime", + "wave.hostOnlyNanos", RuntimeCounter( waveStats_.hostOnlyTime.micros * 1000, RuntimeCounter::Unit::kNanos)); lockedStats->addRuntimeStat( - "wave.hostParallelTime", + "wave.hostParallelNanos", RuntimeCounter( waveStats_.hostParallelTime.micros * 1000, RuntimeCounter::Unit::kNanos)); lockedStats->addRuntimeStat( - "wave.waitTime", + "wave.waitNanos", RuntimeCounter( waveStats_.waitTime.micros * 1000, RuntimeCounter::Unit::kNanos)); + lockedStats->addRuntimeStat( + "wave.stagingNanos", + RuntimeCounter( + waveStats_.stagingTime.micros * 1000, RuntimeCounter::Unit::kNanos)); } } // namespace facebook::velox::wave diff --git a/velox/experimental/wave/exec/WaveDriver.h b/velox/experimental/wave/exec/WaveDriver.h index dfcfad7f277fa..2b593520dec8d 100644 --- a/velox/experimental/wave/exec/WaveDriver.h +++ b/velox/experimental/wave/exec/WaveDriver.h @@ -59,10 +59,6 @@ class WaveDriver : public exec::SourceOperator { return *arena_; } - GpuArena& hostArena() const { - return *hostArena_; - } - const std::vector>& operands() { return operands_; } @@ -160,7 +156,6 @@ class WaveDriver : public exec::SourceOperator { std::unique_ptr arena_; std::unique_ptr deviceArena_; - std::unique_ptr hostArena_; ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()}; exec::BlockingReason blockingReason_; diff --git a/velox/experimental/wave/exec/tests/CMakeLists.txt b/velox/experimental/wave/exec/tests/CMakeLists.txt index 008261b7f1d44..e10cce40ba422 100644 --- a/velox/experimental/wave/exec/tests/CMakeLists.txt +++ b/velox/experimental/wave/exec/tests/CMakeLists.txt @@ -17,10 +17,53 @@ add_subdirectory(utils) add_executable(velox_wave_exec_test FilterProjectTest.cpp TableScanTest.cpp AggregationTest.cpp Main.cpp) +target_link_libraries( + velox_wave_exec_test + velox_wave_exec + velox_wave_mock_reader + velox_aggregates + velox_dwio_common + velox_dwio_common_exception + velox_dwio_common_test_utils + velox_dwio_parquet_reader + velox_dwio_parquet_writer + velox_exec + velox_exec_test_lib + velox_functions_json + velox_functions_lib + velox_functions_prestosql + velox_functions_test_lib + velox_hive_connector + velox_memory + velox_serialization + velox_test_util + velox_type + velox_vector + velox_vector_fuzzer + Boost::atomic + Boost::context + Boost::date_time + Boost::filesystem + Boost::program_options + Boost::regex + Boost::thread + Boost::system + gtest + gtest_main + gmock + Folly::folly + gflags::gflags + glog::glog + fmt::fmt + ${FILESYSTEM}) + add_test(velox_wave_exec_test velox_wave_exec_test) +add_executable(velox_wave_benchmark WaveBenchmark.cpp) + target_link_libraries( - velox_wave_exec_test + velox_wave_benchmark + velox_query_benchmark velox_wave_exec velox_wave_mock_reader velox_aggregates @@ -53,6 +96,7 @@ target_link_libraries( gtest gtest_main gmock + ${FOLLY_BENCHMARK} Folly::folly gflags::gflags glog::glog diff --git a/velox/experimental/wave/exec/tests/WaveBenchmark.cpp b/velox/experimental/wave/exec/tests/WaveBenchmark.cpp new file mode 100644 index 0000000000000..1f4f433c62d48 --- /dev/null +++ b/velox/experimental/wave/exec/tests/WaveBenchmark.cpp @@ -0,0 +1,372 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/benchmarks/QueryBenchmarkBase.h" +#include "velox/common/process/TraceContext.h" +#include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/dwio/dwrf/writer/WriterContext.h" +#include "velox/experimental/wave/exec/ToWave.h" +#include "velox/experimental/wave/exec/WaveHiveDataSource.h" +#include "velox/experimental/wave/exec/tests/utils/FileFormat.h" +#include "velox/experimental/wave/exec/tests/utils/WaveTestSplitReader.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::dwio::common; + +DEFINE_string( + data_path, + "", + "Root path of test data. Data layout must follow Hive-style partitioning. " + "If the content are directories, they contain the data files for " + "each table. If they are files, they contain a file system path for each " + "data file, one per line. This allows running against cloud storage or " + "HDFS"); + +DEFINE_bool( + generate, + true, + "Generate input data. If false, data_path must " + "contain a directory with a subdirectory per table."); + +DEFINE_bool(preload, false, "Preload Wave data into RAM before starting query"); + +DEFINE_bool(wave, true, "Run benchmark with Wave"); + +DEFINE_int32(num_columns, 10, "Number of columns in test table"); + +DEFINE_int64(filter_pass_pct, 100, "Passing % for one filter"); + +DEFINE_int32(null_pct, 0, "Pct of null values in columns"); + +DEFINE_int32(num_column_filters, 0, "Number of columns wit a filter"); + +DEFINE_int32(num_expr_filters, 0, "Number of columns with a filter expr"); + +DEFINE_int32( + num_arithmetic, + 0, + "Number of arithmetic ops per column after filters"); + +DEFINE_int32(rows_per_stripe, 200000, "Rows in a stripe"); + +DEFINE_int64(num_rows, 1000000000, "Rows in test table"); + +DEFINE_int32( + run_query_verbose, + -1, + "Run a given query and print execution statistics"); + +class WaveBenchmark : public QueryBenchmarkBase { + public: + ~WaveBenchmark() { + wave::test::Table::dropAll(); + } + + void initialize() override { + QueryBenchmarkBase::initialize(); + if (FLAGS_wave) { + wave::registerWave(); + wave::WaveHiveDataSource::registerConnector(); + wave::test::WaveTestSplitReader::registerTestSplitReader(); + } + rootPool_ = memory::memoryManager()->addRootPool("WaveBenchmark"); + leafPool_ = rootPool_->addLeafChild("WaveBenchmark"); + } + + void makeData( + const RowTypePtr& type, + int32_t numVectors, + int32_t vectorSize, + float nullPct = 0) { + auto vectors = makeVectors(type, numVectors, vectorSize, nullPct / 100); + int32_t cnt = 0; + for (auto& vector : vectors) { + makeRange(vector, 1000000000, nullPct == 0); + auto rn = vector->childAt(type->size() - 1)->as>(); + for (auto i = 0; i < rn->size(); ++i) { + rn->set(i, cnt++); + } + } + if (FLAGS_wave) { + makeTable(FLAGS_data_path + "/test.wave", vectors); + if (FLAGS_generate) { + auto table = + wave::test::Table::getTable(FLAGS_data_path + "/test.wave"); + table->toFile(FLAGS_data_path + "/test.wave"); + } + } else { + std::string temp = FLAGS_data_path + "/data.dwrf"; + auto config = std::make_shared(); + config->set(dwrf::Config::COMPRESSION, common::CompressionKind_NONE); + config->set( + dwrf::Config::STRIPE_SIZE, + static_cast(FLAGS_rows_per_stripe * FLAGS_num_columns * 4)); + writeToFile(temp, vectors, config, vectors.front()->type()); + } + } + + std::vector makeVectors( + const RowTypePtr& rowType, + int32_t numVectors, + int32_t rowsPerVector, + float nullRatio = 0) { + std::vector vectors; + options_.vectorSize = rowsPerVector; + options_.nullRatio = nullRatio; + fuzzer_ = std::make_unique(options_, leafPool_.get()); + for (int32_t i = 0; i < numVectors; ++i) { + auto vector = fuzzer_->fuzzInputFlatRow(rowType); + vectors.push_back(vector); + } + return vectors; + } + + void makeRange( + RowVectorPtr row, + int64_t mod = std::numeric_limits::max(), + bool notNull = true) { + for (auto i = 0; i < row->type()->size(); ++i) { + auto child = row->childAt(i); + if (auto ints = child->as>()) { + for (auto i = 0; i < child->size(); ++i) { + if (!notNull && ints->isNullAt(i)) { + continue; + } + ints->set(i, ints->valueAt(i) % mod); + } + } + if (notNull) { + child->clearNulls(0, row->size()); + } + } + } + + wave::test::SplitVector makeTable( + const std::string& name, + std::vector& rows) { + wave::test::Table::dropTable(name); + return wave::test::Table::defineTable(name, rows)->splits(); + } + + void writeToFile( + const std::string& filePath, + const std::vector& vectors, + std::shared_ptr config, + const TypePtr& schema) { + dwrf::WriterOptions options; + options.config = config; + options.schema = schema; + auto localWriteFile = + std::make_unique(filePath, true, false); + auto sink = std::make_unique( + std::move(localWriteFile), filePath); + auto childPool = + rootPool_->addAggregateChild("HiveConnectorTestBase.Writer"); + options.memoryPool = childPool.get(); + facebook::velox::dwrf::Writer writer{std::move(sink), options}; + for (size_t i = 0; i < vectors.size(); ++i) { + writer.write(vectors[i]); + } + writer.close(); + } + + exec::test::TpchPlan getQueryPlan(int32_t query) { + switch (query) { + case 1: { + if (!type_) { + type_ = makeType(); + } + + exec::test::TpchPlan plan; + if (FLAGS_wave) { + plan.dataFiles["0"] = {FLAGS_data_path + "/test.wave"}; + } else { + plan.dataFiles["0"] = {FLAGS_data_path + "/data.dwrf"}; + } + int64_t bound = (1'000'000'000LL * FLAGS_filter_pass_pct) / 100; + std::vector scanFilters; + for (auto i = 0; i < FLAGS_num_column_filters; ++i) { + scanFilters.push_back(fmt::format("c{} < {}", i, bound)); + } + auto builder = + PlanBuilder(leafPool_.get()).tableScan(type_, scanFilters); + + for (auto i = 0; i < FLAGS_num_expr_filters; ++i) { + builder = builder.filter( + fmt::format("c{} < {}", FLAGS_num_column_filters + i, bound)); + } + + std::vector aggInputs; + if (FLAGS_num_arithmetic > 0) { + std::vector projects; + for (auto c = 0; c < type_->size(); ++c) { + std::string expr = fmt::format("c{} ", c); + for (auto i = 0; i < FLAGS_num_arithmetic; ++i) { + expr += fmt::format(" + c{}", c); + } + expr += fmt::format(" as f{}", c); + projects.push_back(std::move(expr)); + aggInputs.push_back(fmt::format("f{}", c)); + } + builder = builder.project(std::move(projects)); + } else { + for (auto i = 0; i < type_->size(); ++i) { + aggInputs.push_back(fmt::format("c{}", i)); + } + } + std::vector aggs; + for (auto i = 0; i < aggInputs.size(); ++i) { + aggs.push_back(fmt::format("sum({})", aggInputs[i])); + } + + plan.plan = builder.singleAggregation({}, aggs).planNode(); + + plan.dataFileFormat = + FLAGS_wave ? FileFormat::UNKNOWN : FileFormat::DWRF; + return plan; + } + default: + VELOX_FAIL("Bad query number"); + } + } + + void prepareQuery(int32_t query) { + switch (query) { + case 1: { + type_ = makeType(); + auto numVectors = + std::max(1, FLAGS_num_rows / FLAGS_rows_per_stripe); + if (FLAGS_generate) { + makeData( + type_, numVectors, FLAGS_num_rows / numVectors, FLAGS_null_pct); + } else { + loadData(); + } + break; + } + default: + VELOX_FAIL("Bad query number"); + } + } + + void loadData() { + if (FLAGS_wave) { + auto table = + wave::test::Table::getTable(FLAGS_data_path + "/test.wave", true); + table->fromFile(FLAGS_data_path + "/test.wave"); + if (FLAGS_preload) { + table->loadData(leafPool_); + } + } + } + + std::vector> listSplits( + const std::string& path, + int32_t numSplitsPerFile, + const TpchPlan& plan) override { + if (plan.dataFileFormat == FileFormat::UNKNOWN) { + auto table = wave::test::Table::getTable(path); + return table->splits(); + } + return QueryBenchmarkBase::listSplits(path, numSplitsPerFile, plan); + } + + void runMain(std::ostream& out, RunStats& runStats) override { + if (FLAGS_run_query_verbose == -1) { + folly::runBenchmarks(); + } else { + const auto queryPlan = getQueryPlan(FLAGS_run_query_verbose); + auto [cursor, actualResults] = run(queryPlan); + if (!cursor) { + LOG(ERROR) << "Query terminated with error. Exiting"; + exit(1); + } + auto task = cursor->task(); + ensureTaskCompletion(task.get()); + if (FLAGS_include_results) { + printResults(actualResults, out); + out << std::endl; + } + const auto stats = task->taskStats(); + int64_t rawInputBytes = 0; + for (auto& pipeline : stats.pipelineStats) { + auto& first = pipeline.operatorStats[0]; + if (first.operatorType == "TableScan" || first.operatorType == "Wave") { + rawInputBytes += first.rawInputBytes; + } + } + runStats.rawInputBytes = rawInputBytes; + out << fmt::format( + "Execution time: {}", + succinctMillis( + stats.executionEndTimeMs - stats.executionStartTimeMs)) + << std::endl; + out << fmt::format( + "Splits total: {}, finished: {}", + stats.numTotalSplits, + stats.numFinishedSplits) + << std::endl; + out << printPlanWithStats( + *queryPlan.plan, stats, FLAGS_include_custom_stats) + << std::endl; + } + } + + RowTypePtr makeType() { + std::vector names; + std::vector types; + for (auto i = 0; i < FLAGS_num_columns; ++i) { + names.push_back(fmt::format("c{}", i)); + types.push_back(BIGINT()); + } + return ROW(std::move(names), std::move(types)); + } + + std::shared_ptr rootPool_; + std::shared_ptr leafPool_; + RowTypePtr type_; + VectorFuzzer::Options options_; + std::unique_ptr fuzzer_; +}; + +void waveBenchmarkMain() { + auto benchmark = std::make_unique(); + benchmark->initialize(); + if (FLAGS_run_query_verbose != -1) { + benchmark->prepareQuery(FLAGS_run_query_verbose); + } + if (FLAGS_test_flags_file.empty()) { + RunStats stats; + benchmark->runOne(std::cout, stats); + std::cout << stats.toString(false); + } else { + benchmark->runAllCombinations(); + } + benchmark->shutdown(); +} + +int main(int argc, char** argv) { + std::string kUsage( + "This program benchmarks Wave. Run 'velox_wave_benchmark -helpon=WaveBenchmark' for available options.\n"); + gflags::SetUsageMessage(kUsage); + folly::Init init{&argc, &argv, false}; + facebook::velox::wave::printKernels(); + waveBenchmarkMain(); + return 0; +} diff --git a/velox/experimental/wave/exec/tests/utils/FileFormat.cpp b/velox/experimental/wave/exec/tests/utils/FileFormat.cpp index d6800e201cb44..cd7c2a929c90a 100644 --- a/velox/experimental/wave/exec/tests/utils/FileFormat.cpp +++ b/velox/experimental/wave/exec/tests/utils/FileFormat.cpp @@ -16,6 +16,9 @@ #include "velox/experimental/wave/exec/tests/utils/FileFormat.h" #include +#include "velox/common/config/Config.h" +#include "velox/common/file/FileSystems.h" +#include "velox/vector/VectorSaver.h" namespace facebook::velox::wave::test { @@ -364,8 +367,9 @@ void Writer::finishStripe() { for (auto& encoder : encoders_) { columns.push_back(encoder->toColumn()); } + int32_t numRows = columns[0]->numValues; stripes_.push_back(std::make_unique( - std::move(columns), dwio::common::TypeWithId::create(type_))); + std::move(columns), dwio::common::TypeWithId::create(type_), numRows)); encoders_.clear(); rowsInStripe_ = 0; } @@ -377,6 +381,234 @@ Table* Writer::finalize(std::string tableName) { return table; } +template +void writeNumber(std::ostream& stream, const T& n) { + stream.write(reinterpret_cast(&n), sizeof(T)); +} + +void Column::load( + std::unique_ptr& file, + const std::string& path, + memory::MemoryPool* pool) { + VELOX_CHECK(region.length > 0 || nulls || !children.empty()); + if (nulls) { + nulls->load(file, path, pool); + } + if (alphabet) { + alphabet->load(file, path, pool); + } + if (region.length > 0) { + values = AlignedBuffer::allocate(region.length, pool); + if (!file) { + auto fileSystem = filesystems::getFileSystem(path, nullptr); + file = fileSystem->openFileForRead(path); + } + file->pread(region.offset, region.length, values->asMutable()); + } + for (auto& child : children) { + child->load(file, path, pool); + } +} + +template +void readNumber(std::istream& stream, T& n) { + stream.read(reinterpret_cast(&n), sizeof(n)); +} + +template +void appendNumber(WriteFile& file, T n) { + file.append(std::string_view(reinterpret_cast(&n), sizeof(n))); +} + +void writeColumn(Column& column, WriteFile& file, std::stringstream& footer) { + writeNumber(footer, column.encoding); + writeNumber(footer, column.kind); + if (column.nulls) { + writeColumn(*column.nulls, file, footer); + } else { + writeNumber(footer, Encoding::kNone); + } + writeNumber(footer, column.numValues); + if (Encoding::kStruct == column.encoding) { + VELOX_NYI(); + return; + } + if (Encoding::kDict == column.encoding) { + writeColumn(*column.alphabet, file, footer); + } + writeNumber(footer, column.bitWidth); + writeNumber(footer, column.baseline); + int64_t offset = file.size(); + writeNumber(footer, offset); + int64_t size = column.values->size(); + writeNumber(footer, size); + file.append(std::string_view(column.values->as(), size)); +} + +void writeColumns( + std::vector>& columns, + Column* nulls, + WriteFile& file, + std::stringstream& footer) { + writeNumber(footer, Encoding::kStruct); + writeNumber(footer, TypeKind::ROW); + if (nulls) { + writeColumn(*nulls, file, footer); + } else { + writeNumber(footer, Encoding::kNone); + } + writeNumber(footer, columns[0]->numValues); + int32_t numColumns = columns.size(); + writeNumber(footer, numColumns); + std::vector columnFooters; + int32_t columnStart = 0; + for (auto columnIdx = 0; columnIdx < columns.size(); ++columnIdx) { + std::stringstream columnFooter; + writeColumn(*columns[columnIdx], file, columnFooter); + auto footerString = columnFooter.str(); + writeNumber(footer, columnStart); + columnStart += footerString.size(); + columnFooters.push_back(std::move(footerString)); + } + for (auto columnFooter : columnFooters) { + footer.write(columnFooter.data(), columnFooter.size()); + } +} + +std::vector> readColumns(std::stringstream& in); + +std::unique_ptr readColumn(std::stringstream& in) { + Encoding encoding; + readNumber(in, encoding); + if (encoding == Encoding::kNone) { + return nullptr; + } + auto column = std::make_unique(); + column->encoding = encoding; + readNumber(in, column->kind); + column->nulls = readColumn(in); + readNumber(in, column->numValues); + if (encoding == Encoding::kStruct) { + column->children = readColumns(in); + return column; + } + if (encoding == Encoding::kDict) { + column->alphabet = readColumn(in); + } + readNumber(in, column->bitWidth); + readNumber(in, column->baseline); + readNumber(in, column->region.offset); + readNumber(in, column->region.length); + return column; +} + +std::vector> readColumns(std::stringstream& in) { + int32_t numColumns; + readNumber(in, numColumns); + std::vector starts(numColumns); + in.read(reinterpret_cast(starts.data()), numColumns * sizeof(int32_t)); + std::vector> children(numColumns); + for (auto i = 0; i < numColumns; ++i) { + children[i] = readColumn(in); + } + return children; +} + +void Table::toFile(const std::string& path) { + std::vector stripeFooters; + std::vector stripeStart; + auto type = stripes_.front()->typeWithId->type(); + auto fileSystem = filesystems::getFileSystem(path, nullptr); + try { + fileSystem->remove(path); + } catch (const std::exception& e) { + } + auto file = fileSystem->openFileForWrite(path); + std::vector footers; + for (auto stripeIdx = 0; stripeIdx < stripes_.size(); ++stripeIdx) { + auto stripe = stripes_[stripeIdx].get(); + stripeStart.push_back(file->size()); + std::stringstream footer; + writeColumns(stripe->columns, nullptr, *file, footer); + footers.push_back(footer.str()); + } + int64_t footerStart = file->size(); + std::vector footerStarts; + for (auto& footer : footers) { + footerStarts.push_back(file->size()); + file->append(std::string_view(footer.data(), footer.size())); + } + int64_t typeStart = file->size(); + std::stringstream typeStream; + saveType(type, typeStream); + auto typeString = typeStream.str(); + file->append(std::string_view(typeString.data(), typeString.size())); + int64_t offsetStart = file->size(); + for (auto i = 0; i < stripeStart.size(); ++i) { + appendNumber(*file, stripeStart[i]); + appendNumber(*file, footerStarts[i]); + } + int32_t numStripes = stripes_.size(); + appendNumber(*file, numStripes); + appendNumber(*file, offsetStart); + appendNumber(*file, typeStart); + appendNumber(*file, footerStart); + file->close(); +} + +void Table::fromFile( + const std::string& path, + int64_t splitStart, + int64_t splitSize) { + auto fileSystem = filesystems::getFileSystem(path, nullptr); + auto file = fileSystem->openFileForRead(path); + int64_t size = file->size(); + std::string tail; + tail.resize(std::min(size, 100000)); + file->pread(size - tail.size(), tail.size(), tail.data()); + char* end = tail.data() + tail.size(); + auto numStripes = *reinterpret_cast(end - 28); + auto offsetStart = size - *reinterpret_cast(end - 24); + auto typeStart = size - *reinterpret_cast(end - 16); + auto footerStart = size - *reinterpret_cast(end - 8); + int64_t tailSize = footerStart; + if (tailSize > tail.size()) { + std::string moreTail; + moreTail.resize(tailSize - tail.size()); + file->pread(size - footerStart, moreTail.size(), moreTail.data()); + moreTail += tail; + tail = std::move(moreTail); + } + // Read the type. + std::stringstream typeStream(tail); + typeStream.seekg(tail.size() - typeStart); + auto type = restoreType(typeStream); + auto typeWithIdUnique = dwio::common::TypeWithId::create( + std::static_pointer_cast(type)); + std::shared_ptr typeWithId( + typeWithIdUnique.release()); + // Loop over offsets and make stripes for the ones in range. + auto offset = tail.size() - offsetStart; + std::vector> stripes; + for (auto i = 0; i < numStripes; ++i) { + auto dataStart = *reinterpret_cast(tail.data() + offset + i * 16); + if (dataStart >= splitStart && dataStart < splitStart + splitSize) { + auto footerStart = + size - *reinterpret_cast(tail.data() + offset + 8 + i * 16); + auto footerOff = tail.size() - footerStart; + std::stringstream footerStream(tail); + // skip 3 first,i.e. kStruct, TypeKind::ROW, kNone. + footerStream.seekg(footerOff + 3); + int32_t numRows; + readNumber(footerStream, numRows); + auto columns = readColumns(footerStream); + stripes.push_back(std::make_unique( + std::move(columns), typeWithId, numRows, path)); + } + } + addStripes(std::move(stripes), nullptr); +} + void Table::addStripes( std::vector>&& stripes, std::shared_ptr pool) { @@ -386,6 +618,19 @@ void Table::addStripes( allStripes_[s->name] = s.get(); stripes_.push_back(std::move(s)); } + if (pool) { + pools_.push_back(pool); + } +} + +void Table::loadData(std::shared_ptr pool) { + std::unique_ptr file; + VELOX_CHECK(pools_.empty()); + for (auto& stripe : stripes_) { + for (auto& column : stripe->columns) { + column->load(file, stripe->path, pool.get()); + } + } pools_.push_back(pool); } diff --git a/velox/experimental/wave/exec/tests/utils/FileFormat.h b/velox/experimental/wave/exec/tests/utils/FileFormat.h index 2a0140e709a43..58e837d6beec8 100644 --- a/velox/experimental/wave/exec/tests/utils/FileFormat.h +++ b/velox/experimental/wave/exec/tests/utils/FileFormat.h @@ -15,6 +15,7 @@ */ #pragma once +#include "velox/common/file/Region.h" #include "velox/connectors/Connector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/dwio/common/TypeWithId.h" @@ -26,12 +27,17 @@ namespace facebook::velox::wave::test { class Table; -enum Encoding { kFlat, kDict }; +enum Encoding : uint8_t { kFlat, kDict, kStruct, kNone }; struct Column { - TypeKind kind; + void load( + std::unique_ptr& file, + const std::string& path, + memory::MemoryPool* pool); + Encoding encoding; - // Number of encoded values. + TypeKind kind; + // Number of encoded values including nulls. int32_t numValues{0}; // Distinct values in kDict. @@ -52,23 +58,48 @@ struct Column { /// Encoded column with 'numValues' null bits, nullptr if no nulls. If set, /// 'values' has an entry for each non-null. std::unique_ptr nulls; + + /// Location of raw data in the backing file, or 0,0 if no backing file. + common::Region region; + + std::vector> children; }; struct Stripe { Stripe( std::vector>&& in, - const std::shared_ptr& type) - : typeWithId(type), columns(std::move(in)) {} + const std::shared_ptr& type, + int32_t numRows, + std::string path = "") + : typeWithId(type), + numRows(numRows), + columns(std::move(in)), + path(std::move(path)) {} const Column* findColumn(const dwio::common::TypeWithId& child) const; + bool isLoaded() const { + for (auto i = 0; i < columns.size(); ++i) { + if (!columns[i]->values) { + return false; + } + } + return true; + } + // Unique name assigned when associating with a Table. std::string name; std::shared_ptr typeWithId; + /// Number of rows. Needed for counting if no columns. + int32_t numRows{0}; + // Top level columns. std::vector> columns; + + /// Path of file referenced by Region in Columns. + std::string path; }; class StringSet { @@ -212,6 +243,19 @@ class Table { return it->second.get(); } + void toFile(const std::string& path); + + /// Initializes from 'path' for stripes whose start falls between 'start' and + /// 'start + size'. + void fromFile( + const std::string& path, + int64_t start = 0, + int64_t size = std::numeric_limits::max()); + + /// Reads the encoded data for all columns in column->buffer, allocating from + /// 'pool'. + void loadData(std::shared_ptr pool); + static void dropTable(const std::string& name); static Stripe* getStripe(const std::string& path) { diff --git a/velox/experimental/wave/exec/tests/utils/TestFormatReader.cpp b/velox/experimental/wave/exec/tests/utils/TestFormatReader.cpp index c8b9e80c4d1e8..3902999f03a23 100644 --- a/velox/experimental/wave/exec/tests/utils/TestFormatReader.cpp +++ b/velox/experimental/wave/exec/tests/utils/TestFormatReader.cpp @@ -45,7 +45,8 @@ int TestFormatData::stageNulls( } Staging staging( nulls->values->as(), - bits::nwords(column_->numValues) * sizeof(uint64_t)); + bits::nwords(column_->numValues) * sizeof(uint64_t), + column_->region); auto id = splitStaging.add(staging); splitStaging.registerPointer(id, &grid_.nulls, true); return id; @@ -90,7 +91,8 @@ void TestFormatData::startOp( stageNulls(deviceStaging, splitStaging); if (!staged_) { staged_ = true; - Staging staging(column_->values->as(), column_->values->size()); + Staging staging( + column_->values->as(), column_->values->size(), column_->region); id = splitStaging.add(staging); } auto rowsPerBlock = FLAGS_wave_reader_rows_per_tb; diff --git a/velox/experimental/wave/exec/tests/utils/TestFormatReader.h b/velox/experimental/wave/exec/tests/utils/TestFormatReader.h index 409d5283dd18d..55332c02b4c00 100644 --- a/velox/experimental/wave/exec/tests/utils/TestFormatReader.h +++ b/velox/experimental/wave/exec/tests/utils/TestFormatReader.h @@ -16,6 +16,7 @@ #pragma once +#include "velox/connectors/hive/FileHandle.h" #include "velox/experimental/wave/dwio/ColumnReader.h" #include "velox/experimental/wave/exec/tests/utils/FileFormat.h" #include "velox/type/Subfield.h" @@ -69,6 +70,7 @@ class TestFormatData : public wave::FormatData { int32_t stageNulls(ResultStaging& deviceStaging, SplitStaging& splitStaging); const OperandId operand_; + int32_t totalRows_{0}; const test::Column* column_; diff --git a/velox/experimental/wave/exec/tests/utils/WaveTestSplitReader.cpp b/velox/experimental/wave/exec/tests/utils/WaveTestSplitReader.cpp index 15c1139855b91..bdbaab479d75f 100644 --- a/velox/experimental/wave/exec/tests/utils/WaveTestSplitReader.cpp +++ b/velox/experimental/wave/exec/tests/utils/WaveTestSplitReader.cpp @@ -27,10 +27,29 @@ WaveTestSplitReader::WaveTestSplitReader( const std::shared_ptr& split, const SplitReaderParams& params, const DefinesMap* defines) { + params_ = params; auto hiveSplit = dynamic_cast(split.get()); VELOX_CHECK_NOT_NULL(hiveSplit); + stripe_ = test::Table::getStripe(hiveSplit->filePath); + if (!stripe_->isLoaded()) { + try { + fileHandle_ = params_.fileHandleFactory->generate( + stripe_->path, + hiveSplit->properties.has_value() ? &*hiveSplit->properties + : nullptr); + VELOX_CHECK_NOT_NULL(fileHandle_.get()); + } catch (const VeloxRuntimeError& e) { + if (e.errorCode() == error_code::kFileNotFound && + params_.hiveConfig->ignoreMissingFiles( + params_.connectorQueryCtx->sessionProperties())) { + emptySplit_ = true; + return; + } + throw; + } + } VELOX_CHECK_NOT_NULL(stripe_); TestFormatParams formatParams( *params.connectorQueryCtx->memoryPool(), readerStats_, stripe_); @@ -43,6 +62,11 @@ WaveTestSplitReader::WaveTestSplitReader( empty, *defines, true); + if (fileHandle_.get()) { + fileInfo_.file = fileHandle_->file.get(); + fileInfo_.fileId = &fileHandle_->uuid; + fileInfo_.cache = params_.connectorQueryCtx->cache(); + } } int32_t WaveTestSplitReader::canAdvance(WaveStream& stream) { @@ -68,7 +92,10 @@ void WaveTestSplitReader::schedule(WaveStream& waveStream, int32_t maxRows) { } if (!exe) { exe = std::make_unique( - reinterpret_cast(columnReader_.get()), waveStream); + reinterpret_cast(columnReader_.get()), + waveStream, + params_.ioStats.get(), + fileInfo_); } ReadStream::launch(std::move(exe), nextRow_, rowSet); nextRow_ += scheduledRows_; diff --git a/velox/experimental/wave/exec/tests/utils/WaveTestSplitReader.h b/velox/experimental/wave/exec/tests/utils/WaveTestSplitReader.h index 2881f6de03f1e..320c8a08784e6 100644 --- a/velox/experimental/wave/exec/tests/utils/WaveTestSplitReader.h +++ b/velox/experimental/wave/exec/tests/utils/WaveTestSplitReader.h @@ -31,7 +31,7 @@ class WaveTestSplitReader : public WaveSplitReader { const DefinesMap* defines); bool emptySplit() override { - return !stripe_ || stripe_->columns[0]->numValues == 0; + return !stripe_ || stripe_->columns[0]->numValues == 0 || emptySplit_; } int32_t canAdvance(WaveStream& stream) override; @@ -43,11 +43,11 @@ class WaveTestSplitReader : public WaveSplitReader { bool isFinished() const override; uint64_t getCompletedBytes() override { - return 0; + return params_.ioStats->rawBytesRead(); } uint64_t getCompletedRows() override { - return 0; + return nextRow_; } std::unordered_map runtimeStats() override { @@ -63,6 +63,8 @@ class WaveTestSplitReader : public WaveSplitReader { std::shared_ptr split_; SplitReaderParams params_; + FileHandleCachedPtr fileHandleCachePtr; + cache::AsyncDataCache* cache_{nullptr}; test::Stripe* stripe_{nullptr}; std::unique_ptr columnReader_; // First unscheduled row. @@ -70,6 +72,9 @@ class WaveTestSplitReader : public WaveSplitReader { int32_t scheduledRows_{0}; dwio::common::ColumnReaderStatistics readerStats_; raw_vector rows_; + FileHandleCachedPtr fileHandle_; + FileInfo fileInfo_; + bool emptySplit_{false}; }; } // namespace facebook::velox::wave::test