Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add update reconciliation for migration job #45

Merged
merged 11 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module github.com/PrefectHQ/prefect-operator
go 1.21

require (
dario.cat/mergo v1.0.0
github.com/go-logr/logr v1.4.1
github.com/imdario/mergo v0.3.6
github.com/google/uuid v1.3.0
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.34.1
k8s.io/api v0.29.2
Expand All @@ -15,7 +16,6 @@ require (
)

require (
dario.cat/mergo v1.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -34,7 +34,7 @@ require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
67 changes: 42 additions & 25 deletions internal/controller/prefectserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
"github.com/PrefectHQ/prefect-operator/internal/conditions"
"github.com/PrefectHQ/prefect-operator/internal/constants"
"github.com/PrefectHQ/prefect-operator/internal/utils"
"github.com/go-logr/logr"
)

Expand Down Expand Up @@ -87,7 +88,9 @@ func (r *PrefectServerReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return *result, err
}

// Reconcile the migration job, if one is required
// Reconcile the migration job, if one is required.
// NOTE: if an active migration is still running,
// this reconciliation will requeue and exit early here.
result, err = r.reconcileMigrationJob(ctx, server, desiredMigrationJob, log)
if result != nil {
return *result, err
Expand Down Expand Up @@ -179,6 +182,7 @@ func (r *PrefectServerReconciler) reconcileMigrationJob(ctx context.Context, ser

foundMigrationJob := &batchv1.Job{}
err = r.Get(ctx, types.NamespacedName{Namespace: server.Namespace, Name: desiredMigrationJob.Name}, foundMigrationJob)

if errors.IsNotFound(err) {
log.Info("Creating migration Job", "name", desiredMigrationJob.Name)
if err = r.Create(ctx, desiredMigrationJob); err != nil {
Expand All @@ -201,9 +205,11 @@ func (r *PrefectServerReconciler) reconcileMigrationJob(ctx context.Context, ser
condition = conditions.AlreadyExists(objName, errorMessage)

return &ctrl.Result{}, errors.NewBadRequest(errorMessage)
} else if jobNeedsUpdate(&foundMigrationJob.Spec, &desiredMigrationJob.Spec, log) {
// TODO: handle replacing the job if something has changed
} else if !isMigrationJobFinished(foundMigrationJob) {
log.Info("Waiting on active migration Job to complete", "name", foundMigrationJob.Name)
condition = conditions.AlreadyExists(objName, fmt.Sprintf("migration Job %s is still active", foundMigrationJob.Name))

return &ctrl.Result{Requeue: true}, nil
} else {
if !meta.IsStatusConditionTrue(server.Status.Conditions, "MigrationJobReconciled") {
condition = conditions.Updated(objName)
Expand Down Expand Up @@ -377,9 +383,15 @@ func pvcNeedsUpdate(current, desired *corev1.PersistentVolumeClaimSpec, log logr
return false
}

func jobNeedsUpdate(current, desired *batchv1.JobSpec, log logr.Logger) bool {
// TODO: check for changes to the job spec that require an update
return false
func isMigrationJobFinished(foundMigrationJob *batchv1.Job) bool {
switch {
case foundMigrationJob.Status.Succeeded > 0:
return true
case foundMigrationJob.Status.Failed > 0:
return true
default:
return false
}
}

func deploymentNeedsUpdate(current, desired *appsv1.DeploymentSpec, log logr.Logger) bool {
Expand Down Expand Up @@ -627,30 +639,35 @@ func (r *PrefectServerReconciler) postgresDeploymentSpec(server *prefectiov1.Pre
}

func (r *PrefectServerReconciler) postgresMigrationJob(server *prefectiov1.PrefectServer) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: server.Namespace,
Name: server.Name + "-migration",
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: server.ServerLabels(),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "prefect-server-migration",
Image: server.Image(),
Command: []string{"prefect", "server", "database", "upgrade", "--yes"},
Env: append(append(server.ToEnvVars(), server.Spec.Postgres.ToEnvVars()...), server.Spec.Settings...),
},
jobSpec := batchv1.JobSpec{
TTLSecondsAfterFinished: ptr.To(int32(7 * 24 * 60 * 60)), // 7 days
mitchnielsen marked this conversation as resolved.
Show resolved Hide resolved
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: server.ServerLabels(),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "prefect-server-migration",
Image: server.Image(),
Command: []string{"prefect", "server", "database", "upgrade", "--yes"},
Env: append(append(server.ToEnvVars(), server.Spec.Postgres.ToEnvVars()...), server.Spec.Settings...),
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
},
}

// Generate hash based on the job spec
hashSuffix, _ := utils.Hash(jobSpec, 8) // Use the first 8 characters of the hash
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: server.Namespace,
Name: fmt.Sprintf("%s-migration-%s", server.Name, hashSuffix),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may eventually want to trim this name (and the name of all resources, I guess) to 63 characters in case the server.Name provided is unusually long for some reason. Or we can let it fail and we'll know why - just calling it out as a familiar speedbump.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call out. might make sense to validate a char limit for the .Name on the CR
#56

},
Spec: jobSpec,
}
}

func (r *PrefectServerReconciler) prefectServerService(server *prefectiov1.PrefectServer) corev1.Service {
Expand Down
92 changes: 82 additions & 10 deletions internal/controller/prefectserver_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,14 +1255,18 @@ var _ = Describe("PrefectServer controller", func() {
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient.Get(ctx, name, prefectserver)).To(Succeed())

_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: "prefect-on-postgres-migration",
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())

migrateJob.Status.Succeeded = 1
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())

deployment = &appsv1.Deployment{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -1430,11 +1434,12 @@ var _ = Describe("PrefectServer controller", func() {
})
Expect(err).NotTo(HaveOccurred())

_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: "prefect-on-postgres-migration",
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())

Expand Down Expand Up @@ -1508,6 +1513,7 @@ var _ = Describe("PrefectServer controller", func() {
})

Context("When updating a server backed by PostgreSQL", func() {
var controllerReconciler *PrefectServerReconciler
BeforeEach(func() {
prefectserver = &prefectiov1.PrefectServer{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -1526,7 +1532,7 @@ var _ = Describe("PrefectServer controller", func() {
}
Expect(k8sClient.Create(ctx, prefectserver)).To(Succeed())

controllerReconciler := &PrefectServerReconciler{
controllerReconciler = &PrefectServerReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
}
Expand All @@ -1538,6 +1544,18 @@ var _ = Describe("PrefectServer controller", func() {
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient.Get(ctx, name, prefectserver)).To(Succeed())

// Set the first migration Job to be complete
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())
migrateJob.Status.Succeeded = 1
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())

prefectserver.Spec.Settings = []corev1.EnvVar{
{Name: "PREFECT_SOME_SETTING", Value: "some-value"},
}
Expand All @@ -1548,6 +1566,18 @@ var _ = Describe("PrefectServer controller", func() {
NamespacedName: name,
})
Expect(err).NotTo(HaveOccurred())

// Set the second migration Job to be complete
_, _, desiredMigrationJob = controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())
migrateJob.Status.Succeeded = 1
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())
Comment on lines +1571 to +1580
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kind of annoying, but looking up a Job by name means i have to re-run the manifest generator every time to create the hashed name, which is based on the manifest

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I remember thinking about using .metadata.generation from the CR since that should be unique and change each time the CR changes. But not sure if there are any drawbacks to that approach.

})

It("should update the Deployment with the new setting", func() {
Expand All @@ -1567,25 +1597,67 @@ var _ = Describe("PrefectServer controller", func() {
}))
})

It("should create new migration Job with the new setting", func() {
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
job := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, job)
}).Should(Succeed())
Expect(job.Spec.Template.Spec.Containers).To(HaveLen(1))
container := job.Spec.Template.Spec.Containers[0]
Expect(container.Env).To(ContainElement(corev1.EnvVar{
Name: "PREFECT_SOME_SETTING",
Value: "some-value",
}))
})

It("should do nothing if an active migration Job already exists", func() {
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())

migrateJob.Status.Succeeded = 0
migrateJob.Status.Failed = 0
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: name,
})
Expect(err).NotTo(HaveOccurred())

migrateJob2 := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob2)
}).Should(Succeed())

Expect(migrateJob2.Generation).To(Equal(migrateJob.Generation))
})

It("should not attempt to update a migration Job that it does not own", func() {
job := &batchv1.Job{}
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
Expect(k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: "prefect-on-postgres-migration",
Name: desiredMigrationJob.Name,
}, job)).To(Succeed())

job.OwnerReferences = nil
Expect(k8sClient.Update(ctx, job)).To(Succeed())

controllerReconciler := &PrefectServerReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
}

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: name,
})
Expect(err).To(MatchError("Job prefect-on-postgres-migration already exists and is not controlled by PrefectServer prefect-on-postgres"))
Expect(err).To(MatchError(fmt.Sprintf("Job %s already exists and is not controlled by PrefectServer %s", desiredMigrationJob.Name, prefectserver.Name)))
})

It("should not attempt to update a Deployment that it does not own", func() {
Expand Down
18 changes: 18 additions & 0 deletions internal/utils/hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package utils

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
)

func Hash(obj interface{}, length int) (string, error) {
data, err := json.Marshal(obj) // Serialize the object to JSON
if err != nil {
return "", err
}

hash := sha256.New()
hash.Write(data)
return hex.EncodeToString(hash.Sum(nil))[:length], nil
}
67 changes: 67 additions & 0 deletions internal/utils/hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package utils

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
)

func MigrationJobStub() *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-migration-job",
Namespace: "default",
},
Spec: batchv1.JobSpec{
TTLSecondsAfterFinished: ptr.To(int32(7 * 24 * 60 * 60)), // 7 days
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "prefect-server"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "prefect-server-migration",
Image: "prefecthq/prefect:2.10.4",
Command: []string{"prefect", "server", "database", "upgrade", "--yes"},
Env: []corev1.EnvVar{
{Name: "PREFECT_SERVER_DATABASE_CONNECTION_URL", Value: "postgresql://user:password@host:5432/db"},
},
},
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
},
},
}
}

var _ = Describe("Hash Function", func() {
It("returns the same hash for the same job spec", func() {
By("comparing hash for the same job spec")
job := MigrationJobStub()
job2 := MigrationJobStub()
hash1, err := Hash(job.Spec, 8)
Expect(err).NotTo(HaveOccurred())
hash2, err := Hash(job2.Spec, 8)
Expect(err).NotTo(HaveOccurred())

Expect(hash1).To(Equal(hash2))
})

It("returns different hashes for different job specs", func() {
By("comparing hash for different job specs")
job := MigrationJobStub()
hash1, err := Hash(job.Spec, 8)
Expect(err).NotTo(HaveOccurred())

job.Spec.Template.Spec.Containers[0].Image = "image1:v2"
hash2, err := Hash(job.Spec, 8)
Expect(err).NotTo(HaveOccurred())

Expect(hash1).NotTo(Equal(hash2))
Comment on lines +61 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noice

})
})
Loading