Skip to content

Commit

Permalink
Added logic to create configuration (yaml) from user input
Browse files Browse the repository at this point in the history
  • Loading branch information
subkanthi committed Oct 26, 2024
1 parent 902ba0b commit 11ca912
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 99 deletions.
48 changes: 48 additions & 0 deletions sink-connector-client/config.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: "company-1"
topic.prefix: "sink-connector-1"
database.hostname: "mysql-master"
database.port: "3306"
database.user: "root"
database.password: "root"
database.server.id: "12345"
database.server.name: "ER54"
database.include.list: test
table.include.list: ""
clickhouse.server.url: "clickhouse"
clickhouse.server.user: "root"
clickhouse.server.password: "root"
clickhouse.server.port: "8123"
database.allowPublicKeyRetrieval: "true"
snapshot.mode: "initial"
offset.flush.interval.ms: 5000
connector.class: "io.debezium.connector.mysql.MySqlConnector"
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"
offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"
offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
offset.storage.jdbc.user: "root"
offset.storage.jdbc.password: "root"
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
`id` String,
`offset_key` String,
`offset_val` String,
`record_insert_ts` DateTime,
`record_insert_seq` UInt64,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version) ORDER BY offset_key SETTINGS index_granularity = 8192"
offset.storage.jdbc.offset.table.delete: "select * from %s"
offset.storage.jdbc.offset.table.select: "SELECT id, offset_key, offset_val FROM %s FINAL ORDER BY record_insert_ts, record_insert_seq"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
schema.history.internal.jdbc.user: "root"
schema.history.internal.jdbc.password: "root"
schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s
(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id"
schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
enable.snapshot.ddl: "true"
persist.raw.bytes: "false"
auto.create.tables: "true"
auto.create.tables.replicated: "true"
database.connectionTimeZone: "UTC"
clickhouse.database.override.map: "employees:employees2, products:productsnew
8 changes: 4 additions & 4 deletions sink-connector-client/create_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func validateMySQL(sourceUsername string, sourcePassword string, sourceHost stri
// if log_bin is not enabled, then return false
// check if rows has response 'OFF'
// if it is 'OFF' then return false
if rows == 'OFF' {
log.fatal("Binlogs are not enabled")
return false
}
// if rows == 'OFF' {
// log.fatal("Binlogs are not enabled")
// return false
// }

if err != nil {
log.Fatal(err)
Expand Down
5 changes: 1 addition & 4 deletions sink-connector-client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/testcontainers/testcontainers-go v0.32.0
github.com/tidwall/pretty v1.2.1
github.com/urfave/cli v1.22.13
gopkg.in/yaml.v2 v2.4.0
//testcontainers
)

Expand Down Expand Up @@ -38,10 +39,6 @@ require (
github.com/go-sql-driver/mysql v1.8.1
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-sql-driver/mysql v1.8.1 // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.17.4 // indirect
Expand Down
5 changes: 1 addition & 4 deletions sink-connector-client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
Expand Down Expand Up @@ -58,8 +56,6 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
Expand Down Expand Up @@ -210,6 +206,7 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
193 changes: 106 additions & 87 deletions sink-connector-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package main
import (
"encoding/json"
"fmt"
"github.com/levigross/grequests"
"github.com/tidwall/pretty"
cli "github.com/urfave/cli"
"log"
"os"
"time"

"github.com/levigross/grequests"
"github.com/tidwall/pretty"
cli "github.com/urfave/cli"
)

var requestOptions = &grequests.RequestOptions{}
Expand All @@ -28,24 +29,26 @@ type UpdateLsn struct {
}

const (
START_REPLICATION_COMMAND = "start_replica"
STOP_REPLICATION_COMAND = "stop_replica"
STATUS_COMMAND = "show_replica_status"
UPDATE_BINLOG_COMMAND = "change_replication_source"
UPDATE_LSN_COMMAND = "lsn"
DELETE_OFFSETS_COMMAND = "delete_offsets"
START_REPLICATION_COMMAND = "start_replica"
STOP_REPLICATION_COMAND = "stop_replica"
STATUS_COMMAND = "show_replica_status"
UPDATE_BINLOG_COMMAND = "change_replication_source"
UPDATE_LSN_COMMAND = "lsn"
DELETE_OFFSETS_COMMAND = "delete_offsets"
DELETE_SCHEMA_HISTORY_COMMAND = "delete_schema_history"
CREATE_CONFIG_COMMAND = "create_config"
)

const (
START_REPLICATION = "start"
STOP_REPLICATION = "stop"
RESTART_REPLICATION = "restart"
STATUS = "status"
UPDATE_BINLOG = "binlog"
UPDATE_LSN = "lsn"
DELETE_OFFSETS = "offsets"
START_REPLICATION = "start"
STOP_REPLICATION = "stop"
RESTART_REPLICATION = "restart"
STATUS = "status"
UPDATE_BINLOG = "binlog"
UPDATE_LSN = "lsn"
DELETE_OFFSETS = "offsets"
DELETE_SCHEMA_HISTORY = "schema-history"
CREATE_CONFIG = "create_config"
)

// Fetches the repos for the given Github users
Expand All @@ -59,14 +62,16 @@ func getHTTPCall(url string) *grequests.Response {
}

func getHTTPDeleteCall(url string) *grequests.Response {
resp, err := grequests.Delete(url, requestOptions)
// you can modify the request by passing an optional RequestOptions struct
if err != nil {
log.Fatalln("Unable to make request: ", err)
}
return resp
resp, err := grequests.Delete(url, requestOptions)
// you can modify the request by passing an optional RequestOptions struct
if err != nil {
log.Fatalln("Unable to make request: ", err)
}
return resp
}
/**

/*
*
Function to get server url based on the parameters passed
*/
func getServerUrl(action string, c *cli.Context) string {
Expand Down Expand Up @@ -228,77 +233,90 @@ func main() {
return nil
},
},
{
Name: DELETE_OFFSETS_COMMAND,
Usage: "Delete offsets from the sink connector",
Action: func(c *cli.Context) error {
handleDeleteOffsets(c)
return nil
},
},
{
Name: DELETE_SCHEMA_HISTORY_COMMAND,
Usage: "Delete schema history from the sink connector",
Action: func(c *cli.Context) error {
handleDeleteSchemaHistory(c)
return nil
},
},
}
{
Name: DELETE_OFFSETS_COMMAND,
Usage: "Delete offsets from the sink connector",
Action: func(c *cli.Context) error {
handleDeleteOffsets(c)
return nil
},
},
{
Name: DELETE_SCHEMA_HISTORY_COMMAND,
Usage: "Delete schema history from the sink connector",
Action: func(c *cli.Context) error {
handleDeleteSchemaHistory(c)
return nil
},
},
{
Name: CREATE_CONFIG_COMMAND,
Usage: "Create a config file",
Action: func(c *cli.Context) error {
handleCreateConfig(c)
return nil
},
},
}
app.Version = "1.0"
app.Run(os.Args)
}

func handleCreateConfig(c *cli.Context) bool {
readConfig()
return true
}

func handleDeleteOffsets(c *cli.Context) bool {
log.Println("***** Delete offsets from the sink connector *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}
// Call a REST DELETE API to delete offsets from the sink connector
var deleteOffsetsUrl = getServerUrl(DELETE_OFFSETS, c)
log.Println("Sending request to URL: " + deleteOffsetsUrl)
resp := getHTTPDeleteCall(deleteOffsetsUrl)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println("Offsets deleted successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error deleting offsets")
return false
}
log.Println("***** Delete offsets from the sink connector *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}
// Call a REST DELETE API to delete offsets from the sink connector
var deleteOffsetsUrl = getServerUrl(DELETE_OFFSETS, c)
log.Println("Sending request to URL: " + deleteOffsetsUrl)
resp := getHTTPDeleteCall(deleteOffsetsUrl)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println("Offsets deleted successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error deleting offsets")
return false
}
}

func handleDeleteSchemaHistory(c *cli.Context) bool {
log.Println("***** Delete schema history from the sink connector *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}
// Call a REST DELETE API to delete offsets from the sink connector
var deleteOffsetsUrl = getServerUrl(DELETE_SCHEMA_HISTORY, c)
log.Println("Sending request to URL: " + deleteOffsetsUrl)
resp := getHTTPDeleteCall(deleteOffsetsUrl)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println("Schema history deleted successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error deleting schema history")
return false
}
log.Println("***** Delete schema history from the sink connector *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}
// Call a REST DELETE API to delete offsets from the sink connector
var deleteOffsetsUrl = getServerUrl(DELETE_SCHEMA_HISTORY, c)
log.Println("Sending request to URL: " + deleteOffsetsUrl)
resp := getHTTPDeleteCall(deleteOffsetsUrl)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println("Schema history deleted successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error deleting schema history")
return false
}
}

func handleUpdateLsn(c *cli.Context) bool {
Expand Down Expand Up @@ -343,7 +361,8 @@ func handleUpdateLsn(c *cli.Context) bool {
return true
}

/**
/*
*
Function to handle update binlog action
which is used to set binlog file/position and gtids
*/
Expand Down
Binary file modified sink-connector-client/sink-connector-client
Binary file not shown.
Loading

0 comments on commit 11ca912

Please sign in to comment.