Skip to content

Commit

Permalink
[Server] support agent remote command
Browse files Browse the repository at this point in the history
  • Loading branch information
roryye committed May 22, 2024
1 parent dff16fa commit 3f38d8a
Show file tree
Hide file tree
Showing 6 changed files with 605 additions and 1 deletion.
2 changes: 1 addition & 1 deletion server/controller/grpc/synchronizer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (s *service) GetUniversalTagNameMaps(ctx context.Context, in *api.Universal
}

func (s *service) RemoteExecute(in api.Synchronizer_RemoteExecuteServer) error {
return nil
return s.vTapEvent.RemoteExecute(in)
}

func (s *service) GetOrgIDs(ctx context.Context, in *api.OrgIDsRequest) (*api.OrgIDsResponse, error) {
Expand Down
277 changes: 277 additions & 0 deletions server/controller/http/router/agent_cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package router

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"

"github.com/deepflowio/deepflow/message/trident"
"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
httpcommon "github.com/deepflowio/deepflow/server/controller/http/common"
. "github.com/deepflowio/deepflow/server/controller/http/router/common"
routercommon "github.com/deepflowio/deepflow/server/controller/http/router/common"
"github.com/deepflowio/deepflow/server/controller/http/service"
"github.com/deepflowio/deepflow/server/controller/model"
"github.com/gin-gonic/gin"
)

type AgentCMD struct{}

func NewAgentCMD() *AgentCMD {
return new(AgentCMD)
}

func (c *AgentCMD) RegisterTo(e *gin.Engine) {
e.GET("/v1/agent/:id/cmd", forwardToServerConnectedByAgent(), getCMDAndNamespaceHandler)
e.GET("/v1/agent/:id/cmd/run", forwardToServerConnectedByAgent(), cmdRunHandler)
}

func forwardToServerConnectedByAgent() gin.HandlerFunc {
return func(c *gin.Context) {
agentID, err := getAgentID(c)
if err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error())
c.Abort()
return
}
orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID)
db, err := mysql.GetDB(orgID.(int))
if err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
c.Abort()
return
}

var agent *mysql.VTap
if err = db.Where("id = ?", agentID).First(&agent).Error; err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
c.Abort()
return
}

log.Infof("weiqiang host: %v, url host: %v", c.Request.Host, c.Request.URL.Host)

var ip string
if net.ParseIP(c.Request.Host) == nil {
ips, err := net.LookupIP(c.Request.Host)
if err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
c.Abort()
return
}
log.Infof("weiqiang ips: %#v", ips)
if len(ips) == 0 {
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, fmt.Sprintf("net parse ip null, host(%s)", c.Request.Host))
c.Abort()
return
}
b, _ := json.Marshal(ips)
log.Infof("weiqiang ips: %v", string(b))
ip = ips[0].String()
} else {
if !strings.Contains(c.Request.Host, ":") {
ip = c.Request.Host
} else {
hostIP, _, err := net.SplitHostPort(c.Request.Host)
if err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
c.Abort()
return
}
log.Infof("weiqiang host ip: %v", hostIP)
ip = hostIP
}
}
log.Infof("weiqiang ip: %v", ip)

newHost, newPort, err := getTargetServerURL(db, agent)
if err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
c.Abort()
return
}
targetURL := fmt.Sprintf("http://%s:%d", newHost, newPort)

if newHost == common.NodeIP {
c.Next()
return
}

log.Infof("weiqiang agent, targetURL(%s), current controller ip(%s), controller ip(%s)", targetURL, agent.CurControllerIP, agent.ControllerIP)

proxyURL, err := url.Parse(targetURL)
if err != nil {
log.Error(err)
routercommon.BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
c.Abort()
return
}
log.Infof("weiqiang host(%s) proxy(%s)", ip, proxyURL)
proxy := httputil.NewSingleHostReverseProxy(proxyURL)

proxy.ServeHTTP(c.Writer, c.Request)
}
}

func getTargetServerURL(db *mysql.DB, agent *mysql.VTap) (string, int, error) {
// url, err := getURLAndVerify(db, agent.CurControllerIP)
// if errors.Is(err, ErrTCPActive) {
// url, err = getURLAndVerify(db, agent.ControllerIP)
// if err != nil {
// return "", fmt.Errorf("controller ip(%s), error: %v", agent.CurControllerIP, err)
// }
// return url, nil
// }
// if err != nil {
// return "", fmt.Errorf("current controller ip(%s), error: %v", agent.CurControllerIP, err)
// }
// return url, nil
newHost, newPort := agent.CurControllerIP, common.GConfig.HTTPNodePort
if agent.CurControllerIP == common.NodeIP {
err := common.IsTCPActive(newHost, newPort)
if err == nil {
return newHost, newPort, nil
}
log.Errorf("%w, %s:%d current controller ip unreachable, err(%s)", ErrTCPActive, newHost, newPort, newHost, err)

newHost = agent.ControllerIP
if err := common.IsTCPActive(newHost, newPort); err != nil {
err := fmt.Errorf("%w, %s:%d controller ip unreachable, err(%s)",
ErrTCPActive, newHost, newPort, newHost, err)
log.Error(err)
return "", 0, err
}
}
return newHost, newPort, nil
}

var (
ErrTCPActive = errors.New("tcp active error")
)

func getURLAndVerify(db *mysql.DB, controllerIP string) (string, error) {
var controller *mysql.Controller
if err := db.Where("ip = ?", controllerIP).First(&controller).Error; err != nil {
return "", err
}

var azControllerConn *mysql.AZControllerConnection
if err := db.Where("controller_ip = ?", controllerIP).First(&azControllerConn).Error; err != nil {
return "", err
}

newHost, newPort := controllerIP, common.GConfig.HTTPNodePort
// local region, use pod ip and port
if controllerIP == common.NodeIP {
newHost = controller.PodIP
newPort = common.GConfig.HTTPPort
}

if err := common.IsTCPActive(newHost, newPort); err != nil {
return "", fmt.Errorf("%w, %s:%d unreachable, err(%s)",
ErrTCPActive, newHost, newPort, newHost, err)
}

return fmt.Sprintf("http://%s:%d", newHost, newPort), nil
}

func getCMDAndNamespaceHandler(c *gin.Context) {
agentID, err := getAgentID(c)
if err != nil {
BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error())
return
}

orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID)
data, err := service.GetCMDAndNamespace(orgID.(int), agentID)
JsonResponse(c, data, err)
}

func getAgentID(c *gin.Context) (int, error) {
agentIDStr := c.Param("id")
if agentIDStr == "" {
return 0, errors.New("id can not be empty")
}
agentID, err := strconv.Atoi(agentIDStr)
if err != nil {
return 0, fmt.Errorf("agent id(%s) can not convert to int", agentIDStr)
}
return agentID, nil
}

func cmdRunHandler(c *gin.Context) {
agentID, err := getAgentID(c)
if err != nil {
BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error())
return
}

req := model.RemoteExecReq{}
if err := c.ShouldBindJSON(&req); err != nil {
BadRequestResponse(c, httpcommon.INVALID_PARAMETERS, err.Error())
return
}
agentReq := trident.RemoteExecRequest{
ExecType: trident.ExecutionType_RUN_COMMAND.Enum(),
CommandId: req.CommandId,
Params: req.Params,
LinuxNsPid: req.LinuxNsPid,
}

orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID)
content, err := service.RunAgentCMD(orgID.(int), agentID, &agentReq)
if err != nil {
BadRequestResponse(c, httpcommon.SERVER_ERROR, err.Error())
return
}

if req.OutputFormat.String() == trident.OutputFormat_TEXT.String() {
JsonResponse(c, content, nil)
return
}
sendAsFile(c, req.OutputFilename, bytes.NewBuffer([]byte(content)))
}

func sendAsFile(c *gin.Context, fileName string, content *bytes.Buffer) {
c.Writer.Header().Set("Content-Type", "application/octet-stream")
if fileName != "" {
c.Writer.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename*=utf-8''%s", fileName))
}

if _, err := io.Copy(c.Writer, content); err != nil {
c.AbortWithStatus(http.StatusInternalServerError)
log.Error(err)
return
}
}
1 change: 1 addition & 0 deletions server/controller/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (s *Server) appendRegistrant() []registrant.Registrant {
router.NewMail(),
router.NewPrometheus(),
router.NewDatabase(s.controllerConfig),
router.NewAgentCMD(),
// icon
router.NewIcon(s.controllerConfig),

Expand Down

0 comments on commit 3f38d8a

Please sign in to comment.