Skip to content

Commit

Permalink
setpipeline supports detach mode
Browse files Browse the repository at this point in the history
Signed-off-by: Evan <[email protected]>
  • Loading branch information
evanchaoli committed May 23, 2022
1 parent 52cb197 commit b533285
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 38 deletions.
1 change: 1 addition & 0 deletions atc/builds/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func (visitor *planVisitor) VisitSetPipeline(step *atc.SetPipelineStep) error {
Vars: step.Vars,
VarFiles: step.VarFiles,
InstanceVars: step.InstanceVars,
Detach: step.Detach,
})

return nil
Expand Down
12 changes: 10 additions & 2 deletions atc/db/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ type Build interface {
config atc.Config,
from ConfigVersion,
initiallyPaused bool,
detach bool,
) (Pipeline, bool, error)

ResourceCacheUser() ResourceCacheUser
Expand Down Expand Up @@ -1799,6 +1800,7 @@ func (b *build) SavePipeline(
config atc.Config,
from ConfigVersion,
initiallyPaused bool,
detach bool,
) (Pipeline, bool, error) {
tx, err := b.conn.Begin()
if err != nil {
Expand All @@ -1807,8 +1809,14 @@ func (b *build) SavePipeline(

defer Rollback(tx)

jobID := newNullInt64(b.jobID)
buildID := newNullInt64(b.id)
var jobID, buildID sql.NullInt64
if detach {
jobID = sql.NullInt64{Valid: false}
buildID = sql.NullInt64{Valid: false}
} else {
jobID = newNullInt64(b.jobID)
buildID = newNullInt64(b.id)
}
pipelineID, isNewPipeline, err := savePipeline(tx, pipelineRef, config, from, initiallyPaused, teamID, jobID, buildID)
if err != nil {
return nil, false, err
Expand Down
2 changes: 1 addition & 1 deletion atc/db/build_in_memory_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (b *inMemoryCheckBuild) Start(atc.Plan) (bool, error) {
func (b *inMemoryCheckBuild) ResourcesChecked() (bool, error) {
return false, errors.New("not implemented for in memory build")
}
func (b *inMemoryCheckBuild) SavePipeline(atc.PipelineRef, int, atc.Config, ConfigVersion, bool) (Pipeline, bool, error) {
func (b *inMemoryCheckBuild) SavePipeline(atc.PipelineRef, int, atc.Config, ConfigVersion, bool, bool) (Pipeline, bool, error) {
return nil, false, errors.New("not implemented for in memory build")
}
func (b *inMemoryCheckBuild) AdoptInputsAndPipes() ([]BuildInput, bool, error) {
Expand Down
16 changes: 8 additions & 8 deletions atc/db/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ var _ = Describe("Build", func() {
BeforeEach(func() {
By("creating a child pipeline")
build, _ := defaultJob.CreateBuild(defaultBuildCreatedBy)
childPipeline, _, _ = build.SavePipeline(atc.PipelineRef{Name: "child1-pipeline"}, defaultTeam.ID(), defaultPipelineConfig, db.ConfigVersion(0), false)
childPipeline, _, _ = build.SavePipeline(atc.PipelineRef{Name: "child1-pipeline"}, defaultTeam.ID(), defaultPipelineConfig, db.ConfigVersion(0), false, false)
build.Finish(db.BuildStatusSucceeded)

childPipeline.Reload()
Expand All @@ -831,7 +831,7 @@ var _ = Describe("Build", func() {
for i := 0; i < 5; i++ {
job, _, _ := childPipeline.Job("some-job")
build, _ := job.CreateBuild(defaultBuildCreatedBy)
childPipeline, _, _ = build.SavePipeline(atc.PipelineRef{Name: "child-pipeline-" + strconv.Itoa(i)}, defaultTeam.ID(), defaultPipelineConfig, db.ConfigVersion(0), false)
childPipeline, _, _ = build.SavePipeline(atc.PipelineRef{Name: "child-pipeline-" + strconv.Itoa(i)}, defaultTeam.ID(), defaultPipelineConfig, db.ConfigVersion(0), false, false)
build.Finish(db.BuildStatusSucceeded)
childPipelines = append(childPipelines, childPipeline)
}
Expand Down Expand Up @@ -2393,7 +2393,7 @@ var _ = Describe("Build", func() {
},
},
},
}, db.ConfigVersion(0), false)
}, db.ConfigVersion(0), false, false)
Expect(err).ToNot(HaveOccurred())
Expect(pipeline.ParentJobID()).To(Equal(build.JobID()))
Expect(pipeline.ParentBuildID()).To(Equal(build.ID()))
Expand Down Expand Up @@ -2431,7 +2431,7 @@ var _ = Describe("Build", func() {
},
},
},
}, db.ConfigVersion(0), false)
}, db.ConfigVersion(0), false, false)
Expect(err).ToNot(HaveOccurred())
Expect(pipeline.ParentJobID()).To(Equal(buildTwo.JobID()))
Expect(pipeline.ParentBuildID()).To(Equal(buildTwo.ID()))
Expand Down Expand Up @@ -2461,7 +2461,7 @@ var _ = Describe("Build", func() {
},
},
},
}, pipeline.ConfigVersion(), false)
}, pipeline.ConfigVersion(), false, false)
Expect(err).To(Equal(db.ErrSetByNewerBuild))
})

Expand All @@ -2472,7 +2472,7 @@ var _ = Describe("Build", func() {
Expect(err).ToNot(HaveOccurred())

By("re-saving the default pipeline with the build")
pipeline, _, err := build.SavePipeline(defaultPipelineRef, build.TeamID(), defaultPipelineConfig, db.ConfigVersion(1), false)
pipeline, _, err := build.SavePipeline(defaultPipelineRef, build.TeamID(), defaultPipelineConfig, db.ConfigVersion(1), false, false)
Expect(err).ToNot(HaveOccurred())
Expect(pipeline.ParentJobID()).To(Equal(build.JobID()))
Expect(pipeline.ParentBuildID()).To(Equal(build.ID()))
Expand All @@ -2493,7 +2493,7 @@ var _ = Describe("Build", func() {
By("setting the pipeline again via a build")
build, err := defaultJob.CreateBuild(defaultBuildCreatedBy)
Expect(err).ToNot(HaveOccurred())
pipeline, _, err = build.SavePipeline(defaultPipelineRef, build.TeamID(), defaultPipelineConfig, pipeline.ConfigVersion(), false)
pipeline, _, err = build.SavePipeline(defaultPipelineRef, build.TeamID(), defaultPipelineConfig, pipeline.ConfigVersion(), false, false)
Expect(err).ToNot(HaveOccurred())

Expect(pipeline.Paused()).To(BeFalse())
Expand All @@ -2513,7 +2513,7 @@ var _ = Describe("Build", func() {
By("setting the pipeline again via a build")
build, err := defaultJob.CreateBuild(defaultBuildCreatedBy)
Expect(err).ToNot(HaveOccurred())
pipeline, _, err = build.SavePipeline(defaultPipelineRef, build.TeamID(), defaultPipelineConfig, pipeline.ConfigVersion(), false)
pipeline, _, err = build.SavePipeline(defaultPipelineRef, build.TeamID(), defaultPipelineConfig, pipeline.ConfigVersion(), false, false)
Expect(err).ToNot(HaveOccurred())

Expect(pipeline.Paused()).To(BeTrue())
Expand Down
18 changes: 10 additions & 8 deletions atc/db/dbfakes/fake_build.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions atc/db/dbfakes/fake_pipeline.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions atc/db/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type Pipeline interface {
Variables(lager.Logger, creds.Secrets, creds.VarSourcePool) (vars.Variables, error)

SetParentIDs(jobID, buildID int) error
DetachParent() error
}

type pipeline struct {
Expand Down Expand Up @@ -1168,6 +1169,39 @@ func (p *pipeline) Variables(logger lager.Logger, globalSecrets creds.Secrets, v
return allVars, nil
}

func (p *pipeline) DetachParent() error {
tx, err := p.conn.Begin()
if err != nil {
return err
}

defer Rollback(tx)

nullID := sql.NullInt64{Valid: false}
result, err := psql.Update("pipelines").
Set("parent_job_id", nullID).
Set("parent_build_id", nullID).
Where(sq.Eq{
"id": p.id,
}).
RunWith(tx).
Exec()

if err != nil {
return err
}

rows, err := result.RowsAffected()
if err != nil {
return err
}
if rows == 0 {
return ErrSetByNewerBuild
}

return tx.Commit()
}

func (p *pipeline) SetParentIDs(jobID, buildID int) error {
if jobID <= 0 || buildID <= 0 {
return errors.New("job and build id cannot be negative or zero-value")
Expand Down
11 changes: 9 additions & 2 deletions atc/exec/set_pipeline_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (step *SetPipelineStep) run(ctx context.Context, state RunState, delegate S
}

var team db.Team
var detach bool
if step.plan.Team == "" {
team = step.teamFactory.GetByID(step.metadata.TeamID)
} else {
Expand Down Expand Up @@ -163,6 +164,7 @@ func (step *SetPipelineStep) run(ctx context.Context, state RunState, delegate S
}
if currentTeam.Admin() {
permitted = true
detach = step.plan.Detach
}
if !permitted {
return false, fmt.Errorf(
Expand Down Expand Up @@ -202,7 +204,12 @@ func (step *SetPipelineStep) run(ctx context.Context, state RunState, delegate S
fmt.Fprintf(stdout, "no changes to apply.\n")

if found {
err := pipeline.SetParentIDs(step.metadata.JobID, step.metadata.BuildID)
var err error
if detach {
err = pipeline.DetachParent()
} else {
err = pipeline.SetParentIDs(step.metadata.JobID, step.metadata.BuildID)
}
if err != nil {
return false, err
}
Expand Down Expand Up @@ -230,7 +237,7 @@ func (step *SetPipelineStep) run(ctx context.Context, state RunState, delegate S
return false, fmt.Errorf("set_pipeline step not attached to a buildID")
}

pipeline, _, err = parentBuild.SavePipeline(pipelineRef, team.ID(), atcConfig, fromVersion, false)
pipeline, _, err = parentBuild.SavePipeline(pipelineRef, team.ID(), atcConfig, fromVersion, false, detach)
if err != nil {
if err == db.ErrSetByNewerBuild {
fmt.Fprintln(stderr, "\x1b[1;33mWARNING: the pipeline was not saved because it was already saved by a newer build\x1b[0m")
Expand Down
Loading

0 comments on commit b533285

Please sign in to comment.