forked from go-zookeeper/zk
-
Notifications
You must be signed in to change notification settings - Fork 6
/
server_java_test.go
171 lines (154 loc) · 5.3 KB
/
server_java_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package zk
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"testing"
)
// ErrMissingServerConfigField is an error whose value is the name of a
// required server configuration field that was left unset.
type ErrMissingServerConfigField string

// Error implements the error interface; the message quotes the missing field.
func (field ErrMissingServerConfigField) Error() string {
	const format = "zk: missing server config field '%s'"
	return fmt.Sprintf(format, string(field))
}
// Fallback values that ServerConfig.Marshall substitutes for settings that
// were left unset (zero or negative).
const (
	// DefaultServerTickTime is the fallback tickTime, in milliseconds.
	DefaultServerTickTime = 500
	// DefaultServerInitLimit is the fallback initLimit, in ticks.
	DefaultServerInitLimit = 10
	// DefaultServerSyncLimit is the fallback syncLimit, in ticks.
	DefaultServerSyncLimit = 5
	// DefaultServerAutoPurgeSnapRetainCount is the fallback number of
	// snapshots to retain when auto purge is enabled.
	DefaultServerAutoPurgeSnapRetainCount = 3
	// DefaultPeerPort is the fallback peer port of a server list entry.
	DefaultPeerPort = 2888
	// DefaultLeaderElectionPort is the fallback leader election port of a
	// server list entry.
	DefaultLeaderElectionPort = 3888
)
// server describes a ZooKeeper server that is run as an external command,
// together with the handles needed to start and stop that command.
type server struct {
	// stdout and stderr are the writers the command's output is attached to.
	stdout io.Writer
	stderr io.Writer

	// cmdString is the executable to run; cmdArgs and cmdEnv are the
	// arguments and environment entries handed to it.
	cmdString string
	cmdArgs   []string
	cmdEnv    []string

	// cmd is the command built by Start.
	cmd *exec.Cmd

	// cancelFunc cancels the context cmd was created with, which kills the
	// command being run - in this case the server itself.
	cancelFunc context.CancelFunc
}
// NewIntegrationTestServer returns a handle for a ZooKeeper server that is run
// through zkServer.sh in the foreground with configPath as its config
// argument. The server is not started; call Start on the result.
//
// The directory holding zkServer.sh is read from the ZOOKEEPER_BIN_PATH
// environment variable and defaults to the relative path "zookeeper/bin". An
// error wrapping the underlying cause is returned when that directory cannot
// be stat'ed. stdout and stderr are attached to the server process's output
// streams. t is currently unused.
func NewIntegrationTestServer(t *testing.T, configPath string, stdout, stderr io.Writer) (*server, error) { // nolint: revive
	// allow external systems to configure this zk server bin path.
	zkPath := os.Getenv("ZOOKEEPER_BIN_PATH")
	if zkPath == "" {
		// default to a static relative path that can be set up with a build system
		zkPath = "zookeeper/bin"
	}

	if _, err := os.Stat(zkPath); err != nil {
		if os.IsNotExist(err) {
			return nil, fmt.Errorf("zk: could not find testing zookeeper bin path at %q: %w", zkPath, err)
		}
		// Any other stat failure (for example a permission error) also means
		// the script cannot be run from there, so report it instead of
		// deferring the failure to Start.
		return nil, fmt.Errorf("zk: could not access testing zookeeper bin path at %q: %w", zkPath, err)
	}

	// password is 'test'
	superString := `SERVER_JVMFLAGS=-Dzookeeper.DigestAuthenticationProvider.superDigest=super:D/InIHSb7yEEbrWz8b9l71RjZJU=`
	// enable TTL
	superString += ` -Dzookeeper.extendedTypesEnabled=true -Dzookeeper.emulate353TTLNodes=true`

	return &server{
		cmdString: filepath.Join(zkPath, "zkServer.sh"),
		cmdArgs:   []string{"start-foreground", configPath},
		cmdEnv:    []string{superString},
		stdout:    stdout,
		stderr:    stderr,
	}, nil
}
// Start builds the server command from cmdString and cmdArgs, attaches the
// configured output writers and environment, and starts it. The command is
// bound to a fresh cancellable context; the cancel function is stored in
// cancelFunc. The returned error is the one reported by the command's Start.
func (srv *server) Start() error {
	ctx, cancel := context.WithCancel(context.Background())

	cmd := exec.CommandContext(ctx, srv.cmdString, srv.cmdArgs...)
	cmd.Stdout, cmd.Stderr = srv.stdout, srv.stderr
	cmd.Env = srv.cmdEnv

	// Record both handles before starting so they are set even when the
	// launch fails.
	srv.cancelFunc, srv.cmd = cancel, cmd
	return cmd.Start()
}
// Stop cancels the context the server command was started with, which kills
// the process, and then waits for it to exit. The result of Wait is returned;
// because the process is killed rather than shut down gracefully this is
// usually a non-nil error.
//
// Calling Stop on a server that was never started returns an error instead of
// panicking on the unset cancel function and command.
func (srv *server) Stop() error {
	if srv.cancelFunc == nil || srv.cmd == nil {
		return fmt.Errorf("zk: server was not started")
	}
	srv.cancelFunc()
	return srv.cmd.Wait()
}
// ServerConfigServer describes one entry of the server list in a ServerConfig.
// Marshall renders it as "server.<ID>=<Host>:<PeerPort>:<LeaderElectionPort>".
type ServerConfigServer struct {
	// ID is the number that follows "server." in the rendered entry.
	ID int
	// Host is the host part of the rendered entry.
	Host string
	// PeerPort is the first port of the entry; DefaultPeerPort is used when
	// it is not positive.
	PeerPort int
	// LeaderElectionPort is the second port of the entry;
	// DefaultLeaderElectionPort is used when it is not positive.
	LeaderElectionPort int
}
// ServerConfig holds the settings that Marshall writes out as a ZooKeeper
// server configuration file.
type ServerConfig struct {
	// TickTime is the number of milliseconds of each tick.
	TickTime int
	// InitLimit is the number of ticks that the initial synchronization
	// phase can take.
	InitLimit int
	// SyncLimit is the number of ticks that can pass between sending a
	// request and getting an acknowledgement.
	SyncLimit int
	// DataDir is the directory where the snapshot is stored.
	DataDir string
	// ClientPort is the port at which clients will connect.
	ClientPort int
	// AutoPurgeSnapRetainCount is the number of snapshots to retain in
	// dataDir.
	AutoPurgeSnapRetainCount int
	// AutoPurgePurgeInterval is the purge task interval in hours (0 to
	// disable auto purge).
	AutoPurgePurgeInterval int
	// Servers lists the servers to include in the configuration.
	Servers []ServerConfigServer
}
// Marshall writes sc to w as a ZooKeeper server configuration, one
// "key=value" setting per line.
//
// DataDir is required: when it is empty, ErrMissingServerConfigField("dataDir")
// is returned and nothing is written. Non-positive TickTime, InitLimit,
// SyncLimit and ClientPort are replaced by their defaults. The autopurge
// settings are written only when AutoPurgePurgeInterval is positive. The
// server list is written only when at least two servers are configured;
// with fewer, the list is omitted so the server starts in standalone mode.
//
// The first error reported by w stops all further output and is returned.
func (sc ServerConfig) Marshall(w io.Writer) error {
	// Validate before emitting anything so a rejected config leaves w untouched.
	if sc.DataDir == "" {
		return ErrMissingServerConfigField("dataDir")
	}

	// sc is a copy (value receiver), so filling in defaults here does not
	// modify the caller's config.
	if sc.TickTime <= 0 {
		sc.TickTime = DefaultServerTickTime
	}
	if sc.InitLimit <= 0 {
		sc.InitLimit = DefaultServerInitLimit
	}
	if sc.SyncLimit <= 0 {
		sc.SyncLimit = DefaultServerSyncLimit
	}
	if sc.ClientPort <= 0 {
		sc.ClientPort = DefaultPort
	}

	// emit writes one formatted line and remembers the first write error;
	// once an error has occurred every later call is a no-op.
	var writeErr error
	emit := func(format string, args ...interface{}) {
		if writeErr != nil {
			return
		}
		_, writeErr = fmt.Fprintf(w, format, args...)
	}

	// the admin server is not wanted in test cases as it slows the startup process and is
	// of little unit test value.
	emit("admin.enableServer=false\n")
	emit("dataDir=%s\n", sc.DataDir)
	emit("tickTime=%d\n", sc.TickTime)
	emit("initLimit=%d\n", sc.InitLimit)
	emit("syncLimit=%d\n", sc.SyncLimit)
	emit("clientPort=%d\n", sc.ClientPort)

	if sc.AutoPurgePurgeInterval > 0 {
		if sc.AutoPurgeSnapRetainCount <= 0 {
			sc.AutoPurgeSnapRetainCount = DefaultServerAutoPurgeSnapRetainCount
		}
		emit("autopurge.snapRetainCount=%d\n", sc.AutoPurgeSnapRetainCount)
		emit("autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval)
	}

	// enable reconfig.
	// TODO: allow setting this
	emit("reconfigEnabled=true\n")
	emit("4lw.commands.whitelist=*\n")

	if len(sc.Servers) < 2 {
		// with fewer than 2 servers we do not specify the server list, so the server starts in standalone mode
		// see https://zookeeper.apache.org/doc/current/zookeeperStarted.html#sc_InstallingSingleMode for more details.
		return writeErr
	}

	// with two or more servers, force distributed mode
	emit("standaloneEnabled=false\n")
	for _, srv := range sc.Servers {
		if srv.PeerPort <= 0 {
			srv.PeerPort = DefaultPeerPort
		}
		if srv.LeaderElectionPort <= 0 {
			srv.LeaderElectionPort = DefaultLeaderElectionPort
		}
		emit("server.%d=%s:%d:%d\n", srv.ID, srv.Host, srv.PeerPort, srv.LeaderElectionPort)
	}
	return writeErr
}
// waitForSession blocks until the zk connection has at least reached the
// StateHasSession state. It returns nil once c reports that state or an event
// carrying that state is received, ctx.Err() when ctx is done first, and an
// error when eventChan is closed before the state is reached.
func waitForSession(ctx context.Context, c *Conn, eventChan <-chan Event) error {
	for {
		// Re-check the connection itself on every pass; events carrying any
		// other state only trigger another check.
		if c.State() == StateHasSession {
			return nil
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, open := <-eventChan:
			// A closed eventChan is how we learn that the ZK conn has stopped.
			if !open {
				return fmt.Errorf("connection closed before state reached")
			}
			if ev.State == StateHasSession {
				return nil
			}
		}
	}
}