Skip to content

Commit

Permalink
final
Browse files Browse the repository at this point in the history
  • Loading branch information
parkedwards committed Aug 28, 2024
1 parent 2340777 commit 2eccf47
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 23 deletions.
21 changes: 17 additions & 4 deletions internal/controller/prefectserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ func (r *PrefectServerReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return *result, err
}

// Reconcile the migration job, if one is required
// Reconcile the migration job, if one is required.
// NOTE: if an active migration is still running,
// this reconciliation will requeue and exit early here.
result, err = r.reconcileMigrationJob(ctx, server, desiredMigrationJob, log)
if result != nil {
return *result, err
Expand Down Expand Up @@ -205,6 +207,11 @@ func (r *PrefectServerReconciler) reconcileMigrationJob(ctx context.Context, ser
condition = conditions.AlreadyExists(objName, errorMessage)

return &ctrl.Result{}, errors.NewBadRequest(errorMessage)
} else if !isMigrationJobFinished(foundMigrationJob) {
log.Info("Waiting on active migration Job to complete", "name", foundMigrationJob.Name)
condition = conditions.AlreadyExists(objName, fmt.Sprintf("migration Job %s is still active", foundMigrationJob.Name))

return &ctrl.Result{Requeue: true}, nil
} else {
if !meta.IsStatusConditionTrue(server.Status.Conditions, "MigrationJobReconciled") {
condition = conditions.Updated(objName)
Expand Down Expand Up @@ -362,9 +369,15 @@ func pvcNeedsUpdate(current, desired *corev1.PersistentVolumeClaimSpec, log logr
return false
}

func migrationJobNeedsUpdate(current, desired *batchv1.JobSpec, log logr.Logger) bool {
merged := current.DeepCopy()
return needsUpdate(current, merged, desired, log)
func isMigrationJobFinished(foundMigrationJob *batchv1.Job) bool {
switch {
case foundMigrationJob.Status.Succeeded > 0:
return true
case foundMigrationJob.Status.Failed > 0:
return true
default:
return false
}
}

func deploymentNeedsUpdate(current, desired *appsv1.DeploymentSpec, log logr.Logger) bool {
Expand Down
97 changes: 78 additions & 19 deletions internal/controller/prefectserver_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
"github.com/PrefectHQ/prefect-operator/internal/utils"
)

var _ = Describe("PrefectServer controller", func() {
Expand Down Expand Up @@ -1122,15 +1121,17 @@ var _ = Describe("PrefectServer controller", func() {
Expect(k8sClient.Get(ctx, name, prefectserver)).To(Succeed())

_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
hashSuffix, _ := utils.Hash(desiredMigrationJob.Spec, 8)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: fmt.Sprintf("prefect-on-postgres-migration-%s", hashSuffix),
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())

migrateJob.Status.Succeeded = 1
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())

deployment = &appsv1.Deployment{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -1299,12 +1300,11 @@ var _ = Describe("PrefectServer controller", func() {
Expect(err).NotTo(HaveOccurred())

_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
hashSuffix, _ := utils.Hash(desiredMigrationJob.Spec, 8)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: fmt.Sprintf("prefect-on-postgres-migration-%s", hashSuffix),
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())

Expand Down Expand Up @@ -1378,6 +1378,7 @@ var _ = Describe("PrefectServer controller", func() {
})

Context("When updating a server backed by PostgreSQL", func() {
var controllerReconciler *PrefectServerReconciler
BeforeEach(func() {
prefectserver = &prefectiov1.PrefectServer{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -1396,7 +1397,7 @@ var _ = Describe("PrefectServer controller", func() {
}
Expect(k8sClient.Create(ctx, prefectserver)).To(Succeed())

controllerReconciler := &PrefectServerReconciler{
controllerReconciler = &PrefectServerReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
}
Expand All @@ -1408,6 +1409,18 @@ var _ = Describe("PrefectServer controller", func() {
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient.Get(ctx, name, prefectserver)).To(Succeed())

// Set the first migration Job to be complete
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())
migrateJob.Status.Succeeded = 1
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())

prefectserver.Spec.Settings = []corev1.EnvVar{
{Name: "PREFECT_SOME_SETTING", Value: "some-value"},
}
Expand All @@ -1419,11 +1432,17 @@ var _ = Describe("PrefectServer controller", func() {
})
Expect(err).NotTo(HaveOccurred())

// Reconcile again to update the server
_, err = controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: name,
})
Expect(err).NotTo(HaveOccurred())
// Set the second migration Job to be complete
_, _, desiredMigrationJob = controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())
migrateJob.Status.Succeeded = 1
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())
})

It("should update the Deployment with the new setting", func() {
Expand All @@ -1443,18 +1462,58 @@ var _ = Describe("PrefectServer controller", func() {
}))
})

It("should not attempt to update a migration Job that it does not own", func() {
controllerReconciler := &PrefectServerReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
}
It("should create new migration Job with the new setting", func() {
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
job := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, job)
}).Should(Succeed())
Expect(job.Spec.Template.Spec.Containers).To(HaveLen(1))
container := job.Spec.Template.Spec.Containers[0]
Expect(container.Env).To(ContainElement(corev1.EnvVar{
Name: "PREFECT_SOME_SETTING",
Value: "some-value",
}))
})

It("should do nothing if an active migration Job already exists", func() {
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob)
}).Should(Succeed())

migrateJob.Status.Succeeded = 0
migrateJob.Status.Failed = 0
Expect(k8sClient.Status().Update(ctx, migrateJob)).To(Succeed())
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: name,
})
Expect(err).NotTo(HaveOccurred())

migrateJob2 := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: desiredMigrationJob.Name,
}, migrateJob2)
}).Should(Succeed())

Expect(migrateJob2.Generation).To(Equal(migrateJob.Generation))
})

It("should not attempt to update a migration Job that it does not own", func() {
job := &batchv1.Job{}
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
hashSuffix, _ := utils.Hash(desiredMigrationJob.Spec, 8)
Expect(k8sClient.Get(ctx, types.NamespacedName{
Namespace: namespaceName,
Name: fmt.Sprintf("prefect-on-postgres-migration-%s", hashSuffix),
Name: desiredMigrationJob.Name,
}, job)).To(Succeed())

job.OwnerReferences = nil
Expand All @@ -1463,7 +1522,7 @@ var _ = Describe("PrefectServer controller", func() {
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: name,
})
Expect(err).To(MatchError(fmt.Sprintf("Job prefect-on-postgres-migration-%s already exists and is not controlled by PrefectServer prefect-on-postgres", hashSuffix)))
Expect(err).To(MatchError(fmt.Sprintf("Job %s already exists and is not controlled by PrefectServer %s", desiredMigrationJob.Name, prefectserver.Name)))
})

It("should not attempt to update a Deployment that it does not own", func() {
Expand Down

0 comments on commit 2eccf47

Please sign in to comment.