Skip to content

Commit

Permalink
Enable Spark query runner in aggregate fuzzer test
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Oct 29, 2024
1 parent 27d0527 commit d0b6ed9
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 37 deletions.
46 changes: 32 additions & 14 deletions .github/workflows/experimental.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,37 +180,55 @@ jobs:
/tmp/aggregate_fuzzer_repro
/tmp/server.log
linux-spark-fuzzer-run:
runs-on: ubuntu-latest
needs: compile
spark-java-aggregation-fuzzer-run:
runs-on: 16-core-ubuntu
container: ghcr.io/facebookincubator/velox-dev:spark-server
timeout-minutes: 120
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache/"
LINUX_DISTRO: "centos"
steps:

- name: "Restore ccache"
uses: actions/cache@v3
with:
path: "${{ env.CCACHE_DIR }}"
# We are using the benchmark ccache as it has all
# required features enabled, so no need to create a new one
key: ccache-spark-${{ github.sha }}
restore-keys: |
ccache-spark-
- name: "Checkout Repo"
uses: actions/checkout@v3
with:
path: velox
submodules: 'recursive'
ref: "${{ inputs.ref || 'main' }}"

- name: "Install dependencies"
run: source ./scripts/setup-ubuntu.sh && install_apt_deps

- name: Download spark aggregation fuzzer
uses: actions/download-artifact@v3
with:
name: spark_aggregation_fuzzer
- name: "Build"
run: |
cd velox
source /opt/rh/gcc-toolset-12/enable
make debug NUM_THREADS="${{ inputs.numThreads || 8 }}" MAX_HIGH_MEM_JOBS="${{ inputs.maxHighMemJobs || 8 }}" MAX_LINK_JOBS="${{ inputs.maxLinkJobs || 4 }}" EXTRA_CMAKE_FLAGS="-DVELOX_ENABLE_ARROW=ON ${{ inputs.extraCMakeFlags }}"
ccache -s
- name: "Run Spark Aggregate Fuzzer"
run: |
cd velox
bash /opt/start-spark.sh
# Sleep for 60 seconds to allow Spark server to start.
sleep 60
mkdir -p /tmp/spark_aggregate_fuzzer_repro/
rm -rfv /tmp/spark_aggregate_fuzzer_repro/*
chmod -R 777 /tmp/spark_aggregate_fuzzer_repro
chmod +x spark_aggregation_fuzzer_test
./spark_aggregation_fuzzer_test \
_build/debug/velox/functions/sparksql/fuzzer/spark_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec 1800 \
--duration_sec 3600 \
--logtostderr=1 \
--minloglevel=0 \
--repro_persist_path=/tmp/spark_aggregate_fuzzer_repro \
--enable_sorted_aggregations=true \
--enable_sorted_aggregations=false \
&& echo -e "\n\nSpark Aggregation Fuzzer run finished successfully."
- name: Archive Spark aggregate production artifacts
Expand Down
8 changes: 6 additions & 2 deletions .github/workflows/scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,9 @@ jobs:
spark-aggregate-fuzzer-run:
name: Spark Aggregate Fuzzer
runs-on: ubuntu-latest
container: ghcr.io/facebookincubator/velox-dev:centos9
container: ghcr.io/facebookincubator/velox-dev:spark-server
needs: compile
timeout-minutes: 60
timeout-minutes: 120
steps:

- name: Download spark aggregation fuzzer
Expand All @@ -466,12 +466,16 @@ jobs:

- name: Run Spark Aggregate Fuzzer
run: |
bash /opt/start-spark.sh
# Sleep for 60 seconds to allow Spark server to start.
sleep 60
mkdir -p /tmp/spark_aggregate_fuzzer_repro/logs/
chmod -R 777 /tmp/spark_aggregate_fuzzer_repro
chmod +x spark_aggregation_fuzzer_test
./spark_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec $DURATION \
--enable_sorted_aggregations=false \
--minloglevel=0 \
--stderrthreshold=2 \
--log_dir=/tmp/spark_aggregate_fuzzer_repro/logs \
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/fuzzer/FuzzerUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ void setupMemory(
void registerHiveConnector(
const std::unordered_map<std::string, std::string>& hiveConfigs) {
auto configs = hiveConfigs;
// Make sure not to run out of open file descriptors.
configs[connector::hive::HiveConfig::kNumCacheFileHandles] = "1000";
if (!connector::hasConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
connector::registerConnectorFactory(
Expand Down
37 changes: 16 additions & 21 deletions velox/functions/sparksql/fuzzer/SparkAggregationFuzzerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

#include "velox/exec/fuzzer/AggregationFuzzerOptions.h"
#include "velox/exec/fuzzer/AggregationFuzzerRunner.h"
#include "velox/exec/fuzzer/DuckQueryRunner.h"
#include "velox/exec/fuzzer/TransformResultVerifier.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/functions/sparksql/aggregates/Register.h"
#include "velox/functions/sparksql/fuzzer/SparkQueryRunner.h"

DEFINE_int64(
seed,
Expand Down Expand Up @@ -53,10 +53,13 @@ int main(int argc, char** argv) {
facebook::velox::functions::prestosql::registerInternalFunctions();
facebook::velox::memory::MemoryManager::initialize({});

// TODO: List of the functions that at some point crash or fail and need to
// be fixed before we can enable. Constant argument of bloom_filter_agg cause
// fuzzer test fail.
std::unordered_set<std::string> skipFunctions = {"bloom_filter_agg"};
// Spark does not provide user-accessible aggregate functions with the
// following names.
std::unordered_set<std::string> skipFunctions = {
"bloom_filter_agg",
"first_ignore_null",
"last_ignore_null",
"regr_replacement"};

using facebook::velox::exec::test::TransformResultVerifier;

Expand Down Expand Up @@ -95,21 +98,9 @@ int main(int argc, char** argv) {
size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed;
std::shared_ptr<facebook::velox::memory::MemoryPool> rootPool{
facebook::velox::memory::memoryManager()->addRootPool()};
auto duckQueryRunner =
std::make_unique<facebook::velox::exec::test::DuckQueryRunner>(
rootPool.get());
duckQueryRunner->disableAggregateFunctions(
{// https://github.com/facebookincubator/velox/issues/7677
"max_by",
"min_by",
// The skewness functions of Velox and DuckDB use different
// algorithms.
// https://github.com/facebookincubator/velox/issues/4845
"skewness",
// Spark's kurtosis uses Pearson's formula for calculating the kurtosis
// coefficient. Meanwhile, DuckDB employs the sample kurtosis calculation
// formula. The results from the two methods are completely different.
"kurtosis"});
auto sparkQueryRunner = std::make_unique<
facebook::velox::functions::sparksql::fuzzer::SparkQueryRunner>(
rootPool.get(), "localhost:15002", "fuzzer", "aggregate");

using Runner = facebook::velox::exec::test::AggregationFuzzerRunner;
using Options = facebook::velox::exec::test::AggregationFuzzerOptions;
Expand All @@ -119,5 +110,9 @@ int main(int argc, char** argv) {
options.skipFunctions = skipFunctions;
options.customVerificationFunctions = customVerificationFunctions;
options.orderableGroupKeys = true;
return Runner::run(initialSeed, std::move(duckQueryRunner), options);
options.timestampPrecision =
facebook::velox::VectorFuzzer::Options::TimestampPrecision::kMicroSeconds;
options.hiveConfigs = {
{facebook::velox::connector::hive::HiveConfig::kReadTimestampUnit, "6"}};
return Runner::run(initialSeed, std::move(sparkQueryRunner), options);
}

0 comments on commit d0b6ed9

Please sign in to comment.