Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add .gitignore file. Add option for choosing protocol version. #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go.mod
go.sum

vendor/*
47 changes: 22 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,33 @@ All dependencies are vendored with [manul](https://github.com/kovetskiy/manul).

The tool supports multiple concurrent clients, configurable message size, etc:
```
> mqtt-bm-latency --help
Usage of ./mqtt-bm-latency:
Usage of mqtt-bm-latency:
-broker string
MQTT broker endpoint as scheme://host:port (default "tcp://localhost:1883")
-topic string
MQTT topic for outgoing messages (default "/test")
-username string
MQTT username (empty if auth disabled)
-password string
MQTT password (empty if auth disabled)
-pubqos int
QoS for published messages (default 1)
-subqos int
QoS for subscribed messages (default 1)
-size int
Size of the messages payload (bytes) (default 100)
MQTT broker endpoint as scheme://host:port (default "tcp://localhost:1883")
-clients int
Number of clients pair to start (default 10)
Number of clients pair to start (default 10)
-count int
Number of messages to send per pubclient (default 100)
-keepalive int
Keep alive period in seconds (default 60)
Number of messages to send per pubclient (default 100)
-format string
Benchmark results output format: text|json (default "text")
Output format: text|json (default "text")
-keepalive int
Keep alive period in seconds (default 60)
-password string
MQTT password (empty if auth disabled)
-protocolver uint
MQTT protocol version: 3 means 3.1, 4 means 3.1.1 (default 3)
-pubqos int
QoS for published messages (default 1)
-quiet
Suppress logs while running




Suppress logs while running (default false)
-size int
Size of the messages payload in bytes (default 100)
-subqos int
QoS for subscribed messages (default 1)
-topic string
MQTT topic for outgoing messages (default "/test")
-username string
MQTT username (empty if auth disabled)
```

Two output formats supported: human-readable plain text and JSON.
Expand Down
93 changes: 48 additions & 45 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package main

import (
"bytes"
"strconv"
"encoding/json"
"flag"
"fmt"
"log"
"strconv"
"time"

"github.com/GaryBoone/GoStats/stats"
Expand All @@ -24,25 +24,25 @@ type Message struct {

// SubResults describes results of a single SUBSCRIBER / run
type SubResults struct {
ID int `json:"id"`
Published int64 `json:"actual_published"`
Received int64 `json:"received"`
FwdRatio float64 `json:"fwd_success_ratio"`
FwdLatencyMin float64 `json:"fwd_time_min"`
FwdLatencyMax float64 `json:"fwd_time_max"`
FwdLatencyMean float64 `json:"fwd_time_mean"`
FwdLatencyStd float64 `json:"fwd_time_std"`
ID int `json:"id"`
Published int64 `json:"actual_published"`
Received int64 `json:"received"`
FwdRatio float64 `json:"fwd_success_ratio"`
FwdLatencyMin float64 `json:"fwd_time_min"`
FwdLatencyMax float64 `json:"fwd_time_max"`
FwdLatencyMean float64 `json:"fwd_time_mean"`
FwdLatencyStd float64 `json:"fwd_time_std"`
}

// TotalSubResults describes results of all SUBSCRIBER / runs
type TotalSubResults struct {
TotalFwdRatio float64 `json:"fwd_success_ratio"`
TotalReceived int64 `json:"successes"`
TotalPublished int64 `json:"actual_total_published"`
FwdLatencyMin float64 `json:"fwd_latency_min"`
FwdLatencyMax float64 `json:"fwd_latency_max"`
FwdLatencyMeanAvg float64 `json:"fwd_latency_mean_avg"`
FwdLatencyMeanStd float64 `json:"fwd_latency_mean_std"`
TotalFwdRatio float64 `json:"fwd_success_ratio"`
TotalReceived int64 `json:"successes"`
TotalPublished int64 `json:"actual_total_published"`
FwdLatencyMin float64 `json:"fwd_latency_min"`
FwdLatencyMax float64 `json:"fwd_latency_max"`
FwdLatencyMeanAvg float64 `json:"fwd_latency_mean_avg"`
FwdLatencyMeanStd float64 `json:"fwd_latency_mean_std"`
}

// PubResults describes results of a single PUBLISHER / run
Expand Down Expand Up @@ -75,8 +75,8 @@ type TotalPubResults struct {

// JSONResults are used to export results as a JSON document
type JSONResults struct {
PubRuns []*PubResults `json:"publish runs"`
SubRuns []*SubResults `json:"subscribe runs"`
PubRuns []*PubResults `json:"publish runs"`
SubRuns []*SubResults `json:"subscribe runs"`
PubTotals *TotalPubResults `json:"publish totals"`
SubTotals *TotalSubResults `json:"receive totals"`
}
Expand All @@ -90,12 +90,13 @@ func main() {
password = flag.String("password", "", "MQTT password (empty if auth disabled)")
pubqos = flag.Int("pubqos", 1, "QoS for published messages")
subqos = flag.Int("subqos", 1, "QoS for subscribed messages")
size = flag.Int("size", 100, "Size of the messages payload (bytes)")
size = flag.Int("size", 100, "Size of the messages payload in bytes")
count = flag.Int("count", 100, "Number of messages to send per pubclient")
clients = flag.Int("clients", 10, "Number of clients pair to start")
keepalive = flag.Int("keepalive", 60, "Keep alive period in seconds")
keepalive = flag.Int("keepalive", 60, "Keep alive period in seconds")
format = flag.String("format", "text", "Output format: text|json")
quiet = flag.Bool("quiet", false, "Suppress logs while running")
quiet = flag.Bool("quiet", false, "Suppress logs while running (default false)")
protocolver = flag.Uint("protocolver", 3, "MQTT protocol version: 3 means 3.1, 4 means 3.1.1")
)

flag.Parse()
Expand All @@ -113,27 +114,28 @@ func main() {
if !*quiet {
log.Printf("Starting subscribe..\n")
}

for i := 0; i < *clients; i++ {
sub := &SubClient{
ID: i,
BrokerURL: *broker,
BrokerUser: *username,
BrokerPass: *password,
SubTopic: *topic + "-" + strconv.Itoa(i),
SubQoS: byte(*subqos),
KeepAlive: *keepalive,
Quiet: *quiet,
ID: i,
BrokerURL: *broker,
BrokerUser: *username,
BrokerPass: *password,
SubTopic: *topic + "-" + strconv.Itoa(i),
SubQoS: byte(*subqos),
KeepAlive: *keepalive,
Quiet: *quiet,
ProtocolVer: *protocolver,
}
go sub.run(subResCh, subDone, jobDone)
}

SUBJOBDONE:
SUBJOBDONE:
for {
select {
case <-subDone:
subCnt++
if subCnt==*clients {
if subCnt == *clients {
if !*quiet {
log.Printf("all subscribe job done.\n")
}
Expand All @@ -150,16 +152,17 @@ func main() {
start := time.Now()
for i := 0; i < *clients; i++ {
c := &PubClient{
ID: i,
BrokerURL: *broker,
BrokerUser: *username,
BrokerPass: *password,
PubTopic: *topic + "-" + strconv.Itoa(i),
MsgSize: *size,
MsgCount: *count,
PubQoS: byte(*pubqos),
KeepAlive: *keepalive,
Quiet: *quiet,
ID: i,
BrokerURL: *broker,
BrokerUser: *username,
BrokerPass: *password,
PubTopic: *topic + "-" + strconv.Itoa(i),
MsgSize: *size,
MsgCount: *count,
PubQoS: byte(*pubqos),
KeepAlive: *keepalive,
Quiet: *quiet,
ProtocolVer: *protocolver,
}
go c.run(pubResCh)
}
Expand All @@ -172,8 +175,8 @@ func main() {
totalTime := time.Now().Sub(start)
pubtotals := calculatePublishResults(pubresults, totalTime)

for i:=0; i<3; i++ {
time.Sleep(1*time.Second)
for i := 0; i < 3; i++ {
time.Sleep(1 * time.Second)
if !*quiet {
log.Printf("Benchmark will stop after %v seconds.\n", 3-i)
}
Expand All @@ -191,7 +194,7 @@ func main() {
}

// collect the sub results
subtotals := calculateSubscribeResults(subresults,pubresults)
subtotals := calculateSubscribeResults(subresults, pubresults)

// print stats
printResults(pubresults, pubtotals, subresults, subtotals, *format)
Expand Down
37 changes: 19 additions & 18 deletions pubclient.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
package main

import (
"bytes"
"fmt"
"log"
"time"
"bytes"
"strconv"
)
"time"

import (
"github.com/GaryBoone/GoStats/stats"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

type PubClient struct {
ID int
BrokerURL string
BrokerUser string
BrokerPass string
PubTopic string
MsgSize int
MsgCount int
PubQoS byte
KeepAlive int
Quiet bool
ID int
BrokerURL string
BrokerUser string
BrokerPass string
PubTopic string
MsgSize int
MsgCount int
PubQoS byte
KeepAlive int
Quiet bool
ProtocolVer uint
}

func (c *PubClient) run(res chan *PubResults) {
Expand Down Expand Up @@ -72,8 +72,8 @@ func (c *PubClient) run(res chan *PubResults) {
func (c *PubClient) genMessages(ch chan *Message, done chan bool) {
for i := 0; i < c.MsgCount; i++ {
ch <- &Message{
Topic: c.PubTopic,
QoS: c.PubQoS,
Topic: c.PubTopic,
QoS: c.PubQoS,
//Payload: make([]byte, c.MsgSize),
}
}
Expand Down Expand Up @@ -121,9 +121,10 @@ func (c *PubClient) pubMessages(in, out chan *Message, doneGen, donePub chan boo
SetAutoReconnect(true).
SetOnConnectHandler(onConnected).
SetKeepAlive(ka).
SetProtocolVersion(c.ProtocolVer).
SetConnectionLostHandler(func(client mqtt.Client, reason error) {
log.Printf("PUBLISHER %v lost connection to the broker: %v. Will reconnect...\n", c.ID, reason.Error())
})
log.Printf("PUBLISHER %v lost connection to the broker: %v. Will reconnect...\n", c.ID, reason.Error())
})
if c.BrokerUser != "" && c.BrokerPass != "" {
opts.SetUsername(c.BrokerUser)
opts.SetPassword(c.BrokerPass)
Expand Down
57 changes: 29 additions & 28 deletions subclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,30 @@ package main
import (
"fmt"
"log"
"time"
"strconv"
)
"time"

import (
"github.com/GaryBoone/GoStats/stats"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

type SubClient struct {
ID int
BrokerURL string
BrokerUser string
BrokerPass string
SubTopic string
SubQoS byte
KeepAlive int
Quiet bool
ID int
BrokerURL string
BrokerUser string
BrokerPass string
SubTopic string
SubQoS byte
KeepAlive int
Quiet bool
ProtocolVer uint
}

func (c *SubClient) run(res chan *SubResults, subDone chan bool, jobDone chan bool,) {
func (c *SubClient) run(res chan *SubResults, subDone chan bool, jobDone chan bool) {
runResults := new(SubResults)
runResults.ID = c.ID

forwardLatency := []float64{}

ka, _ := time.ParseDuration(strconv.Itoa(c.KeepAlive) + "s")
Expand All @@ -37,22 +37,23 @@ func (c *SubClient) run(res chan *SubResults, subDone chan bool, jobDone chan bo
SetCleanSession(true).
SetAutoReconnect(true).
SetKeepAlive(ka).
SetProtocolVersion(c.ProtocolVer).
SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
recvTime := time.Now().UnixNano()
payload := msg.Payload()
i := 0
for ; i<len(payload)-3; i++ {
if payload[i]=='#' && payload[i+1]=='@' && payload[i+2]=='#' {
sendTime,_ := strconv.ParseInt(string(payload[:i]), 10, 64)
forwardLatency = append(forwardLatency, float64(recvTime - sendTime)/1000000) // in milliseconds
break
recvTime := time.Now().UnixNano()
payload := msg.Payload()
i := 0
for ; i < len(payload)-3; i++ {
if payload[i] == '#' && payload[i+1] == '@' && payload[i+2] == '#' {
sendTime, _ := strconv.ParseInt(string(payload[:i]), 10, 64)
forwardLatency = append(forwardLatency, float64(recvTime-sendTime)/1000000) // in milliseconds
break
}
}
}
runResults.Received++
}).
runResults.Received++
}).
SetConnectionLostHandler(func(client mqtt.Client, reason error) {
log.Printf("SUBSCRIBER %v lost connection to the broker: %v. Will reconnect...\n", c.ID, reason.Error())
})
log.Printf("SUBSCRIBER %v lost connection to the broker: %v. Will reconnect...\n", c.ID, reason.Error())
})
if c.BrokerUser != "" && c.BrokerPass != "" {
opts.SetUsername(c.BrokerUser)
opts.SetPassword(c.BrokerPass)
Expand All @@ -74,10 +75,10 @@ func (c *SubClient) run(res chan *SubResults, subDone chan bool, jobDone chan bo
}

subDone <- true
//加各项统计
//加各项统计
for {
select {
case <- jobDone:
case <-jobDone:
client.Disconnect(250)
runResults.FwdLatencyMin = stats.StatsMin(forwardLatency)
runResults.FwdLatencyMax = stats.StatsMax(forwardLatency)
Expand Down