[BUG] SIGSEGV / SIGBUS error in streaming mode LightGBM #2333

Open

fonhorst opened this issue Jan 4, 2025 · 0 comments

SynapseML version

1.0.8

System information

  • Python 3.10.12, Scala 2.12
  • Spark 3.5.3
  • Spark Platform: Synapse

Describe the problem

I'm experiencing strange errors when running SynapseML LightGBM in streaming mode (dataTransferMode: streaming). If I run SynapseML LightGBM with one or several executors, each having more than one core, the app encounters either SIGSEGV/SIGBUS errors or memory-corruption errors on the LightGBM side.

The exact error may look like:

  1. Example 1
    JRE version: OpenJDK Runtime Environment Temurin-17.0.13+11 (17.0.13+11) (build 17.0.13+11)
    Java VM: OpenJDK 64-Bit Server VM Temurin-17.0.13+11 (17.0.13+11, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-amd64)
    Problematic frame:
    [thread 277 also had an error]
    C [lib_lightgbm.so+0x1acac3][LightGBM] [Warning] [LightGBM] [Warning] std::bad_alloc
    std::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    LightGBM::SparseBin::Push(int, int, unsigned int)+0x33
    [LightGBM] [Warning] [LightGBM] [Warning] std::bad_alloc
    std::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    terminate called without an active exception

  2. Example 2
    [LightGBM] [Info] Loaded reference dataset: 59 features, 1199951 num_data
    double free or corruption (!prev)

  3. Example 3
    SIGBUS (0x7) at pc=0x00007f2d7475aac3, pid=15, tid=348
    Problematic frame:
    C [lib_lightgbm.so+0x1acac3] LightGBM::SparseBin::Push(int, int, unsigned int)+0x33
    [LightGBM] [Warning] [LightGBM] [Warning] std::bad_allocstd::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    [LightGBM] [Warning] std::bad_alloc
    [thread 261 also had an error]
    malloc_consolidate(): invalid chunk size

I have noticed several patterns in the behavior of SynapseML LightGBM:

  1. Errors don't appear if executors have only one core.
  2. Errors don't appear if the app is running in bulk mode (dataTransferMode: bulk). A sketch of both of these workaround configurations is shown right after this list.
  3. Errors don't appear on the company_bankruptcy_dataset dataset under any settings
    (https://www.kaggle.com/datasets/fedesoriano/company-bankruptcy-prediction).
  4. Errors do appear on two more datasets of different sizes
    (used_cars and lama_test_dataset).
  5. Some configurations may lead to sporadic appearance of the errors. For instance, if the app is running with 2 executors, each having 2 cores, the errors appear only in a fraction of runs.
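
For anyone hitting the same crash, observations 1 and 2 translate into a small change to the run parameters from the reproduction script below. This is only a sketch of the two configurations that did not crash for me, not a verified fix (GENERAL_RUN_PARAMS is defined in the script):

# Sketch only: the configurations from observations 1 and 2 that did not
# crash in my runs; GENERAL_RUN_PARAMS comes from the script below.

# Option A: fall back to bulk data transfer.
bulk_params = {**GENERAL_RUN_PARAMS, 'dataTransferMode': 'bulk'}

# Option B: keep streaming but give every executor a single core, e.g.:
#   spark-submit --conf spark.executor.instances=2 \
#                --conf spark.executor.cores=1 ...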

Statistics of failed and successful runs for different app settings are presented in the table below:

dataset                    | executors | cores per executor | success %
---------------------------|-----------|--------------------|----------
company_bankruptcy_dataset |     1     |         1          |    100.0
company_bankruptcy_dataset |     1     |         4          |    100.0
company_bankruptcy_dataset |     2     |         1          |    100.0
company_bankruptcy_dataset |     2     |         2          |    100.0
lama_test_dataset          |     1     |         1          |    100.0
lama_test_dataset          |     1     |         4          |      0.0
lama_test_dataset          |     2     |         1          |    100.0
lama_test_dataset          |     2     |         2          |    100.0
used_cars_dataset          |     1     |         1          |    100.0
used_cars_dataset          |     1     |         4          |      0.0
used_cars_dataset          |     2     |         1          |     66.7
used_cars_dataset          |     2     |         2          |     54.5

The datasets on which the errors appear don't contain any columns or values that look suspicious to me (in particular, there are no NaN values in the datasets).
All datasets used in these runs are available via the link: Google Drive

The script that runs SynapseML LightGBM is in the attachment; the exact LightGBM parameters can be found there.

I would appreciate any help with this issue. To me it looks like a bug related to race conditions in dataset/buffer preparation for native LightGBM, but maybe there is something wrong with my settings.
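
One mitigation worth trying, if the race is in the native streaming path, is capping the OpenMP thread count used during streaming dataset creation via the maxStreamingOMPThreads parameter (left commented out in GENERAL_RUN_PARAMS below). A sketch only; I have not verified that it avoids the crash:

# Untested: limit native OpenMP threads while the streamed dataset is built.
# If the corruption is a race in buffer preparation, a single OMP thread may
# mask it, at the cost of slower dataset construction.
pinned_params = {**GENERAL_RUN_PARAMS, 'maxStreamingOMPThreads': 1}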

Code to reproduce issue

import logging
import os
import signal
import sys
import time
from typing import Optional, Any, Dict, Tuple
import psutil
from pyspark.sql import functions as sf
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession, DataFrame
from synapse.ml.lightgbm import LightGBMClassifier, LightGBMRegressor


logger = logging.getLogger(__name__)


GENERAL_RUN_PARAMS = {
    'featuresCol': 'Mod_0_LightGBM_vassembler_features',
    'verbosity': 1,
    'dataTransferMode': 'streaming',
    'useSingleDatasetMode': True,
    'useBarrierExecutionMode': False,
    'isProvideTrainingMetric': True,
    'chunkSize': 10_000,
    'defaultListenPort': 13614,
    'learningRate': 0.03,
    'numLeaves': 64,
    'featureFraction': 0.7,
    'baggingFraction': 0.7,
    'baggingFreq': 1,
    'maxDepth': -1,
    'minGainToSplit': 0.0,
    'maxBin': 255,
    'minDataInLeaf': 5,
    'numIterations': 50,
    'earlyStoppingRound': 200,
    'numTasks': None,
    'numThreads': None,
    # 'maxStreamingOMPThreads': 1,
}


def get_lightgbm_params(dataset_name: str) -> Dict[str, Any]:
    match dataset_name:
        case "company_bankruptcy_dataset":
            dataset_specific_params = {
                'labelCol': "Bankrupt?",
                'objective': 'binary',
                'metric': 'auc',
                'rawPredictionCol': 'raw_prediction',
                'probabilityCol': 'Mod_0_LightGBM_prediction_0',
                'predictionCol': 'prediction',
                'isUnbalance': True
            }
        case "lama_test_dataset":
            dataset_specific_params = {
                'labelCol': 'TARGET',
                'objective': 'binary',
                'metric': 'auc',
                'rawPredictionCol': 'raw_prediction',
                'probabilityCol': 'Mod_0_LightGBM_prediction_0',
                'predictionCol': 'prediction',
                'isUnbalance': True
            }
        case "used_cars_dataset":
            dataset_specific_params = {
                'labelCol': "price",
                'objective': 'regression',
                'metric': 'rmse',
                'predictionCol': 'prediction'
            }
        case "adv_used_cars_dataset":
            dataset_specific_params = {
                'labelCol': "price",
                'objective': 'regression',
                'metric': 'rmse',
                'predictionCol': 'prediction'
            }
        case _:
            raise ValueError("Unknown dataset")

    return {
        **GENERAL_RUN_PARAMS,
        **dataset_specific_params
    }


def get_spark_session(partitions_num: Optional[int] = None):
    partitions_num = partitions_num if partitions_num else 6

    if os.environ.get("SCRIPT_ENV", None) == "cluster":
        spark_sess = SparkSession.builder.getOrCreate()
    else:
        extra_jvm_options = "-Dio.netty.tryReflectionSetAccessible=true"

        spark_sess = (
            SparkSession.builder.master(f"local[{partitions_num}]")
            # .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:1.0.8")
            .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:1.0.8")
            .config("spark.jars", "jars/spark-lightautoml_2.12-0.1.1.jar")
            .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
            .config("spark.driver.extraJavaOptions", extra_jvm_options)
            .config("spark.executor.extraJavaOptions", extra_jvm_options)
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.kryoserializer.buffer.max", "512m")
            .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
            .config("spark.cleaner.referenceTracking", "true")
            .config("spark.cleaner.periodicGC.interval", "1min")
            .config("spark.sql.shuffle.partitions", f"{partitions_num}")
            .config("spark.default.parallelism", f"{partitions_num}")
            .config("spark.driver.memory", "4g")
            .config("spark.executor.memory", "4g")
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")
            .getOrCreate()
        )

    spark_sess.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")

    spark_sess.sparkContext.setLogLevel("WARN")

    return spark_sess


def load_test_and_train(
    spark: SparkSession, data_path: str, seed: int = 42, test_size: float = 0.2, partitions_coefficient: int = 1
) -> Tuple[DataFrame, DataFrame]:
    assert 0 <= test_size <= 1

    data = spark.read.csv(data_path, header=True, inferSchema=True, encoding="UTF-8")

    execs = int(spark.conf.get("spark.executor.instances", "1"))
    cores = int(spark.conf.get("spark.executor.cores", "8"))

    data = data.repartition(execs * cores * partitions_coefficient).cache()
    # The noop write forces the cached partitions to materialize before the split.
    data.write.mode("overwrite").format("noop").save()

    train_data, test_data = data.randomSplit([1 - test_size, test_size], seed)

    return train_data, test_data


def clean_java_processes():
    # On the cluster, java processes are sometimes left running after
    # spark.stop(); kill them so the next run starts clean.
    if os.environ.get("SCRIPT_ENV", None) == "cluster":
        time.sleep(10)
        pids = [proc.pid for proc in psutil.process_iter() if "java" in proc.name()]
        print(f"Found unstopped java processes: {pids}")
        for pid in pids:
            try:
                os.kill(pid, signal.SIGKILL)
            except Exception:
                logger.warning(f"Exception during killing the java process with pid {pid}", exc_info=True)


def main():
    dataset_name = sys.argv[1]

    print(f"Working with dataset: {dataset_name}")

    spark = get_spark_session()

    train_df, test_df = load_test_and_train(
        spark=spark,
        data_path=f"hdfs://node21.bdcl:9000/opt/preprocessed_datasets/CSV/{dataset_name}.csv"
    )

    print(f"ASSEMBLED DATASET SIZE: {train_df.count()}")

    run_params = get_lightgbm_params(dataset_name)
    features = [c for c in train_df.columns if c not in [run_params['labelCol'], '_id', 'reader_fold_num', 'is_val']]
    assembler = VectorAssembler(inputCols=features, outputCol=run_params['featuresCol'], handleInvalid="error")

    match run_params['objective']:
        case 'regression':
            lgbm = LightGBMRegressor(**run_params)
        case 'binary':
            lgbm = LightGBMClassifier(**run_params)
        case _:
            raise ValueError(f"Unsupported objective: {run_params['objective']}")

    df = assembler.transform(train_df)
    model = lgbm.fit(df)
    print("Training is finished")

    df = assembler.transform(test_df)
    predicts_df = model.transform(df)
    predicts_df.write.mode("overwrite").format("noop").save()
    print("Predicting is finished")

    spark.stop()
    clean_java_processes()


if __name__ == "__main__":
    main()
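
For completeness, the script takes the dataset name as its only command-line argument, and SCRIPT_ENV=cluster switches it to the cluster code path (the file name below is a placeholder):

# Example invocation; "repro_lightgbm.py" is a hypothetical file name.
#   SCRIPT_ENV=cluster python repro_lightgbm.py used_cars_dataset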

Other info / logs

log_error_1.txt
log_error_2.txt
log_error_3.txt
log_error_4.txt
log_error_5.txt

What component(s) does this bug affect?

  • area/cognitive: Cognitive project
  • area/core: Core project
  • area/deep-learning: DeepLearning project
  • area/lightgbm: Lightgbm project
  • area/opencv: Opencv project
  • area/vw: VW project
  • area/website: Website
  • area/build: Project build system
  • area/notebooks: Samples under notebooks folder
  • area/docker: Docker usage
  • area/models: models related issue

What language(s) does this bug affect?

  • language/scala: Scala source code
  • language/python: Pyspark APIs
  • language/r: R APIs
  • language/csharp: .NET APIs
  • language/new: Proposals for new client languages

What integration(s) does this bug affect?

  • integrations/synapse: Azure Synapse integrations
  • integrations/azureml: Azure ML integrations
  • integrations/databricks: Databricks integrations
@fonhorst fonhorst added the bug label Jan 4, 2025
@github-actions github-actions bot added the triage label Jan 4, 2025