Support store logs in DataBricks Runtime (#13)
izhangzhihao committed Apr 24, 2023
1 parent 653e0f3 commit 4776828
Showing 73 changed files with 2,533 additions and 133 deletions.
core/build.gradle (7 changes: 6 additions & 1 deletion)
@@ -34,7 +34,12 @@ dependencies {
implementation "joda-time:joda-time:2.9.9"
implementation "org.mybatis:mybatis:3.5.9"
implementation 'com.zaxxer:HikariCP:2.6.1'
-implementation 'org.flywaydb:flyway-core:7.14.0'
+implementation('io.github.coolbeevip:flyway-core:9.15.2.2') {
+    exclude group: 'com.fasterxml.jackson.dataformat', module: 'jackson-dataformat-toml'
+}
+implementation('io.github.coolbeevip:flyway-mysql:9.15.2.2') {
+    exclude group: 'com.fasterxml.jackson.dataformat', module: 'jackson-dataformat-toml'
+}
implementation group: 'org.apache.poi', name: 'poi', version: '4.1.0'
implementation group: 'org.apache.poi', name: 'poi-ooxml', version: '4.1.0'
implementation 'org.apache.commons:commons-lang3:3.10'
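The Flyway upgrade swaps the stock org.flywaydb artifact for the io.github.coolbeevip fork at 9.15.2.2, excluding its TOML module. Assuming the fork keeps the standard Flyway API and that migrations are resolved from the per-dialect folders shown below (the bootstrap code itself is not part of this diff), wiring it up could look roughly like this:

import org.flywaydb.core.Flyway

// Illustrative sketch only: points Flyway at one of the per-dialect migration
// folders (db/mysql/migration here); the repository's actual bootstrap may
// differ from or extend this.
object FlywayMigrationSketch {
  def migrate(jdbcUrl: String, user: String, password: String): Unit = {
    Flyway.configure()
      .dataSource(jdbcUrl, user, password)
      .locations("classpath:db/mysql/migration")
      .load()
      .migrate()
  }
}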
core/src/main/resources/db/mysql/migration/V1__init.sql (24 changes: 12 additions & 12 deletions)
@@ -1,8 +1,8 @@
create table job_log
(
-job_id bigint auto_increment primary key,
+job_id varchar(128) primary key,
workflow_name varchar(128) charset utf8 not null,
`period` int not null,
job_name varchar(128) charset utf8 not null,
data_range_start varchar(128) charset utf8 null,
data_range_end varchar(128) charset utf8 null,
@@ -13,21 +13,21 @@ create table job_log
last_update_time datetime default CURRENT_TIMESTAMP not null comment 'log update time',
load_type varchar(32) null,
log_driven_type varchar(32) null,
file text charset utf8 null,
application_id varchar(64) charset utf8 null,
project_name varchar(64) charset utf8 null,
runtime_args text charset utf8 null
) charset = utf8;

create table quality_check_log
(
id bigint auto_increment
primary key,
-job_id bigint not null,
+job_id varchar(128) not null,
job_name varchar(64) charset utf8 not null comment 'job name(workflow_name + period)',
`column` varchar(64) charset utf8 not null comment 'issue column name',
data_check_type varchar(64) charset utf8 not null,
ids text charset utf8 not null comment 'issue data primary key, concat by `, `, multiple primary key will be concat by `__`',
error_type varchar(16) charset utf8 not null comment 'warn/error',
warn_count bigint null,
error_count bigint null,
@@ -37,18 +37,18 @@ create table job_log

create table step_log
(
-job_id bigint not null,
+job_id varchar(128) not null,
step_id varchar(64) not null,
status varchar(32) not null,
start_time datetime not null,
end_time datetime null,
duration int(11) unsigned not null,
output text not null,
source_count bigint null,
target_count bigint null,
success_count bigint null comment 'success data count',
failure_count bigint null comment 'failure data count',
error text null,
source_type varchar(32) null,
target_type varchar(32) null,
primary key (job_id, step_id)
core/src/main/resources/db/spark/migration/V1__init.sql (54 changes: 54 additions & 0 deletions)
@@ -0,0 +1,54 @@
create table `sharp_etl`.job_log
(
job_id string,
workflow_name string not null,
`period` int not null,
job_name string not null,
data_range_start string,
data_range_end string,
job_start_time timestamp,
job_end_time timestamp,
status string not null comment 'job status: SUCCESS,FAILURE,RUNNING',
create_time timestamp comment 'log create time',
last_update_time timestamp comment 'log update time',
load_type string,
log_driven_type string,
file string,
application_id string,
project_name string,
runtime_args string
) using delta;

create table `sharp_etl`.quality_check_log
(
id bigint,
job_id string not null,
job_name string not null comment 'job name(workflow_name + period)',
`column` string not null comment 'issue column name',
data_check_type string not null,
ids string not null comment 'issue data primary key, concat by `, `, multiple primary key will be concat by `__`',
error_type varchar(16) not null comment 'warn/error',
warn_count bigint,
error_count bigint,
create_time timestamp comment 'log create time',
last_update_time timestamp comment 'log update time'
) using delta;

create table `sharp_etl`.step_log
(
job_id string not null,
step_id string not null,
status string not null,
start_time timestamp not null,
end_time timestamp,
duration int not null,
output string not null,
source_count bigint,
target_count bigint,
success_count bigint comment 'success data count',
failure_count bigint comment 'failure data count',
error string,
source_type string,
target_type string
) using delta;
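The tables above are declared using delta, so on Databricks Runtime the ETL log store lives in the Spark catalog rather than in an external RDBMS. As a minimal sketch (the session wiring and the query are illustrative only; the sharp_etl database name is taken from the script above), the Delta-backed log can be inspected straight from a Spark session:

import org.apache.spark.sql.SparkSession

object SparkLogStoreSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sharp-etl-log-store-sketch")
      .getOrCreate()

    // The migration script expects the `sharp_etl` database to exist already.
    spark.sql("CREATE DATABASE IF NOT EXISTS sharp_etl")

    // Inspect the most recent failed runs recorded in the Delta-backed job_log table.
    spark.sql(
      """SELECT job_id, job_name, status, job_start_time
        |FROM `sharp_etl`.job_log
        |WHERE status = 'FAILURE'
        |ORDER BY job_start_time DESC
        |LIMIT 20""".stripMargin
    ).show(truncate = false)
  }
}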

core/src/main/resources/db/sqlserver/migration/V1__init.sql (18 changes: 8 additions & 10 deletions)
@@ -1,8 +1,6 @@
create table sharp_etl.job_log
(
-job_id bigint identity
-    constraint PK_JobLog_TransactionID
-    primary key,
+job_id nvarchar(128) identity primary key,
workflow_name nvarchar(128) not null,
"period" int not null,
job_name nvarchar(128) not null,
@@ -27,7 +25,7 @@ create table sharp_etl.quality_check_log
id bigint identity
constraint PK_QCLog_TransactionID
primary key,
-job_id bigint not null,
+job_id nvarchar(128) not null,
job_name nvarchar(64) not null,
[column] nvarchar(64) not null,
data_check_type nvarchar(64) not null,
@@ -42,13 +40,13 @@ go

create table sharp_etl.step_log
(
-job_id bigint not null,
+job_id nvarchar(128) not null,
step_id varchar(64) not null,
status varchar(32) not null,
start_time datetime not null,
end_time datetime,
duration int not null,
output varchar(max) not null,
source_count bigint,
target_count bigint,
success_count bigint,
@@ -16,7 +16,7 @@ import com.github.sharpdata.sharpetl.core.util.Constants.Job.nullDataTime
import com.github.sharpdata.sharpetl.core.util.DateUtil.{BigIntToLocalDateTime, LocalDateTimeToBigInt}
import com.github.sharpdata.sharpetl.core.util.IncIdUtil.NumberStringPadding
import com.github.sharpdata.sharpetl.core.util.JobLogUtil.JobLogFormatter
-import com.github.sharpdata.sharpetl.core.util.StringUtil.{BigIntConverter, isNullOrEmpty}
+import com.github.sharpdata.sharpetl.core.util.StringUtil.{BigIntConverter, isNullOrEmpty, uuid}
import com.github.sharpdata.sharpetl.core.util._

import java.math.BigInteger
@@ -131,7 +131,7 @@ final case class LogDrivenInterpreter(

Seq(
new JobLog(
-jobId = 0, workflowName = workflowName,
+jobId = uuid, workflowName = workflowName,
period = period, jobName = jobScheduleId,
dataRangeStart = dataRangeStart, dataRangeEnd = endTimeStr.getOrElse("latest"), // update `dataRangeEnd` in [[BatchKafkaDataSource.read()]]
jobStartTime = nullDataTime, jobEndTime = nullDataTime,
@@ -159,7 +159,7 @@ final case class LogDrivenInterpreter(
val jobScheduleId = s"$workflowName-$startFrom"
Seq(
new JobLog(
-jobId = 0, workflowName = workflowName,
+jobId = uuid, workflowName = workflowName,
period = period, jobName = jobScheduleId,
dataRangeStart = startFrom.padding(), dataRangeEnd = endTimeStr.getOrElse("0").padding(),
jobStartTime = nullDataTime, jobEndTime = nullDataTime,
@@ -284,7 +284,7 @@ final case class LogDrivenInterpreter(
val dataRangeEnd = startTime.plus(idx * execPeriod, ChronoUnit.MINUTES).asBigInt().toString
val jobScheduleId = s"$workflowName-$dataRangeStart"
new JobLog(
-jobId = 0, workflowName = workflowName,
+jobId = uuid, workflowName = workflowName,
period = execPeriod, jobName = jobScheduleId,
dataRangeStart = dataRangeStart, dataRangeEnd = dataRangeEnd,
jobStartTime = nullDataTime, jobEndTime = nullDataTime,
@@ -298,11 +298,11 @@
)
}

-private def dependOnUpstreamScheduleJob(upstreamLogId: BigInt, incrementalType: String): JobLog = {
+private def dependOnUpstreamScheduleJob(upstreamLogId: String, incrementalType: String): JobLog = {
val dataRangeStart = upstreamLogId.toString()
val jobScheduleId = s"$workflowName-$dataRangeStart"
new JobLog(
-jobId = 0, workflowName = workflowName,
+jobId = uuid, workflowName = workflowName,
period = period, jobName = jobScheduleId,
dataRangeStart = dataRangeStart, dataRangeEnd = "",
jobStartTime = nullDataTime, jobEndTime = nullDataTime,
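The jobId field is now seeded with uuid, imported from StringUtil above, instead of the placeholder 0 that the database's auto-increment key used to overwrite. The helper itself is not part of this excerpt; a plausible shape, offered purely as an assumption, is:

import java.util.UUID

// Hypothetical sketch of the `uuid` helper referenced above; the real
// StringUtil.uuid in this repository may be defined differently.
object StringUtilSketch {
  def uuid: String = UUID.randomUUID().toString
}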
@@ -91,7 +91,7 @@ class NotificationUtil(val jobLogAccessor: JobLogAccessor) {
}
}

-final case class JobMessage(jobId: Long,
+final case class JobMessage(jobId: String,
jobName: String,
jobRangeStart: String,
jobRangeEnd: String,
@@ -32,7 +32,7 @@ trait QualityCheck[DataFrame] extends Serializable {
val dataQualityCheckRules: Map[String, QualityCheckRule]
val qualityCheckAccessor: QualityCheckAccessor

-def qualityCheck(step: WorkflowStep, jobId: Long, jobScheduleId: String,
+def qualityCheck(step: WorkflowStep, jobId: String, jobScheduleId: String,
df: DataFrame): CheckResult[DataFrame] = {
val idColumn = step.source.options.getOrElse("idColumn", "id")
val sortColumn = step.source.options.getOrElse("sortColumn", "")
@@ -99,7 +99,7 @@ trait QualityCheck[DataFrame] extends Serializable {
.toSeq


-def recordCheckResult(jobId: Long, jobScheduleId: String, results: Seq[DataQualityCheckResult]): Unit = {
+def recordCheckResult(jobId: String, jobScheduleId: String, results: Seq[DataQualityCheckResult]): Unit = {
results
.filter(it => it.warnCount > 0 || it.errorCount > 0)
.map(it =>
@@ -1,6 +1,5 @@
package com.github.sharpdata.sharpetl.core.repository

-import com.github.sharpdata.sharpetl.core.util.ETLConfig
import com.github.sharpdata.sharpetl.core.util.ETLConfig
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import org.apache.ibatis.datasource.DataSourceFactory
@@ -45,6 +45,7 @@ object JobLogAccessor {
case Constants.ETLDatabaseType.MSSQL => new com.github.sharpdata.sharpetl.core.repository.mssql.JobLogAccessor()
case Constants.ETLDatabaseType.H2 => new com.github.sharpdata.sharpetl.core.repository.mysql.JobLogAccessor()
case Constants.ETLDatabaseType.MYSQL => new com.github.sharpdata.sharpetl.core.repository.mysql.JobLogAccessor()
+case Constants.ETLDatabaseType.SPARK_SHARP_ETL => new com.github.sharpdata.sharpetl.core.repository.spark.JobLogAccessor()
}
}
}
@@ -16,6 +16,7 @@ object QualityCheckAccessor {
case ETLDatabaseType.MSSQL => new com.github.sharpdata.sharpetl.core.repository.mssql.QualityCheckAccessor()
case Constants.ETLDatabaseType.H2 => new com.github.sharpdata.sharpetl.core.repository.mysql.QualityCheckAccessor()
case Constants.ETLDatabaseType.MYSQL => new com.github.sharpdata.sharpetl.core.repository.mysql.QualityCheckAccessor()
+case Constants.ETLDatabaseType.SPARK_SHARP_ETL => new com.github.sharpdata.sharpetl.core.repository.spark.QualityCheckAccessor()
}
}
}
@@ -28,6 +28,7 @@ object StepLogAccessor {
case ETLDatabaseType.MSSQL => new com.github.sharpdata.sharpetl.core.repository.mssql.StepLogAccessor()
case Constants.ETLDatabaseType.H2 => new com.github.sharpdata.sharpetl.core.repository.mysql.StepLogAccessor()
case Constants.ETLDatabaseType.MYSQL => new com.github.sharpdata.sharpetl.core.repository.mysql.StepLogAccessor()
+case Constants.ETLDatabaseType.SPARK_SHARP_ETL => new com.github.sharpdata.sharpetl.core.repository.spark.StepLogAccessor()
}
}
}
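Each accessor factory gains a SPARK_SHARP_ETL branch that resolves to an implementation in the spark repository package, which is not included in this excerpt. A rough sketch of what such an accessor could look like, with the class name, method, and SQL assumed for illustration only:

import org.apache.spark.sql.SparkSession

// Illustrative only: the actual spark-package accessors added by this commit
// are not shown here and may have a different shape.
class SparkStepLogAccessorSketch(spark: SparkSession) {

  // Mark a step as finished in the Delta-backed step_log table.
  def markSuccess(jobId: String, stepId: String): Unit = {
    spark.sql(
      s"""UPDATE `sharp_etl`.step_log
         |SET status = 'SUCCESS'
         |WHERE job_id = '$jobId' AND step_id = '$stepId'""".stripMargin)
    ()
  }
}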
@@ -69,15 +69,16 @@ trait JobLogMapper extends Serializable {
))
def isAnotherJobRunning(jobName: String): JobLog

-@Insert(Array("insert into sharp_etl.job_log(job_name, [period], workflow_name," +
+@Insert(Array("insert into sharp_etl.job_log(job_id, job_name, [period], workflow_name," +
"data_range_start, data_range_end," +
"job_start_time, job_end_time, " +
"status, create_time," +
-"last_update_time, file, application_id, project_name, load_type, log_driven_type, runtime_args) values (#{jobName}, #{period}, #{workflowName}, " +
+"last_update_time, file, application_id, project_name, load_type, log_driven_type, runtime_args) values " +
+"(#{jobId}, #{jobName}, #{period}, #{workflowName}, " +
"#{dataRangeStart}, #{dataRangeEnd}, #{jobStartTime}, #{jobEndTime}, " +
"#{status}, #{createTime}, #{lastUpdateTime}, #{file}, #{applicationId}, #{projectName}, #{loadType}, #{logDrivenType}, #{runtimeArgs})"
))
-@Options(useGeneratedKeys = true, keyProperty = "jobId")
+//@Options(useGeneratedKeys = true, keyProperty = "jobId")
def createJobLog(jobLog: JobLog): Unit


@@ -67,22 +67,23 @@ trait JobLogMapper extends Serializable {
))
def isAnotherJobRunning(jobName: String): JobLog

-@Insert(Array("insert into job_log(job_name, `period`, workflow_name," +
+@Insert(Array("insert into job_log(job_id, job_name, `period`, workflow_name," +
"data_range_start, data_range_end," +
"job_start_time, job_end_time, " +
"status, create_time," +
-"last_update_time, file, application_id, project_name, load_type, log_driven_type, runtime_args) values (#{workflowName}, #{period}, #{workflowName}, " +
+"last_update_time, file, application_id, project_name, load_type, log_driven_type, runtime_args) values " +
+"(#{jobId}, #{jobName}, #{period}, #{workflowName}, " +
"#{dataRangeStart}, #{dataRangeEnd}, #{jobStartTime}, #{jobEndTime}, " +
"#{status}, #{createTime}, #{lastUpdateTime}, #{file}, #{applicationId}, #{projectName}, #{loadType}, #{logDrivenType}, #{runtimeArgs})"
))
-@Options(useGeneratedKeys = true, keyProperty = "jobId")
+//@Options(useGeneratedKeys = true, keyProperty = "jobId")
def createJobLog(jobLog: JobLog): Unit

@Update(Array(
"update job_log set " +
"workflow_name = #{workflowName}, " +
"`period` = #{period}, " +
-"job_name = #{workflowName}, " +
+"job_name = #{jobName}, " +
"data_range_start = #{dataRangeStart}, " +
"data_range_end = #{dataRangeEnd}, " +
"job_start_time = #{jobStartTime}, " +
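With useGeneratedKeys commented out in both mappers, MyBatis no longer back-fills jobId after the insert; callers must assign the id (for example via StringUtil.uuid) before invoking createJobLog. A minimal sketch of that calling pattern, with the session handling assumed rather than taken from the project:

import org.apache.ibatis.session.SqlSessionFactory

// Illustrative helper: opens an auto-commit session, hands the requested mapper
// to the caller, and closes the session afterwards.
object MapperSketch {
  def withMapper[M](factory: SqlSessionFactory, mapperClass: Class[M])(use: M => Unit): Unit = {
    val session = factory.openSession(true)
    try use(session.getMapper(mapperClass))
    finally session.close()
  }
}

// Usage sketch: the JobLog passed in must already carry its UUID job_id
// (e.g. jobId = StringUtil.uuid) before createJobLog is called.
// MapperSketch.withMapper(sessionFactory, classOf[JobLogMapper])(_.createJobLog(jobLog))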
