diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 69ab43d732a..6263dcf8f9e 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -356,6 +356,8 @@ Usage of vttablet: --vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s) --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1) --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence + --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300) + --vreplication_net_write_timeout int Session value of net_write_timeout for vreplication, in seconds (default 600) --vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s) --vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s) --vreplication_store_compressed_gtid Store compressed gtids in the pos column of _vt.vreplication diff --git a/go/vt/vttablet/flags.go b/go/vt/vttablet/flags.go new file mode 100644 index 00000000000..bcf95becd7d --- /dev/null +++ b/go/vt/vttablet/flags.go @@ -0,0 +1,38 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vttablet + +import ( + "github.com/spf13/pflag" + + "vitess.io/vitess/go/vt/servenv" +) + +var ( + VReplicationNetReadTimeout = 300 + VReplicationNetWriteTimeout = 600 +) + +func registerFlags(fs *pflag.FlagSet) { + fs.IntVar(&VReplicationNetReadTimeout, "vreplication_net_read_timeout", VReplicationNetReadTimeout, "Session value of net_read_timeout for vreplication, in seconds") + fs.IntVar(&VReplicationNetWriteTimeout, "vreplication_net_write_timeout", VReplicationNetWriteTimeout, "Session value of net_write_timeout for vreplication, in seconds") +} + +func init() { + servenv.OnParseFor("vtcombo", registerFlags) + servenv.OnParseFor("vttablet", registerFlags) +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 60c478a078c..8b7e9a9b903 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -22,6 +22,8 @@ import ( "strings" "time" + "vitess.io/vitess/go/vt/vttablet" + "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/vt/discovery" @@ -245,6 +247,12 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if _, err := dbClient.ExecuteFetch("set names binary", 10000); err != nil { return err } + if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 10000); err != nil { + return err + } + if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 10000); err != nil { + return err + } // We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid. if _, err := dbClient.ExecuteFetch("set @@session.sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')", 10000); err != nil { return err diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 560546d7c04..ed75125cb31 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/vttablet" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" @@ -122,6 +124,13 @@ func (rs *rowStreamer) Stream() error { if _, err := conn.ExecuteFetch("set names binary", 1, false); err != nil { return err } + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil { + return err + } + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil { + return err + } + return rs.streamQuery(conn, rs.send) }