Skip to content

Commit

Permalink
[Server] support agent remote command
Browse files Browse the repository at this point in the history
  • Loading branch information
roryye committed May 22, 2024
1 parent dff16fa commit de1ef23
Show file tree
Hide file tree
Showing 6 changed files with 514 additions and 1 deletion.
2 changes: 1 addition & 1 deletion server/controller/grpc/synchronizer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (s *service) GetUniversalTagNameMaps(ctx context.Context, in *api.Universal
}

func (s *service) RemoteExecute(in api.Synchronizer_RemoteExecuteServer) error {
return nil
return s.vTapEvent.RemoteExecute(in)
}

func (s *service) GetOrgIDs(ctx context.Context, in *api.OrgIDsRequest) (*api.OrgIDsResponse, error) {
Expand Down
216 changes: 216 additions & 0 deletions server/controller/http/router/agent_cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package router

import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
"strconv"

"github.com/deepflowio/deepflow/message/trident"
"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
httpcommon "github.com/deepflowio/deepflow/server/controller/http/common"
. "github.com/deepflowio/deepflow/server/controller/http/router/common"
routercommon "github.com/deepflowio/deepflow/server/controller/http/router/common"
"github.com/deepflowio/deepflow/server/controller/http/service"
"github.com/deepflowio/deepflow/server/controller/model"
"github.com/gin-gonic/gin"
)

type AgentCMD struct{}

func NewAgentCMD() *AgentCMD {
return new(AgentCMD)
}

func (c *AgentCMD) RegisterTo(e *gin.Engine) {
e.GET("/v1/agent/:id/cmd", forwardToServerConnectedByAgent(), getCMDAndNamespaceHandler)
e.GET("/v1/agent/:id/cmd/run", forwardToServerConnectedByAgent(), cmdRunHandler)
}

func forwardToServerConnectedByAgent() gin.HandlerFunc {
return func(c *gin.Context) {
agentID, err := getAgentID(c)
if err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error())
c.Abort()
return
}
orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID)
db, err := mysql.GetDB(orgID.(int))
if err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
c.Abort()
return
}

var agent *mysql.VTap
if err = db.Where("id = ?", agentID).First(&agent).Error; err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
c.Abort()
return
}
log.Infof("weiqiang url host: %v", c.Request.Host)

if c.Request.Host == common.NodeIP || c.Request.Host == common.PodIP {
c.Next()
return
}
targetURL, err := getTargetServerURL(db, agent)
if err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
c.Abort()
return
}

proxyURL, err := url.Parse(targetURL)
if err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
c.Abort()
return
}
proxy := httputil.NewSingleHostReverseProxy(proxyURL)

proxy.ServeHTTP(c.Writer, c.Request)
}
}

func getTargetServerURL(db *mysql.DB, agent *mysql.VTap) (string, error) {
url, err := getURLAndVerify(db, agent.CurControllerIP)
if errors.Is(err, ErrTCPActive) {
url, err = getURLAndVerify(db, agent.ControllerIP)
if err != nil {
return "", fmt.Errorf("controller ip(%s), error: %v", agent.CurControllerIP, err)
}
return url, nil
}
if err != nil {
return "", fmt.Errorf("current controller ip(%s), error: %v", agent.CurControllerIP, err)
}
return url, nil
}

var (
ErrTCPActive = errors.New("tcp active error")
)

func getURLAndVerify(db *mysql.DB, controllerIP string) (string, error) {
var controller *mysql.Controller
if err := db.Where("ip = ?", controllerIP).First(&controller).Error; err != nil {
return "", err
}

var azControllerConn *mysql.AZControllerConnection
if err := db.Where("controller_ip = ?", controllerIP).First(&azControllerConn).Error; err != nil {
return "", err
}

newHost, newPort := controllerIP, common.GConfig.HTTPNodePort
// local region, use pod ip and port
if controllerIP == common.NodeIP {
newHost = controller.PodIP
newPort = common.GConfig.HTTPPort
}

if err := common.IsTCPActive(newHost, newPort); err != nil {
return "", fmt.Errorf("%w, %s:%d unreachable, err(%s)",
ErrTCPActive, newHost, newPort, newHost, err)
}

return fmt.Sprintf("http://%s:%d", newHost, newPort), nil
}

func getCMDAndNamespaceHandler(c *gin.Context) {
agentID, err := getAgentID(c)
if err != nil {
BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error())
return
}

orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID)
data, err := service.GetCMDAndNamespace(orgID.(int), agentID)
JsonResponse(c, data, err)
}

func getAgentID(c *gin.Context) (int, error) {
agentIDStr := c.Param("id")
if agentIDStr == "" {
return 0, errors.New("id can not be empty")
}
agentID, err := strconv.Atoi(agentIDStr)
if err != nil {
return 0, fmt.Errorf("agent id(%s) can not convert to int", agentIDStr)
}
return agentID, nil
}

func cmdRunHandler(c *gin.Context) {
agentID, err := getAgentID(c)
if err != nil {
BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error())
return
}

req := model.RemoteExecReq{}
if err := c.ShouldBindJSON(&req); err != nil {
BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error())
return
}
agentReq := trident.RemoteExecRequest{
ExecType: trident.ExecutionType_RUN_COMMAND.Enum(),
CommandId: req.CommandId,
Params: req.Params,
LinuxNsPid: req.LinuxNsPid,
}

orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID)
content, err := service.RunAgentCMD(orgID.(int), agentID, &agentReq)
if err != nil {
BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
return
}

if req.OutputFormat.String() == trident.OutputFormat_TEXT.String() {
JsonResponse(c, content, nil)
return
}
sendAsFile(c, req.OutputFilename, bytes.NewBuffer([]byte(content)))
}

func sendAsFile(c *gin.Context, fileName string, content *bytes.Buffer) {
c.Writer.Header().Set("Content-Type", "application/octet-stream")
if fileName != "" {
c.Writer.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename*=utf-8''%s", fileName))
}

if _, err := io.Copy(c.Writer, content); err != nil {
c.AbortWithStatus(http.StatusInternalServerError)
log.Error(err)
return
}
}
1 change: 1 addition & 0 deletions server/controller/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (s *Server) appendRegistrant() []registrant.Registrant {
router.NewMail(),
router.NewPrometheus(),
router.NewDatabase(s.controllerConfig),
router.NewAgentCMD(),
// icon
router.NewIcon(s.controllerConfig),

Expand Down
152 changes: 152 additions & 0 deletions server/controller/http/service/agent_cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package service

import (
"encoding/json"
"fmt"
"sync"

"github.com/deepflowio/deepflow/message/trident"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/model"
)

var (
AgentRemoteExecMap = make(map[string]*CMDManager)
)

func AddSteamToManager(key string, stream trident.Synchronizer_RemoteExecuteServer) *CMDManager {
m := &CMDManager{
ExecCH: make(chan *trident.RemoteExecRequest, 1),
ExecDoneCH: make(chan struct{}, 1),
RemoteCMDDoneCH: make(chan struct{}, 1),
LinuxNamespaceDoneCH: make(chan struct{}, 1),

resp: &model.RemoteExecResp{},
stream: stream,
}
AgentRemoteExecMap[key] = m
return m
}

type CMDManager struct {
mu sync.RWMutex

ExecCH chan *trident.RemoteExecRequest
ExecDoneCH chan struct{}
RemoteCMDDoneCH chan struct{}
LinuxNamespaceDoneCH chan struct{}

resp *model.RemoteExecResp
stream trident.Synchronizer_RemoteExecuteServer
}

func (m *CMDManager) InitResp() {
m.mu.Lock()
defer m.mu.Unlock()
m.resp = &model.RemoteExecResp{}
}

func (m *CMDManager) AppendCommands(data []*trident.RemoteCommand) {
m.mu.Lock()
defer m.mu.Unlock()
m.resp.RemoteCommand = append(m.resp.RemoteCommand, data...)
}

func (m *CMDManager) AppendNamespaces(data []*trident.LinuxNamespace) {
m.mu.Lock()
defer m.mu.Unlock()
m.resp.LinuxNamespace = append(m.resp.LinuxNamespace, data...)
}

func (m *CMDManager) AppendContent(data []byte) {
m.mu.Lock()
defer m.mu.Unlock()
m.resp.Content += string(data)
}

func (m *CMDManager) AppendErr(data *string) {
m.mu.Lock()
defer m.mu.Unlock()
m.resp.Content += *data
}

func GetCMDAndNamespace(orgID, agentID int) (*model.RemoteExecResp, error) {
log.Infof("agent(id: %d) get remote commands and linux namespaces", agentID)
dbInfo, err := mysql.GetDB(orgID)
if err != nil {
return nil, err
}
var agent *mysql.VTap
// TODO(weiqiang): add team filter
if err := dbInfo.Where("id = ?", agentID).Find(&agent).Error; err != nil {
return nil, err
}

key := agent.CtrlIP + "-" + agent.CtrlMac
manager, ok := AgentRemoteExecMap[key]
if !ok {
return nil, fmt.Errorf("agent(name: %s, key: %s) remote exec map not found", agent.Name, key)
}
cmdReq := &trident.RemoteExecRequest{ExecType: trident.ExecutionType_LIST_COMMAND.Enum()}
manager.ExecCH <- cmdReq
namespaceReq := &trident.RemoteExecRequest{ExecType: trident.ExecutionType_LIST_NAMESPACE.Enum()}
manager.ExecCH <- namespaceReq

resp := &model.RemoteExecResp{}
select {
case <-manager.RemoteCMDDoneCH:
resp.RemoteCommand = manager.resp.RemoteCommand
case <-manager.LinuxNamespaceDoneCH:
resp.LinuxNamespace = manager.resp.LinuxNamespace
default:
if resp.RemoteCommand != nil && resp.LinuxNamespace != nil {
manager.InitResp()
break
}
}
return resp, nil
}

func RunAgentCMD(orgID, agentID int, req *trident.RemoteExecRequest) (string, error) {
b, _ := json.Marshal(req)
log.Infof("agent(id: %d) run remote command, request: %s", agentID, string(b))
dbInfo, err := mysql.GetDB(orgID)
if err != nil {
return "", err
}
var agent *mysql.VTap
// TODO(weiqiang): add team filter
if err := dbInfo.Where("id = ?", agentID).Find(&agent).Error; err != nil {
return "", err
}
key := agent.CtrlIP + "-" + agent.CtrlMac
manager, ok := AgentRemoteExecMap[key]
if !ok {
return "", fmt.Errorf("agent(name: %s, key: %s) remote exec map not found", agent.Name, key)
}

content := ""
select {
case <-manager.ExecDoneCH:
content = manager.resp.Content
manager.InitResp()
break
}
return content, nil
}

0 comments on commit de1ef23

Please sign in to comment.