From 2ea4545db77bfbfdb35cd63d42a592ad9a2673fc Mon Sep 17 00:00:00 2001 From: zionjxyu Date: Mon, 16 May 2022 16:54:34 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=B0=81=E8=A3=85=E5=AD=97=E7=AC=A6?= =?UTF-8?q?=E4=B8=B2=E7=9B=B8=E5=85=B3=E5=87=BD=E6=95=B0=EF=BC=8Cetcd?= =?UTF-8?q?=E9=80=89=E4=B8=BE=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/elect.go | 71 ++++++++++++++++++++++++++++++++++++++++++ constant/constant.go | 16 ++++++++-- strings/string.go | 73 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 client/elect.go diff --git a/client/elect.go b/client/elect.go new file mode 100644 index 0000000..11db203 --- /dev/null +++ b/client/elect.go @@ -0,0 +1,71 @@ +package client + +import ( + "context" + "fmt" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" +) + +// Node 表示参与选举的节点。 +type Node struct { + elect *concurrency.Election + prefix string + session *concurrency.Session + leaderKey string +} + +// New 返回一个新创建的Node对象。 +func New(client *clientv3.Client, service string, leaderKey string, ttl int) (*Node, error) { + if client == nil { + return nil, fmt.Errorf("unexpected client, nil") + } + + prefix := fmt.Sprintf("/m/elect/%s/", service) + session, err := concurrency.NewSession(client, concurrency.WithTTL(ttl)) + if err != nil { + err = fmt.Errorf("new session error, %w", err) + return nil, err + } + + elect := concurrency.NewElection(session, prefix) + return &Node{ + elect: elect, + prefix: prefix, + session: session, + leaderKey: leaderKey, + }, nil +} + +// WaitToBeLeader 阻塞等待成为leader。 +func (n *Node) WaitToBeLeader(timeout time.Duration) error { + ctx := context.Background() + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + return n.elect.Campaign(ctx, n.leaderKey) +} + +// ObserverDone 将session断开、过期事件关联上回调事件。例如:leader断开服务器连接了,丢失leadership,则os.Exit(1)。 +func (n *Node) ObserverDone(cb func()) { + ch := n.session.Done() + for { + _, ok := <-ch + if !ok { + break + } + } + + if cb != nil { + cb() + } +} + +// Close 关闭内部session。 +func (n *Node) Close() error { + return n.session.Close() +} diff --git a/constant/constant.go b/constant/constant.go index bde1217..34f2bf7 100644 --- a/constant/constant.go +++ b/constant/constant.go @@ -9,8 +9,20 @@ const ( ) const ( - TrueFlag = 1 // 真值 - FalseFlag = 0 // 假值 + // SepFlag 竖线分割符 + SepFlag = "|" + // Comma 逗号 + Comma = "," + // Hyphen 连字符 + Hyphen = "-" + // Colon 冒号 + Colon = ":" + // CommConfigPath 通用配置文件路径 + CommConfigPath = "comm.yaml" + // TrueFlag 表示真的数值 + TrueFlag = 1 + // FalseFlag 表示假的数值 + FalseFlag = 0 ) const ( diff --git a/strings/string.go b/strings/string.go index 284175b..3baa594 100644 --- a/strings/string.go +++ b/strings/string.go @@ -7,10 +7,13 @@ import ( "encoding/base64" "encoding/hex" "fmt" + "math" "math/big" "math/rand" + "reflect" "strings" "time" + "unsafe" ) // NullStringToString nullstring转换成string @@ -24,6 +27,15 @@ func NullStringToString(nullStrings []sql.NullString) []string { return strings } +// TrimSpaceSlice 将src中元素逐个调用strings.TrimSpace。 +func TrimSpaceSlice(src []string) []string { + dest := make([]string, 0, len(src)) + for _, s := range src { + dest = append(dest, strings.TrimSpace(s)) + } + return dest +} + // SplitString 返回以partition分割的字符串数组 func SplitString(str, partition string) []string { splitStr := strings.Split(str, partition) @@ -83,3 +95,64 @@ func GetUniqueID() string { uniqueID := fmt.Sprintf("%d", seqNo) return uniqueID } + +// IsStringSliceEqual 返回两个字符串列表是否相等。都为nil,返回true。其中之一为nil,返回false。 +// 都不是nil,则长度内容顺序都一致才返回true。 +func IsStringSliceEqual(lh, rh []string) bool { + if lh == nil && rh == nil { + return true + } + + if lh == nil || rh == nil { + return false + } + + if len(lh) != len(rh) { + return false + } + + for k, v := range lh { + if v != rh[k] { + return false + } + } + return true +} + +// SplitSlice 将slice按长度batchSize分成多段 +// batchHandler 返回false,表示退出执行 +func SplitSlice(slice interface{}, batchSize int, batchHandler func(batch interface{}) bool) { + if batchHandler == nil { + return + } + + rv := reflect.ValueOf(slice) + if rv.Kind() != reflect.Slice { + panic("argument not a slice") + } + + blocks := int(math.Ceil(float64(rv.Len()) / float64(batchSize))) + for i := 0; i < blocks; i++ { + begin := i * batchSize + end := begin + batchSize + if end > rv.Len() { + end = rv.Len() + } + + batch := rv.Slice(begin, end) + isContinue := batchHandler(batch.Interface()) + if !isContinue { + break + } + } +} + +// BytesToString copy-free的[]byte转string,但注意使用场景限制,不可滥用。 +func BytesToString(bytes []byte) string { + var s string + sliceHeader := (*reflect.SliceHeader)(unsafe.Pointer(&bytes)) + stringHeader := (*reflect.StringHeader)(unsafe.Pointer(&s)) + stringHeader.Data = sliceHeader.Data + stringHeader.Len = sliceHeader.Len + return s +}