Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add workflow controller #8

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added Dockerfile
Empty file.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
Expand Down Expand Up @@ -64,5 +66,6 @@ require (
golang.org/x/tools v0.13.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gorm.io/gorm v1.25.4 // indirect
gotest.tools/v3 v3.5.1 // indirect
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
Expand Down Expand Up @@ -265,6 +269,8 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw=
gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
Expand Down
19 changes: 12 additions & 7 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,21 @@ type ConfigurationStep struct {
type ConfigurationWorkflowState string

const (
ConfigurationWorkflowStatePending ConfigurationWorkflowState = "pending"
ConfigurationWorkflowStateRunning ConfigurationWorkflowState = "running"
ConfigurationWorkflowStateSuccess ConfigurationWorkflowState = "success"
ConfigurationWorkflowStateFailed ConfigurationWorkflowState = "failed"
ConfigurationWorkflowStatePending ConfigurationWorkflowState = "pending"
ConfigurationWorkflowStateRunning ConfigurationWorkflowState = "running"
ConfigurationWorkflowStateRunningSuccess ConfigurationWorkflowState = "runningSuccess"
ConfigurationWorkflowStateRunningFailed ConfigurationWorkflowState = "runningFailed"
ConfigurationWorkflowStateDeleting ConfigurationWorkflowState = "deleting"
ConfigurationWorkflowStateDeletingFailed ConfigurationWorkflowState = "deletingFailed"
ConfigurationWorkflowStateDeletingSuccess ConfigurationWorkflowState = "Deletingsuccess"
)

type ConfigurationWorkflowStatus struct {
State ConfigurationWorkflowState `json:"state"`
Message string `json:"message"`
Steps []ConfigurationStepStatus `json:"steps"`
State ConfigurationWorkflowState `json:"state"`
Message string `json:"message"`
LocalConfigurationSteps []ConfigurationStepStatus `json:"LocalConfigurationSteps"`
RemoteConfigurationSteps []ConfigurationStepStatus `json:"RemoteConfigurationSteps"`
DNSConfigurationSteps []ConfigurationStepStatus `json:"DNSConfigurationSteps"`
}

type ConfigurationStepStatus struct {
Expand Down
22 changes: 22 additions & 0 deletions pkg/container/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package container

import (
"context"

"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
)

func DeleteContainer(id string) error {
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return err
}
err = cli.ContainerRemove(context.Background(), id, types.ContainerRemoveOptions{
Force: true,
})
if err != nil {
return err
}
return nil
}
20 changes: 20 additions & 0 deletions pkg/container/get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package container

import (
"context"

"github.com/docker/docker/client"
)

func GetContainer(id string) (string, int, error) {
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return "", -1, err
}
resp, err := cli.ContainerInspect(context.Background(), id)
if err != nil {
return "", -1, err
}

return resp.State.Status, resp.State.ExitCode, nil
}
111 changes: 111 additions & 0 deletions pkg/controller/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,114 @@
*/

package controller

import (
"errors"
"fmt"
"sync"
"time"

"gorm.io/gorm"
"opennaslab.io/bifrost/pkg/api"
"opennaslab.io/bifrost/pkg/container"
"opennaslab.io/bifrost/pkg/customapi"
"opennaslab.io/bifrost/pkg/database"
)

type WorkflowQueue struct {
Workflow map[string]interface{}
mutex sync.Mutex
}

var WFQueue *WorkflowQueue

func InitWorkflowQueue() {
WFQueue = &WorkflowQueue{
Workflow: make(map[string]interface{}),
mutex: sync.Mutex{},
}
}

func (w *WorkflowQueue) AddWorkflow(name string) {
w.mutex.Lock()
defer w.mutex.Unlock()

w.Workflow[name] = nil
}

func (w *WorkflowQueue) Run() {
for {
w.mutex.Lock()
defer w.mutex.Unlock()

Check failure on line 56 in pkg/controller/workflow.go

View workflow job for this annotation

GitHub Actions / lint

SA5003: defers in this infinite loop will never run (staticcheck)
for name, _ := range w.Workflow {

Check failure on line 57 in pkg/controller/workflow.go

View workflow job for this annotation

GitHub Actions / lint

S1005: unnecessary assignment to the blank identifier (gosimple)
requeue, err := w.Reconcile(name)
if err == nil && !requeue {
delete(w.Workflow, name)
}
}
time.Sleep(time.Second * 3)
}
}

func (w *WorkflowQueue) Reconcile(name string) (requeue bool, err error) {
db := database.GetWorkflowMode()
wf, err := db.GetWorkflow(name)
if errors.Is(err, gorm.ErrRecordNotFound) {
return false, nil
}
if wf.Status.State == api.ConfigurationWorkflowStateDeleting {
requeue, err := w.DeleteWorkflow(wf)
if !requeue && err != nil {
err := db.DeleteWorkflow(name)
if err != nil {
return false, err
}
}
}
return false, nil
}

func (w *WorkflowQueue) DeleteWorkflow(wf *api.ConfigurationWorkflow) (requeue bool, err error) {
for index, step := range wf.DNSConfigurationSteps {
stepState := wf.Status.DNSConfigurationSteps[index]

if stepState.State == api.ConfigurationWorkflowStateRunning || stepState.State == api.ConfigurationWorkflowStateRunningSuccess {
err := container.DeleteContainer(stepState.ContainerId)
if err != nil {
return false, err
}
}

if stepState.State == api.ConfigurationWorkflowStateDeletingSuccess {
continue
}

if stepState.State == api.ConfigurationWorkflowStateDeleting {
containerState, exitCode, err := container.GetContainer(stepState.ContainerId)
if err != nil {
return false, err
}
if containerState == "exited" && exitCode == 0 {
stepState.State = api.ConfigurationWorkflowStateDeletingSuccess
continue
}
}

stepDef := customapi.GetDNSStepDefinition(step.Use)
if stepDef == nil {
return false, fmt.Errorf("dns step definition %s not found", step.Use)
}
id, err := container.CreateContainer(wf.Name, step.Name, stepDef.Image)
if err != nil {
return false, err
}
stepState.ContainerId = id
stepState.State = api.ConfigurationWorkflowStateDeleting
}

return false, nil
}

func (w *WorkflowQueue) UpdateWorkflow(name string) (*api.ConfigurationWorkflow, error) {
return nil, nil
}
10 changes: 9 additions & 1 deletion pkg/customapi/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var RemoteStepsInfoMap = map[string]StepsInfo{
},
}

var DNSStpesInfoMap = map[string]StepsInfo{}
var DNSStepsInfoMap = map[string]StepsInfo{}

type TypedInterface interface {
Validate() error
Expand Down Expand Up @@ -125,6 +125,14 @@ func GetRemoteStepDefinition(name string) *StepsInfo {
return &ret
}

func GetDNSStepDefinition(name string) *StepsInfo {
if _, ok := DNSStepsInfoMap[name]; !ok {
return nil
}
ret := DNSStepsInfoMap[name]
return &ret
}

func ListLocalStepDefinitions() []StepsInfo {
ret := []StepsInfo{}
for name := range LocalStepsInfoMap {
Expand Down
17 changes: 0 additions & 17 deletions pkg/database/action.go

This file was deleted.

17 changes: 0 additions & 17 deletions pkg/database/config.go

This file was deleted.

Loading
Loading