Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add scheduling flows feature #21

Open
taronien opened this issue Apr 29, 2022 · 4 comments
Open

add scheduling flows feature #21

taronien opened this issue Apr 29, 2022 · 4 comments
Labels
enhancement New feature or request

Comments

@taronien
Copy link

Hello,
Could you add a metric that indicates whether a flow is scheduled or not?

eg.
azkaban_flow_is_scheduled{flow="flow_1",project="my_project_1"} 1
azkaban_flow_is_scheduled{flow="flow_2",project="my_project_1"} 0

Thanks

@rea1shane rea1shane added the enhancement New feature or request label May 6, 2022
@taronien
Copy link
Author

taronien commented May 6, 2022

Hello,
I'm not a Go expert, but I have something that works. Here is what I've done:
In pkg/api/param.go

type FetchFlowsOfAProjectParam struct {
        ServerUrl   string
        SessionId   string
        ProjectName string
+        ProjectId   int
}

+// FetchScheduleOfAFlowParam bundles the inputs of FetchScheduleOfAFlow.
+type FetchScheduleOfAFlowParam struct {
+        ServerUrl   string // base URL; "/schedule" is appended to it
+        SessionId   string // sent as the session.id query parameter
+        ProjectName string // carried for callers; not part of the request URL
+        ProjectId   int    // sent as the projectId query parameter
+        FlowId      string // sent as the flowId query parameter
+}

In pkg/api/request.go

+// FetchScheduleOfAFlow requests the schedule of flow p.FlowId in project
+// p.ProjectId from the "/schedule" endpoint (ajax=fetchSchedule) of p.ServerUrl.
+//
+// The query string is assembled through req.URL.Query() and Encode(), so the
+// session id and flow id are percent-encoded instead of being pasted into the
+// URL verbatim; a flow id containing '&', '#', '+' or a space would otherwise
+// produce a malformed or misinterpreted request.
+//
+// It returns the response filled in by h.Request, or an error when the request
+// cannot be built, when h.Request fails, or when the response carries a
+// non-empty Error field (reported as RequestError with the reason attached).
+func FetchScheduleOfAFlow(p FetchScheduleOfAFlowParam, ctx context.Context) (Schedules, error) {
+        req, err := http.NewRequest(http.MethodGet, p.ServerUrl+"/schedule", nil)
+        if err != nil {
+                return Schedules{}, failure.Wrap(err)
+        }
+
+        // url.Values is reached through req.URL, so no extra import is required.
+        query := req.URL.Query()
+        query.Set("ajax", "fetchSchedule")
+        query.Set("session.id", p.SessionId)
+        query.Set("projectId", strconv.Itoa(p.ProjectId))
+        query.Set("flowId", p.FlowId)
+        req.URL.RawQuery = query.Encode()
+
+        response := Schedules{}
+        if err = h.Request(req, ctx, &response); err != nil {
+                return Schedules{}, err
+        }
+        if response.Error != "" {
+                return Schedules{}, failure.New(RequestError, failure.Context{
+                        "reason": response.Error,
+                })
+        }
+        return response, nil
+}

In pkg/api/response.go

+// Schedules is the top-level JSON object decoded from a fetchSchedule response.
+type Schedules struct {
+        Error    string   `json:"error"`    // non-empty when the server reports a failure
+        Schedule Schedule `json:"schedule"` // zero value when the key is absent from the response
+}
+
+// Schedule is the nested "schedule" object of a fetchSchedule response.
+type Schedule struct {
+        Error      string `json:"error"`
+        ScheduleId string `json:"scheduleId"` // decoded as a string; empty when not present
+}

type Executions struct {
        Total      int         `json:"total"`
        Executions []Execution `json:"executions"`
        Length     int         `json:"length"`
        Project    string      `json:"project"`
        From       int         `json:"from"`
-       ProjectID  int         `json:"projectId"`
+       ProjectId  int         `json:"projectId"`
        Flow       string      `json:"flow"`
        Error      string      `json:"error"`
}

type Execution struct {
        SubmitTime int64  `json:"submitTime"`
        SubmitUser string `json:"submitUser"`
        StartTime  int64  `json:"startTime"`
        EndTime    int64  `json:"endTime"`
        FlowID     string `json:"flowId"`
-       ProjectID  int    `json:"projectId"`
+       ProjectId  int    `json:"projectId"`
        ExecID     int    `json:"execId"`
        Status     string `json:"status"`
}

In pkg/azkaban.go

import (
        "context"
        "github.com/go-kratos/kratos/pkg/sync/errgroup"
        "github.com/rea1shane/azkaban_exporter/pkg/api"
        "gopkg.in/yaml.v2"
        "io/ioutil"
        "sync"
        "time"
+        "strconv"
)


 type projectWithFlows struct {
        projectName string
+       projectId   int
        flowIds     []string
 }


+// schedule is the per-flow result that getSchedules sends on its output channel.
+type schedule struct {
+        projectName string
+        flowID      string
+        scheduleID  int // getSchedules sends 0 when the fetched ScheduleId is empty
+}


func (a *Azkaban) getProjectWithFlows(ctx context.Context, ch chan<- projectWithFlows) error {
        err := a.auth(ctx)
        if err != nil {
                return err
        }
        projects, err := api.FetchUserProjects(api.FetchUserProjectsParam{
                ServerUrl: a.Server.url,
                SessionId: a.User.session.sessionId,
        }, ctx)
        if err != nil {
                return err
        }
        group := errgroup.WithCancel(ctx)
        for _, project := range projects {
                p := project
                group.Go(func(ctx context.Context) error {
                        select {
                        case <-ctx.Done():
                                return ctx.Err()
                        default:
                                flows, err := api.FetchFlowsOfAProject(api.FetchFlowsOfAProjectParam{
                                        ServerUrl:   a.Server.url,
                                        SessionId:   a.User.session.sessionId,
                                        ProjectName: p.ProjectName,
+                                        ProjectId:   p.ProjectId,
                                }, ctx)
                                if err != nil {
                                        return err
                                }
                                var ids []string
                                for _, flow := range flows {
                                        ids = append(ids, flow.FlowId)
                                }
                                ch <- projectWithFlows{
                                        projectName: p.ProjectName,
+                                        projectId:   p.ProjectId,
                                        flowIds:     ids,
                                }
                                return nil
                        }
                })
        }
        return group.Wait()
}



+func (a *Azkaban) getSchedules(ctx context.Context, projectName string, projectId int, flowId string, ch chan<- schedule) error {
+        Schedules, err := api.FetchScheduleOfAFlow(api.FetchScheduleOfAFlowParam{
+                ServerUrl:   a.Server.url,
+                SessionId:   a.User.session.sessionId,
+                ProjectName: projectName,
+                ProjectId:   projectId,
+                FlowId:      flowId,
+        }, ctx)
+        if err != nil {
+                return err
+        }
+       var scheduleid, conv_err = strconv.Atoi("0")
+       if (Schedules.Schedule.ScheduleId != ""){
+               scheduleid, conv_err = strconv.Atoi(Schedules.Schedule.ScheduleId)
+       }
+       if conv_err != nil {
+               return nil
+       }
+       ch <- schedule{
+               scheduleID: scheduleid,
+                projectName: projectName,
+                flowID: flowId,
+        }
+        return nil
+}

In pkg/collector.go

@@ -48,6 +48,7 @@ type azkabanCollector struct {
        totalKilled     util.TypedDesc
        lastStatus      util.TypedDesc
        lastDuration    util.TypedDesc
+       isSchedule      util.TypedDesc
 }

@@ -143,6 +144,11 @@ func newAzkabanCollector(namespace string, logger *log.Entry) (basexporter.Colle
                                "Flow last execution duration which finished. (unit: ms)", labelProjectFlow, nil),
                        ValueType: prometheus.GaugeValue,
                },
+               isSchedule: util.TypedDesc{
+                        Desc: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "is_schedule"),
+                                "Flow scheduling id. O if not schedule", labelProjectFlow, nil),
+                        ValueType: prometheus.GaugeValue,
+                },
        }, nil
 }


@@ -152,6 +158,7 @@ func (c azkabanCollector) Update(ch chan<- prometheus.Metric) error {
 
                projectsWithFlows = make(chan projectWithFlows)
                executions        = make(chan execution)
+               schedules         = make(chan schedule)



@@ -170,6 +177,7 @@ func (c azkabanCollector) Update(ch chan<- prometheus.Metric) error {
                runningDurationRecorder = cmap.New()
                lastStatusRecorder      = cmap.New()
                lastDurationRecorder    = cmap.New()
+               isScheduleRecorder      = cmap.New()


+       group.Go(func(ctx context.Context) error {
+                g := errgroup.WithCancel(ctx)
+                for projectFlows := range projectsWithFlows {
+                        projectName := projectFlows.projectName
+                        projectId := projectFlows.projectId
+                        flowIds := projectFlows.flowIds
+                       isScheduleRecorder.Set(projectName, cmap.New())
+                       g.Go(func(ctx context.Context) error {
+                                select {
+                                case <-ctx.Done():
+                                        return ctx.Err()
+                                default:
+                                        for _, flowId := range flowIds {
+                                                fid := flowId
+                                                g.Go(func(ctx context.Context) error {
+                                                        select {
+                                                        case <-ctx.Done():
+                                                                return ctx.Err()
+                                                        default:
+                                                                return azkaban.getSchedules(ctx, projectName, projectId, fid, schedules)
+                                                        }
+                                                })
+                                        }
+                                        return nil
+                                }
+                        })
+                }
+                err := g.Wait()
+                close(schedules)
+                return err
+        })
+        for sche := range schedules {
+                isScheduleSecondMap, _ := isScheduleRecorder.Get(sche.projectName)
+                isScheduleSecondMap.(cmap.ConcurrentMap).Set(sche.flowID, sche.scheduleID)
+        }


@@ -491,6 +534,11 @@ func (c azkabanCollector) Update(ch chan<- prometheus.Metric) error {
                                        ch <- c.lastStatus.MustNewConstMetric(float64(v.(int)), projectName, flowId)
                                })
                        })
+                       isScheduleRecorder.IterCb(func(projectName string, secondMap interface{}) {
+                                secondMap.(cmap.ConcurrentMap).IterCb(func(flowId string, v interface{}) {
+                                        ch <- c.isSchedule.MustNewConstMetric(float64(v.(int)), projectName, flowId)
+                                })
+                        })
                        return nil

The collector part that I've added is clearly not efficient, in my opinion. That's why I didn't open a merge request.
I've attached the entire code, hoping that it helps you.
azkaban.zip

Thanks for this exporter

@rea1shane
Copy link
Owner

Thanks for your code. I'll finish it as soon as I have some free time.

@yewanwan
Copy link

Hello
I can't get azkaban_flow_last_status metrics from this exporter.
Could you help me.
image

@smiecj
Copy link
Collaborator

smiecj commented Jun 23, 2022

Hello I can't get azkaban_flow_last_status metrics from this exporter. Could you help me. image

Please create a new issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

4 participants