Skip to content

Commit

Permalink
topicctl get action to fetch controllerid and clusterid (#176)
Browse files Browse the repository at this point in the history
* topicctl get action to fetch broker controller id

* Minor format fixtures

* get action controllerid, clusterid and tests

* GetControllerID modify generic interface to struct

* version bump
  • Loading branch information
ssingudasu authored Feb 7, 2024
1 parent 368dc0a commit 5bf428a
Show file tree
Hide file tree
Showing 10 changed files with 254 additions and 1 deletion.
44 changes: 44 additions & 0 deletions cmd/topicctl/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func init() {
getCmd.AddCommand(
balanceCmd(),
brokersCmd(),
controllerCmd(),
clusterIDCmd(),
configCmd(),
groupsCmd(),
lagsCmd(),
Expand Down Expand Up @@ -134,6 +136,48 @@ func brokersCmd() *cobra.Command {
}
}

func controllerCmd() *cobra.Command {
return &cobra.Command{
Use: "controllerid",
Short: "Displays active controller broker id.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true)
if err != nil {
return err
}
defer adminClient.Close()

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.GetControllerID(ctx, getConfig.full)
},
}
}

func clusterIDCmd() *cobra.Command {
return &cobra.Command{
Use: "clusterid",
Short: "Displays cluster id.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true)
if err != nil {
return err
}
defer adminClient.Close()

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.GetClusterID(ctx, getConfig.full)
},
}
}

func configCmd() *cobra.Command {
return &cobra.Command{
Use: "config [broker or topic]",
Expand Down
13 changes: 13 additions & 0 deletions pkg/admin/brokerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,19 @@ func (c *BrokerAdminClient) GetBrokers(ctx context.Context, ids []int) (
return brokerInfos, nil
}

// GetControllerID gets ID of the active controller broker
func (c *BrokerAdminClient) GetControllerID(ctx context.Context) (
int,
error,
) {
metadataResp, err := c.getMetadata(ctx, nil)
if err != nil {
return -1, err
}

return metadataResp.Controller.ID, nil
}

// GetBrokerIDs get the IDs of all brokers in the cluster.
func (c *BrokerAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error) {
resp, err := c.getMetadata(ctx, nil)
Expand Down
31 changes: 31 additions & 0 deletions pkg/admin/brokerclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,37 @@ import (
"github.com/stretchr/testify/require"
)

func TestBrokerClientControllerID(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
}

ctx := context.Background()
client, err := NewBrokerAdminClient(
ctx,
BrokerAdminClientConfig{
ConnectorConfig: ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
},
},
)
require.NoError(t, err)

brokerIDs, err := client.GetBrokerIDs(ctx)
require.NoError(t, err)
assert.Equal(
t,
[]int{1, 2, 3, 4, 5, 6},
brokerIDs,
)

controllerID, err := client.GetControllerID(ctx)
require.NoError(t, err)
assert.Condition(t, func() bool {
return controllerID >= 1 && controllerID <= 6
}, fmt.Sprintf("Received %d, Broker Controller ID should be between 1 and 6.", controllerID))
}

func TestBrokerClientGetClusterID(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
Expand Down
3 changes: 3 additions & 0 deletions pkg/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ type Client interface {
// GetBrokers gets information about all brokers in the cluster.
GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error)

// GetControllerID get the active controller broker ID in the cluster.
GetControllerID(ctx context.Context) (int, error)

// GetBrokerIDs get the IDs of all brokers in the cluster.
GetBrokerIDs(ctx context.Context) ([]int, error)

Expand Down
58 changes: 58 additions & 0 deletions pkg/admin/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,64 @@ func FormatBrokers(brokers []BrokerInfo, full bool) string {
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatControllerID creates a pretty table for controller broker.
func FormatControllerID(brokerID int) string {
buf := &bytes.Buffer{}
table := tablewriter.NewWriter(buf)
headers := []string{"Active Controller"}
table.SetHeader(headers)

table.SetColumnAlignment(
[]int{
tablewriter.ALIGN_LEFT,
},
)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

table.Append([]string{
fmt.Sprintf("%d", brokerID),
})

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatClusterID creates a pretty table for cluster ID.
func FormatClusterID(clusterID string) string {
buf := &bytes.Buffer{}
table := tablewriter.NewWriter(buf)
headers := []string{"Kafka Cluster ID"}
table.SetHeader(headers)

table.SetColumnAlignment(
[]int{
tablewriter.ALIGN_LEFT,
},
)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

table.Append([]string{
clusterID,
})

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatBrokerReplicas creates a pretty table that shows how many replicas are in each
// position (i.e., leader, second, third) by broker across all topics. Useful for showing
// total-topic balance.
Expand Down
6 changes: 6 additions & 0 deletions pkg/admin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ type zkClusterID struct {
ID string `json:"id"`
}

type zkControllerInfo struct {
Version int `json:"version"`
BrokerID int `json:"brokerid"`
Timestamp string `json:"timestamp"`
}

type zkBrokerInfo struct {
Endpoints []string `json:"endpoints"`
Host string `json:"host"`
Expand Down
23 changes: 23 additions & 0 deletions pkg/admin/zkclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
assignmentPath = "/admin/reassign_partitions"
electionPath = "/admin/preferred_replica_election"
brokersPath = "/brokers/ids"
controllerPath = "/controller"
topicsPath = "/brokers/topics"
clusterIDPath = "/cluster/id"
brokerConfigsPath = "/config/brokers"
Expand Down Expand Up @@ -293,6 +294,28 @@ func (c *ZKAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error) {
return brokerIDs, nil
}

// GetControllerID gets ID of the active controller broker
func (c *ZKAdminClient) GetControllerID(
ctx context.Context,
) (int, error) {
zkControllerInfo := zkControllerInfo{}
zkControllerPath := c.zNode(controllerPath)

_, err := c.zkClient.GetJSON(
ctx,
zkControllerPath,
&zkControllerInfo,
)
if err != nil {
return -1, fmt.Errorf("Error getting zookeeper path %s: %+v",
zkControllerPath,
err,
)
}

return zkControllerInfo.BrokerID, nil
}

// GetConnector returns the Connector instance associated with this client.
func (c *ZKAdminClient) GetConnector() *Connector {
return c.Connector
Expand Down
47 changes: 47 additions & 0 deletions pkg/admin/zkclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,53 @@ import (
"github.com/stretchr/testify/require"
)

func TestZkClientControllerID(t *testing.T) {
zkConn, _, err := szk.Connect(
[]string{util.TestZKAddr()},
5*time.Second,
)
require.NoError(t, err)
require.NotNil(t, zkConn)
defer zkConn.Close()

clusterName := testClusterID("clusterID")
zk.CreateNodes(
t,
zkConn,
[]zk.PathTuple{
{
Path: fmt.Sprintf("/%s", clusterName),
Obj: nil,
},
{
Path: fmt.Sprintf("/%s/controller", clusterName),
Obj: map[string]interface{}{
"version": 1,
"brokerid": 3,
"timestamp": "1589603217000",
},
},
},
)

ctx := context.Background()
adminClient, err := NewZKAdminClient(
ctx,
ZKAdminClientConfig{
ZKAddrs: []string{util.TestZKAddr()},
ZKPrefix: clusterName,
BootstrapAddrs: []string{util.TestKafkaAddr()},
ReadOnly: true,
},
)
require.NoError(t, err)
defer adminClient.Close()

controllerID, err := adminClient.GetControllerID(ctx)
assert.NoError(t, err)
assert.Equal(t, 3, controllerID)
}

func TestZkClientGetClusterID(t *testing.T) {
zkConn, _, err := szk.Connect(
[]string{util.TestZKAddr()},
Expand Down
28 changes: 28 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,34 @@ func (c *CLIRunner) GetBrokers(ctx context.Context, full bool) error {
return nil
}

// Get active controller broker ID
func (c *CLIRunner) GetControllerID(ctx context.Context, full bool) error {
c.startSpinner()

brokerID, err := c.adminClient.GetControllerID(ctx)
c.stopSpinner()
if err != nil {
return err
}

c.printer("Broker ID:\n%s", admin.FormatControllerID(brokerID))
return nil
}

// Get cluster ID
func (c *CLIRunner) GetClusterID(ctx context.Context, full bool) error {
c.startSpinner()

clusterID, err := c.adminClient.GetClusterID(ctx)
c.stopSpinner()
if err != nil {
return err
}

c.printer("Cluster ID:\n%s", admin.FormatClusterID(clusterID))
return nil
}

// ApplyTopic does an apply run according to the spec in the argument config.
func (c *CLIRunner) ApplyTopic(
ctx context.Context,
Expand Down
2 changes: 1 addition & 1 deletion pkg/version/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package version

// Version is the current topicctl version.
const Version = "1.13.0"
const Version = "1.14.0"

0 comments on commit 5bf428a

Please sign in to comment.