diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7c17468 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +go.mod +go.sum + +vendor/* diff --git a/README.md b/README.md index 808d270..a59f21a 100644 --- a/README.md +++ b/README.md @@ -27,36 +27,33 @@ All dependencies are vendored with [manul](https://github.com/kovetskiy/manul). The tool supports multiple concurrent clients, configurable message size, etc: ``` -> mqtt-bm-latency --help -Usage of ./mqtt-bm-latency: +Usage of mqtt-bm-latency: -broker string - MQTT broker endpoint as scheme://host:port (default "tcp://localhost:1883") - -topic string - MQTT topic for outgoing messages (default "/test") - -username string - MQTT username (empty if auth disabled) - -password string - MQTT password (empty if auth disabled) - -pubqos int - QoS for published messages (default 1) - -subqos int - QoS for subscribed messages (default 1) - -size int - Size of the messages payload (bytes) (default 100) + MQTT broker endpoint as scheme://host:port (default "tcp://localhost:1883") -clients int - Number of clients pair to start (default 10) + Number of clients pair to start (default 10) -count int - Number of messages to send per pubclient (default 100) - -keepalive int - Keep alive period in seconds (default 60) + Number of messages to send per pubclient (default 100) -format string - Benchmark results output format: text|json (default "text") + Output format: text|json (default "text") + -keepalive int + Keep alive period in seconds (default 60) + -password string + MQTT password (empty if auth disabled) + -protocolver uint + MQTT protocol version: 3 means 3.1, 4 means 3.1.1 (default 3) + -pubqos int + QoS for published messages (default 1) -quiet - Suppress logs while running - - - - + Suppress logs while running (default false) + -size int + Size of the messages payload in bytes (default 100) + -subqos int + QoS for subscribed messages (default 1) + -topic string + MQTT topic for outgoing messages (default "/test") + -username string + MQTT username (empty if auth disabled) ``` Two output formats supported: human-readable plain text and JSON. diff --git a/main.go b/main.go index b197ef3..dc5bf80 100644 --- a/main.go +++ b/main.go @@ -2,11 +2,11 @@ package main import ( "bytes" - "strconv" "encoding/json" "flag" "fmt" "log" + "strconv" "time" "github.com/GaryBoone/GoStats/stats" @@ -24,25 +24,25 @@ type Message struct { // SubResults describes results of a single SUBSCRIBER / run type SubResults struct { - ID int `json:"id"` - Published int64 `json:"actual_published"` - Received int64 `json:"received"` - FwdRatio float64 `json:"fwd_success_ratio"` - FwdLatencyMin float64 `json:"fwd_time_min"` - FwdLatencyMax float64 `json:"fwd_time_max"` - FwdLatencyMean float64 `json:"fwd_time_mean"` - FwdLatencyStd float64 `json:"fwd_time_std"` + ID int `json:"id"` + Published int64 `json:"actual_published"` + Received int64 `json:"received"` + FwdRatio float64 `json:"fwd_success_ratio"` + FwdLatencyMin float64 `json:"fwd_time_min"` + FwdLatencyMax float64 `json:"fwd_time_max"` + FwdLatencyMean float64 `json:"fwd_time_mean"` + FwdLatencyStd float64 `json:"fwd_time_std"` } // TotalSubResults describes results of all SUBSCRIBER / runs type TotalSubResults struct { - TotalFwdRatio float64 `json:"fwd_success_ratio"` - TotalReceived int64 `json:"successes"` - TotalPublished int64 `json:"actual_total_published"` - FwdLatencyMin float64 `json:"fwd_latency_min"` - FwdLatencyMax float64 `json:"fwd_latency_max"` - FwdLatencyMeanAvg float64 `json:"fwd_latency_mean_avg"` - FwdLatencyMeanStd float64 `json:"fwd_latency_mean_std"` + TotalFwdRatio float64 `json:"fwd_success_ratio"` + TotalReceived int64 `json:"successes"` + TotalPublished int64 `json:"actual_total_published"` + FwdLatencyMin float64 `json:"fwd_latency_min"` + FwdLatencyMax float64 `json:"fwd_latency_max"` + FwdLatencyMeanAvg float64 `json:"fwd_latency_mean_avg"` + FwdLatencyMeanStd float64 `json:"fwd_latency_mean_std"` } // PubResults describes results of a single PUBLISHER / run @@ -75,8 +75,8 @@ type TotalPubResults struct { // JSONResults are used to export results as a JSON document type JSONResults struct { - PubRuns []*PubResults `json:"publish runs"` - SubRuns []*SubResults `json:"subscribe runs"` + PubRuns []*PubResults `json:"publish runs"` + SubRuns []*SubResults `json:"subscribe runs"` PubTotals *TotalPubResults `json:"publish totals"` SubTotals *TotalSubResults `json:"receive totals"` } @@ -90,12 +90,13 @@ func main() { password = flag.String("password", "", "MQTT password (empty if auth disabled)") pubqos = flag.Int("pubqos", 1, "QoS for published messages") subqos = flag.Int("subqos", 1, "QoS for subscribed messages") - size = flag.Int("size", 100, "Size of the messages payload (bytes)") + size = flag.Int("size", 100, "Size of the messages payload in bytes") count = flag.Int("count", 100, "Number of messages to send per pubclient") clients = flag.Int("clients", 10, "Number of clients pair to start") - keepalive = flag.Int("keepalive", 60, "Keep alive period in seconds") + keepalive = flag.Int("keepalive", 60, "Keep alive period in seconds") format = flag.String("format", "text", "Output format: text|json") - quiet = flag.Bool("quiet", false, "Suppress logs while running") + quiet = flag.Bool("quiet", false, "Suppress logs while running (default false)") + protocolver = flag.Uint("protocolver", 3, "MQTT protocol version: 3 means 3.1, 4 means 3.1.1") ) flag.Parse() @@ -113,27 +114,28 @@ func main() { if !*quiet { log.Printf("Starting subscribe..\n") } - + for i := 0; i < *clients; i++ { sub := &SubClient{ - ID: i, - BrokerURL: *broker, - BrokerUser: *username, - BrokerPass: *password, - SubTopic: *topic + "-" + strconv.Itoa(i), - SubQoS: byte(*subqos), - KeepAlive: *keepalive, - Quiet: *quiet, + ID: i, + BrokerURL: *broker, + BrokerUser: *username, + BrokerPass: *password, + SubTopic: *topic + "-" + strconv.Itoa(i), + SubQoS: byte(*subqos), + KeepAlive: *keepalive, + Quiet: *quiet, + ProtocolVer: *protocolver, } go sub.run(subResCh, subDone, jobDone) } - SUBJOBDONE: +SUBJOBDONE: for { select { case <-subDone: subCnt++ - if subCnt==*clients { + if subCnt == *clients { if !*quiet { log.Printf("all subscribe job done.\n") } @@ -150,16 +152,17 @@ func main() { start := time.Now() for i := 0; i < *clients; i++ { c := &PubClient{ - ID: i, - BrokerURL: *broker, - BrokerUser: *username, - BrokerPass: *password, - PubTopic: *topic + "-" + strconv.Itoa(i), - MsgSize: *size, - MsgCount: *count, - PubQoS: byte(*pubqos), - KeepAlive: *keepalive, - Quiet: *quiet, + ID: i, + BrokerURL: *broker, + BrokerUser: *username, + BrokerPass: *password, + PubTopic: *topic + "-" + strconv.Itoa(i), + MsgSize: *size, + MsgCount: *count, + PubQoS: byte(*pubqos), + KeepAlive: *keepalive, + Quiet: *quiet, + ProtocolVer: *protocolver, } go c.run(pubResCh) } @@ -172,8 +175,8 @@ func main() { totalTime := time.Now().Sub(start) pubtotals := calculatePublishResults(pubresults, totalTime) - for i:=0; i<3; i++ { - time.Sleep(1*time.Second) + for i := 0; i < 3; i++ { + time.Sleep(1 * time.Second) if !*quiet { log.Printf("Benchmark will stop after %v seconds.\n", 3-i) } @@ -191,7 +194,7 @@ func main() { } // collect the sub results - subtotals := calculateSubscribeResults(subresults,pubresults) + subtotals := calculateSubscribeResults(subresults, pubresults) // print stats printResults(pubresults, pubtotals, subresults, subtotals, *format) diff --git a/pubclient.go b/pubclient.go index e513dcd..353322d 100644 --- a/pubclient.go +++ b/pubclient.go @@ -1,29 +1,29 @@ package main import ( + "bytes" "fmt" "log" - "time" - "bytes" "strconv" -) + "time" -import ( "github.com/GaryBoone/GoStats/stats" + mqtt "github.com/eclipse/paho.mqtt.golang" ) type PubClient struct { - ID int - BrokerURL string - BrokerUser string - BrokerPass string - PubTopic string - MsgSize int - MsgCount int - PubQoS byte - KeepAlive int - Quiet bool + ID int + BrokerURL string + BrokerUser string + BrokerPass string + PubTopic string + MsgSize int + MsgCount int + PubQoS byte + KeepAlive int + Quiet bool + ProtocolVer uint } func (c *PubClient) run(res chan *PubResults) { @@ -72,8 +72,8 @@ func (c *PubClient) run(res chan *PubResults) { func (c *PubClient) genMessages(ch chan *Message, done chan bool) { for i := 0; i < c.MsgCount; i++ { ch <- &Message{ - Topic: c.PubTopic, - QoS: c.PubQoS, + Topic: c.PubTopic, + QoS: c.PubQoS, //Payload: make([]byte, c.MsgSize), } } @@ -121,9 +121,10 @@ func (c *PubClient) pubMessages(in, out chan *Message, doneGen, donePub chan boo SetAutoReconnect(true). SetOnConnectHandler(onConnected). SetKeepAlive(ka). + SetProtocolVersion(c.ProtocolVer). SetConnectionLostHandler(func(client mqtt.Client, reason error) { - log.Printf("PUBLISHER %v lost connection to the broker: %v. Will reconnect...\n", c.ID, reason.Error()) - }) + log.Printf("PUBLISHER %v lost connection to the broker: %v. Will reconnect...\n", c.ID, reason.Error()) + }) if c.BrokerUser != "" && c.BrokerPass != "" { opts.SetUsername(c.BrokerUser) opts.SetPassword(c.BrokerPass) diff --git a/subclient.go b/subclient.go index 280238d..8ed121e 100644 --- a/subclient.go +++ b/subclient.go @@ -3,30 +3,30 @@ package main import ( "fmt" "log" - "time" "strconv" -) + "time" -import ( "github.com/GaryBoone/GoStats/stats" + mqtt "github.com/eclipse/paho.mqtt.golang" ) type SubClient struct { - ID int - BrokerURL string - BrokerUser string - BrokerPass string - SubTopic string - SubQoS byte - KeepAlive int - Quiet bool + ID int + BrokerURL string + BrokerUser string + BrokerPass string + SubTopic string + SubQoS byte + KeepAlive int + Quiet bool + ProtocolVer uint } -func (c *SubClient) run(res chan *SubResults, subDone chan bool, jobDone chan bool,) { +func (c *SubClient) run(res chan *SubResults, subDone chan bool, jobDone chan bool) { runResults := new(SubResults) runResults.ID = c.ID - + forwardLatency := []float64{} ka, _ := time.ParseDuration(strconv.Itoa(c.KeepAlive) + "s") @@ -37,22 +37,23 @@ func (c *SubClient) run(res chan *SubResults, subDone chan bool, jobDone chan bo SetCleanSession(true). SetAutoReconnect(true). SetKeepAlive(ka). + SetProtocolVersion(c.ProtocolVer). SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) { - recvTime := time.Now().UnixNano() - payload := msg.Payload() - i := 0 - for ; i