Commit d7906aa: clean up

izhangzhihao committed Apr 24, 2023
1 parent: 6b5035a
Showing 1 changed file with 2 additions and 56 deletions.
@@ -2,72 +2,18 @@ package com.github.sharpdata.sharpetl.spark.end2end.delta

import com.github.sharpdata.sharpetl.core.util.ETLLogger
import com.github.sharpdata.sharpetl.spark.end2end.ETLSuit.runJob
import com.github.sharpdata.sharpetl.spark.extension.UdfInitializer
import com.github.sharpdata.sharpetl.spark.test.DataFrameComparer
import com.github.sharpdata.sharpetl.spark.utils.ETLSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.scalatest.{BeforeAndAfterEach, DoNotDiscover}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should

import java.sql.Timestamp

@DoNotDiscover
class FlyDeltaSpec extends AnyFunSpec
  with should.Matchers
  // with ETLSuit
  with DeltaSuit
  // with HiveSuit
  with DataFrameComparer
  with BeforeAndAfterEach {

  /*val spark: SparkSession = {
    //ETLSparkSession.local = false
    val session = SparkSession
      .builder()
      .master("local")
      .appName("spark session")
      .config("spark.sql.shuffle.partitions", "1")
      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.catalogImplementation", "hive")
      .getOrCreate()
    UdfInitializer.init(session)
    session
  }*/
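
  // The block above is the per-spec session setup this commit cleans up; the
  // Delta-enabled SparkSession presumably comes from the shared DeltaSuit
  // mixin instead. A minimal, illustrative sketch of such a trait, reusing
  // only the config keys visible in the removed block (the trait name is taken
  // from the mixin above; the body is an assumption, not the repository's
  // actual code):
  //
  //   trait DeltaSuit {
  //     lazy val spark: SparkSession = SparkSession
  //       .builder()
  //       .master("local")
  //       .appName("delta end2end suite")
  //       .config("spark.sql.shuffle.partitions", "1")
  //       .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  //       .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  //       .getOrCreate()
  //   }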

  // Sample input rows for the job under test
  val data = Seq(
    Row("qqq", Timestamp.valueOf("2021-01-01 08:00:00")),
    Row("super qqq", Timestamp.valueOf("2021-01-01 08:00:00"))
  )

  // Schema of the sample input: a nullable name plus an update timestamp
  val schema = List(
    StructField("name", StringType, nullable = true),
    StructField("update_time", TimestampType, nullable = true)
  )

  val sampleDataDf: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(data),
    StructType(schema)
  )

  // Expected output after the job: `name` renamed to `new_name`, plus a
  // `test_expression` column (constant 199 in this fixture)
  val expected: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(
      Seq(
        Row("qqq", Timestamp.valueOf("2021-01-01 08:00:00"), 199),
        Row("super qqq", Timestamp.valueOf("2021-01-01 08:00:00"), 199)
      )
    ),
    StructType(List(
      StructField("new_name", StringType, nullable = true),
      StructField("update_time", TimestampType, nullable = true),
      StructField("test_expression", IntegerType, nullable = true)
    ))
  )
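
  // Illustrative only: DataFrameComparer's exact assertion API is not shown in
  // this diff, so a plain collect-and-compare stands in for it here. The helper
  // name is hypothetical.
  private def assertSameRows(actual: DataFrame, exp: DataFrame): Unit = {
    assert(actual.schema == exp.schema, "schemas differ")
    assert(actual.collect().toSet == exp.collect().toSet, "rows differ")
  }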

it("should just run with delta") {
if (spark.version.startsWith("2.3")) {
ETLLogger.error("Delta Lake does NOT support Spark 2.3.x")
