Skip to content

Commit

Permalink
feat: add update reconciliation for migration job (#45)
Browse files Browse the repository at this point in the history
* chore: add update reconciliation for migration job

* use client update

* back to delete+update

* make it idempotent

* idk

* use hash, fix tests

* final

* oops

* missing files

* use switch case

* add requeue delay
  • Loading branch information
parkedwards authored Aug 29, 2024
1 parent 52de65c commit 221c6bc
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 47 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module github.com/PrefectHQ/prefect-operator
go 1.21

require (
dario.cat/mergo v1.0.0
github.com/go-logr/logr v1.4.1
github.com/imdario/mergo v0.3.6
github.com/google/uuid v1.3.0
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.34.1
k8s.io/api v0.29.2
Expand All @@ -15,7 +16,6 @@ require (
)

require (
dario.cat/mergo v1.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -34,7 +34,7 @@ require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
87 changes: 53 additions & 34 deletions internal/controller/prefectserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"time"

"dario.cat/mergo"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -40,6 +41,7 @@ import (
prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
"github.com/PrefectHQ/prefect-operator/internal/conditions"
"github.com/PrefectHQ/prefect-operator/internal/constants"
"github.com/PrefectHQ/prefect-operator/internal/utils"
"github.com/go-logr/logr"
)

Expand Down Expand Up @@ -87,7 +89,9 @@ func (r *PrefectServerReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return *result, err
}

// Reconcile the migration job, if one is required
// Reconcile the migration job, if one is required.
// NOTE: if an active migration is still running,
// this reconciliation will requeue and exit early here.
result, err = r.reconcileMigrationJob(ctx, server, desiredMigrationJob, log)
if result != nil {
return *result, err
Expand Down Expand Up @@ -179,32 +183,36 @@ func (r *PrefectServerReconciler) reconcileMigrationJob(ctx context.Context, ser

foundMigrationJob := &batchv1.Job{}
err = r.Get(ctx, types.NamespacedName{Namespace: server.Namespace, Name: desiredMigrationJob.Name}, foundMigrationJob)
if errors.IsNotFound(err) {

switch {
case errors.IsNotFound(err):
log.Info("Creating migration Job", "name", desiredMigrationJob.Name)
if err = r.Create(ctx, desiredMigrationJob); err != nil {
condition = conditions.NotCreated(objName, err)

return &ctrl.Result{}, err
}

condition = conditions.Created(objName)
} else if err != nil {
condition = conditions.UnknownError(objName, err)

case err != nil:
condition = conditions.UnknownError(objName, err)
return &ctrl.Result{}, err
} else if !metav1.IsControlledBy(foundMigrationJob, server) {

case !metav1.IsControlledBy(foundMigrationJob, server):
errorMessage := fmt.Sprintf(
"%s %s already exists and is not controlled by PrefectServer %s",
"Job", desiredMigrationJob.Name, server.Name,
)

condition = conditions.AlreadyExists(objName, errorMessage)

return &ctrl.Result{}, errors.NewBadRequest(errorMessage)
} else if jobNeedsUpdate(&foundMigrationJob.Spec, &desiredMigrationJob.Spec, log) {
// TODO: handle replacing the job if something has changed

} else {
case !isMigrationJobFinished(foundMigrationJob):
log.Info("Waiting on active migration Job to complete", "name", foundMigrationJob.Name)
condition = conditions.AlreadyExists(objName, fmt.Sprintf("migration Job %s is still active", foundMigrationJob.Name))

// We'll requeue after 20 seconds to check on the migration Job's status
return &ctrl.Result{Requeue: true, RequeueAfter: 20 * time.Second}, nil

default:
if !meta.IsStatusConditionTrue(server.Status.Conditions, "MigrationJobReconciled") {
condition = conditions.Updated(objName)
}
Expand Down Expand Up @@ -377,9 +385,15 @@ func pvcNeedsUpdate(current, desired *corev1.PersistentVolumeClaimSpec, log logr
return false
}

func jobNeedsUpdate(current, desired *batchv1.JobSpec, log logr.Logger) bool {
// TODO: check for changes to the job spec that require an update
return false
// isMigrationJobFinished reports whether the given migration Job's status
// records at least one succeeded or at least one failed count. A Job with
// both counters at zero is treated as still active.
//
// NOTE(review): this looks only at the Succeeded/Failed counters, not at the
// Job's conditions; confirm that a non-zero Failed count is meant to be
// treated as "finished" even if the Job could still retry.
func isMigrationJobFinished(foundMigrationJob *batchv1.Job) bool {
	status := &foundMigrationJob.Status
	return status.Succeeded > 0 || status.Failed > 0
}

func deploymentNeedsUpdate(current, desired *appsv1.DeploymentSpec, log logr.Logger) bool {
Expand Down Expand Up @@ -627,30 +641,35 @@ func (r *PrefectServerReconciler) postgresDeploymentSpec(server *prefectiov1.Pre
}

func (r *PrefectServerReconciler) postgresMigrationJob(server *prefectiov1.PrefectServer) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: server.Namespace,
Name: server.Name + "-migration",
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: server.ServerLabels(),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "prefect-server-migration",
Image: server.Image(),
Command: []string{"prefect", "server", "database", "upgrade", "--yes"},
Env: append(append(server.ToEnvVars(), server.Spec.Postgres.ToEnvVars()...), server.Spec.Settings...),
},
jobSpec := batchv1.JobSpec{
TTLSecondsAfterFinished: ptr.To(int32(7 * 24 * 60 * 60)), // 7 days
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: server.ServerLabels(),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "prefect-server-migration",
Image: server.Image(),
Command: []string{"prefect", "server", "database", "upgrade", "--yes"},
Env: append(append(server.ToEnvVars(), server.Spec.Postgres.ToEnvVars()...), server.Spec.Settings...),
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
},
}

// Generate hash based on the job spec
hashSuffix, _ := utils.Hash(jobSpec, 8) // Use the first 8 characters of the hash
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: server.Namespace,
Name: fmt.Sprintf("%s-migration-%s", server.Name, hashSuffix),
},
Spec: jobSpec,
}
}

func (r *PrefectServerReconciler) prefectServerService(server *prefectiov1.PrefectServer) corev1.Service {
Expand Down
92 changes: 82 additions & 10 deletions internal/controller/prefectserver_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,14 +1255,18 @@ var _ = Describe("PrefectServer controller", func() {
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient.Get(ctx, name, prefectserver)).To(Succeed())

_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: "prefect-on-postgres-migration",
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())

migrateJob.Status.Succeeded = 1
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())

deployment = &appsv1.Deployment{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -1430,11 +1434,12 @@ var _ = Describe("PrefectServer controller", func() {
})
Expect(err).NotTo(HaveOccurred())

_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: "prefect-on-postgres-migration",
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())

Expand Down Expand Up @@ -1508,6 +1513,7 @@ var _ = Describe("PrefectServer controller", func() {
})

Context("When updating a server backed by PostgreSQL", func() {
var controllerReconciler *PrefectServerReconciler
BeforeEach(func() {
prefectserver = &prefectiov1.PrefectServer{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -1526,7 +1532,7 @@ var _ = Describe("PrefectServer controller", func() {
}
Expect(k8sClient.Create(ctx, prefectserver)).To(Succeed())

controllerReconciler := &PrefectServerReconciler{
controllerReconciler = &PrefectServerReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
}
Expand All @@ -1538,6 +1544,18 @@ var _ = Describe("PrefectServer controller", func() {
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient.Get(ctx, name, prefectserver)).To(Succeed())

// Set the first migration Job to be complete
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())
migrateJob.Status.Succeeded = 1
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())

prefectserver.Spec.Settings = []corev1.EnvVar{
{Name: "PREFECT_SOME_SETTING", Value: "some-value"},
}
Expand All @@ -1548,6 +1566,18 @@ var _ = Describe("PrefectServer controller", func() {
NamespacedName: name,
})
Expect(err).NotTo(HaveOccurred())

// Set the second migration Job to be complete
_, _, desiredMigrationJob = controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())
migrateJob.Status.Succeeded = 1
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())
})

It("should update the Deployment with the new setting", func() {
Expand All @@ -1567,25 +1597,67 @@ var _ = Describe("PrefectServer controller", func() {
}))
})

It("should create new migration Job with the new setting", func() {
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
job := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, job)
}).Should(Succeed())
Expect(job.Spec.Template.Spec.Containers).To(HaveLen(1))
container := job.Spec.Template.Spec.Containers[0]
Expect(container.Env).To(ContainElement(corev1.EnvVar{
Name: "PREFECT_SOME_SETTING",
Value: "some-value",
}))
})

It("should do nothing if an active migration Job already exists", func() {
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())

migrateJob.Status.Succeeded = 0
migrateJob.Status.Failed = 0
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: name,
})
Expect(err).NotTo(HaveOccurred())

migrateJob2 := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob2)
}).Should(Succeed())

Expect(migrateJob2.Generation).To(Equal(migrateJob.Generation))
})

It("should not attempt to update a migration Job that it does not own", func() {
job := &batchv1.Job{}
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
Expect(k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: "prefect-on-postgres-migration",
Name: desiredMigrationJob.Name,
}, job)).To(Succeed())

job.OwnerReferences = nil
Expect(k8sClient.Update(ctx, job)).To(Succeed())

controllerReconciler := &PrefectServerReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
}

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: name,
})
Expect(err).To(MatchError("Job prefect-on-postgres-migration already exists and is not controlled by PrefectServer prefect-on-postgres"))
Expect(err).To(MatchError(fmt.Sprintf("Job %s already exists and is not controlled by PrefectServer %s", desiredMigrationJob.Name, prefectserver.Name)))
})

It("should not attempt to update a Deployment that it does not own", func() {
Expand Down
20 changes: 20 additions & 0 deletions internal/utils/hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package utils

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// Hash returns the first length hexadecimal characters of the SHA-256 digest
// of obj's JSON encoding.
//
// The full digest is 64 hexadecimal characters long. A length larger than
// that yields the whole digest rather than panicking on an out-of-range
// slice. An error is returned when obj cannot be JSON-encoded or when length
// is negative.
func Hash(obj interface{}, length int) (string, error) {
	if length < 0 {
		return "", fmt.Errorf("hash length must not be negative, got %d", length)
	}

	// Serialize the object to JSON so that structurally equal inputs produce
	// the same digest.
	data, err := json.Marshal(obj)
	if err != nil {
		return "", err
	}

	sum := sha256.Sum256(data)
	digest := hex.EncodeToString(sum[:])
	if length > len(digest) {
		// Clamp instead of slicing past the end of the digest.
		length = len(digest)
	}
	return digest[:length], nil
}
Loading

0 comments on commit 221c6bc

Please sign in to comment.