Skip to content

Commit

Permalink
Add reconciliation of coordinator services
Browse files Browse the repository at this point in the history
  • Loading branch information
as51340 committed Aug 7, 2024
1 parent f95a284 commit 50dd4e7
Showing 1 changed file with 227 additions and 73 deletions.
300 changes: 227 additions & 73 deletions internal/controller/memgraphha_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -94,69 +95,93 @@ func (r *MemgraphHAReconciler) Reconcile(ctx context.Context, req ctrl.Request)

logger.Info("MemgrahHA namespace", memgraphha.Namespace)

// TODO: andi unroll loop
for coordId := 1; coordId <= 3; coordId++ {
// ClusterIP
coordClusterIPStatus, coordClusterIPErr := r.reconcileCoordClusterIPService(ctx, memgraphha, &logger, coordId)
if coordClusterIPErr != nil {
logger.Info("Error returned when reconciling ClusterIP with id", coordId, "Returning empty Result with error.")
return ctrl.Result{}, coordClusterIPErr
}

coordsStatus, coordsErr := r.reconcileCoordinators(ctx, memgraphha, &logger)
if coordsErr != nil {
logger.Info("Error returned from reconciling coordinators. Returning empty Result with error.")
return ctrl.Result{}, coordsErr
}
if coordClusterIPStatus == true {
logger.Info("ClusterIP with id", coordId, "has been created. Returning Result with the request for requeing with error set to nil.")
return ctrl.Result{Requeue: true}, nil
}

// NodePort
coordNodePortStatus, coordNodePortErr := r.reconcileCoordNodePortService(ctx, memgraphha, &logger, coordId)
if coordNodePortErr != nil {
logger.Info("Error returned when reconciling NodePort with id", coordId, "Returning empty Result with error.")
return ctrl.Result{}, coordNodePortErr
}

if coordNodePortStatus == true {
logger.Info("NodePort with id", coordId, "has been created. Returning Result with the request for requeing with error set to nil.")
return ctrl.Result{Requeue: true}, nil
}

// Coordinator
coordStatus, coordErr := r.reconcileCoordinator(ctx, memgraphha, &logger, coordId)
if coordErr != nil {
logger.Info("Error returned when reconciling coordinator", coordId, "Returning empty Result with error.")
return ctrl.Result{}, coordErr
}

if coordsStatus == true {
logger.Info("One of coordinators has been created. Returning Result with the request for requeing with error=nil.")
return ctrl.Result{Requeue: true}, nil
if coordStatus == true {
logger.Info("Coordinator", coordId, "has been created. Returning Result with the request for requeing with error set to nil.")
return ctrl.Result{Requeue: true}, nil
}
}

logger.Info("Reconciliation of coordinators finished without actions needed.")

// TODO: (andi) unroll loop
for dataInstanceId := 0; dataInstanceId <= 1; dataInstanceId++ {

dataInstancesStatus, dataInstancesErr := r.reconcileDataInstances(ctx, memgraphha, &logger)
if dataInstancesErr != nil {
logger.Info("Error returned from reconciling data instances. Returning empty Result with error.")
return ctrl.Result{}, dataInstancesErr
}
// Data instance
dataInstancesStatus, dataInstancesErr := r.reconcileDataInstance(ctx, memgraphha, &logger, dataInstanceId)
if dataInstancesErr != nil {
logger.Info("Error returned when reconciling data instance", dataInstanceId, "Returning empty Result with error.")
return ctrl.Result{}, dataInstancesErr
}

if dataInstancesStatus == true {
logger.Info("One of data instances has been created. Returning Result with the request for requeing with error=nil.")
return ctrl.Result{Requeue: true}, nil
if dataInstancesStatus == true {
logger.Info("Data instance", dataInstanceId, "has been created. Returning Result with the request for requeing with error=nil.")
return ctrl.Result{Requeue: true}, nil
}
}

logger.Info("Reconciliation of data instances finished without actions needed.")

// The resource doesn't need to be reconciled anymore
return ctrl.Result{}, nil
}

func (r *MemgraphHAReconciler) reconcileDataInstances(ctx context.Context, memgraphha *memgraphv1.MemgraphHA, logger *logr.Logger) (bool, error) {
logger.Info("Started reconciling data instances")
status := false
func (r *MemgraphHAReconciler) reconcileDataInstance(ctx context.Context, memgraphha *memgraphv1.MemgraphHA, logger *logr.Logger, dataInstanceId int) (bool, error) {
name := fmt.Sprintf("memgraph-data-%d", dataInstanceId)
logger.Info("Started reconciling", name)
dataInstanceStatefulSet := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: name, Namespace: memgraphha.Namespace}, dataInstanceStatefulSet)

for dataInstanceId := 0; dataInstanceId <= 1; dataInstanceId++ {
dataInstanceStatefulSet := &appsv1.StatefulSet{}
name := fmt.Sprintf("memgraph-data-%d", dataInstanceId)
logger.Info("Started reconciling", name)
err := r.Get(ctx, types.NamespacedName{Name: name, Namespace: memgraphha.Namespace}, dataInstanceStatefulSet)
if err != nil {
if errors.IsNotFound(err) {
dataInstance := r.createStatefulSetForDataInstance(memgraphha, dataInstanceId)
logger.Info("Creating a new StatefulSet", "StatefulSet.Namespace", dataInstance.Namespace, "StatefulSet.Name", dataInstance.Name)
err := r.Create(ctx, dataInstance)
if err != nil {
logger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", dataInstance.Namespace, "StatefulSet.Name", dataInstance.Name)
return false, err
}
status = true
logger.Info("StatefulSet for", name, "is created. Setting status flag to true and continuing to reconcile other data instances. Caller should requeue.")
} else {
logger.Error(err, "Failed to fetch StatefulSet for", name, "Reconciliation loop is terminated.")
return false, err
}
} else {
logger.Info("StatefulSet for", name, "already exists. Keeping status flag as it was and continuing to reconcile other data instances.")
if err == nil {
logger.Info("StatefulSet", name, "already exists.")
return false, nil
}

if errors.IsNotFound(err) {
dataInstance := r.createStatefulSetForDataInstance(memgraphha, dataInstanceId)
logger.Info("Creating a new StatefulSet", "StatefulSet.Namespace", dataInstance.Namespace, "StatefulSet.Name", dataInstance.Name)
err := r.Create(ctx, dataInstance)
if err != nil {
logger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", dataInstance.Namespace, "StatefulSet.Name", dataInstance.Name)
return true, err
}
logger.Info("StatefulSet", name, "is created.")
return true, nil
}

return status, nil
logger.Error(err, "Failed to fetch StatefulSet", name)
return true, err

}

func (r *MemgraphHAReconciler) createStatefulSetForDataInstance(memgraphha *memgraphv1.MemgraphHA, dataInstanceId int) *appsv1.StatefulSet {
Expand Down Expand Up @@ -320,43 +345,171 @@ func (r *MemgraphHAReconciler) createStatefulSetForDataInstance(memgraphha *memg
return data
}

func (r *MemgraphHAReconciler) reconcileCoordNodePortService(ctx context.Context, memgraphha *memgraphv1.MemgraphHA, logger *logr.Logger, coordId int) (bool, error) {
serviceName := fmt.Sprintf("memgraph-coordinator-%d-external", coordId)
logger.Info("Started reconciling NodePort service", serviceName)

coordNodePortService := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: memgraphha.Namespace}, coordNodePortService)

if err == nil {
logger.Info("NodePort", serviceName, "already exists.")
return false, nil
}

if errors.IsNotFound(err) {
nodePort := r.createNodePort(memgraphha, coordId)
logger.Info("Creating a new NodePort", "NodePort.Namespace", nodePort.Namespace, "NodePort.Name", nodePort.Name)
err := r.Create(ctx, nodePort)
if err != nil {
logger.Error(err, "Failed to create new NodePort", "NodePort.Namespace", nodePort.Namespace, "NodePort.Name", nodePort.Name)
return true, err
}
logger.Info("NodePort", serviceName, "is created.")
return true, nil
}

logger.Error(err, "Failed to fetch NodePort", serviceName)
return true, err

}

func (r *MemgraphHAReconciler) createNodePort(memgraphha *memgraphv1.MemgraphHA, coordId int) *corev1.Service {
serviceName := fmt.Sprintf("memgraph-coordinator-%d-external", coordId)
coordName := fmt.Sprintf("memgraph-coordinator-%d", coordId)
// TODO: (andi) Extract somehow configuration and move into separate files.
boltPort := 7687

coordNodePort := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: memgraphha.Namespace,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Selector: createCoordLabels(coordName),
Ports: []corev1.ServicePort{
{
Name: "bolt",
Protocol: corev1.ProtocolTCP,
Port: int32(boltPort),
TargetPort: intstr.FromInt(boltPort),
},
},
},
}

ctrl.SetControllerReference(memgraphha, coordNodePort, r.Scheme)
return coordNodePort
}

func (r *MemgraphHAReconciler) reconcileCoordClusterIPService(ctx context.Context, memgraphha *memgraphv1.MemgraphHA, logger *logr.Logger, coordId int) (bool, error) {
serviceName := fmt.Sprintf("memgraph-coordinator-%d", coordId)
logger.Info("Started reconciling ClusterIP service", serviceName)

coordClusterIPService := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: memgraphha.Namespace}, coordClusterIPService)

if err == nil {
logger.Info("ClusterIP", serviceName, "already exists.")
return false, nil
}

if errors.IsNotFound(err) {
clusterIP := r.createCoordClusterIP(memgraphha, coordId)
logger.Info("Creating a new ClusterIP", "ClusterIP.Namespace", clusterIP.Namespace, "ClusterIP.Name", clusterIP.Name)
err := r.Create(ctx, clusterIP)
if err != nil {
logger.Error(err, "Failed to create new ClusterIP", "ClusterIP.Namespace", clusterIP.Namespace, "ClusterIP.Name", clusterIP.Name)
return true, err
}
logger.Info("ClusterIP", serviceName, "is created.")
return true, nil
}

logger.Error(err, "Failed to fetch ClusterIP", serviceName)
return true, err

}

func (r *MemgraphHAReconciler) createCoordClusterIP(memgraphha *memgraphv1.MemgraphHA, coordId int) *corev1.Service {
serviceName := fmt.Sprintf("memgraph-coordinator-%d", coordId)
coordName := serviceName
// TODO: (andi) Extract somehow configuration and move into separate files.
boltPort := 7687
coordinatorPort := 12000
mgmtPort := 10000

coordClusterIP := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: memgraphha.Namespace,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Selector: createCoordLabels(coordName),
Ports: []corev1.ServicePort{
{
Name: "bolt",
Protocol: corev1.ProtocolTCP,
Port: int32(boltPort),
TargetPort: intstr.FromInt(boltPort),
},
{
Name: "coordinator",
Protocol: corev1.ProtocolTCP,
Port: int32(coordinatorPort),
TargetPort: intstr.FromInt(coordinatorPort),
},
{
Name: "management",
Protocol: corev1.ProtocolTCP,
Port: int32(mgmtPort),
TargetPort: intstr.FromInt(mgmtPort),
},
},
},
}

ctrl.SetControllerReference(memgraphha, coordClusterIP, r.Scheme)
return coordClusterIP
}

/*
Returns bool, error tuple. If error exists, the caller should return with error and status will always be set to false.
If there is no error, we must look at bool status which when true will say that some coordinator was created (or all of them) and we need to requeue
or we need to go to the next step of reconciliation.
Returns bool, error tuple. If error exists, the caller should return with error and status will always be set to true.
If there is no error, we must look at bool status which when true will say that the coordinator was createdand we need to requeue
or that nothing was done and we can continue with the next step of reconciliation.
*/
func (r *MemgraphHAReconciler) reconcileCoordinators(ctx context.Context, memgraphha *memgraphv1.MemgraphHA, logger *logr.Logger) (bool, error) {
logger.Info("Started reconciling coordinators")
status := false
for coordId := 1; coordId <= 3; coordId++ {
coordStatefulSet := &appsv1.StatefulSet{}
name := fmt.Sprintf("memgraph-coordinator-%d", coordId)
logger.Info("Started reconciling", name)
err := r.Get(ctx, types.NamespacedName{Name: name, Namespace: memgraphha.Namespace}, coordStatefulSet)
func (r *MemgraphHAReconciler) reconcileCoordinator(ctx context.Context, memgraphha *memgraphv1.MemgraphHA, logger *logr.Logger, coordId int) (bool, error) {
name := fmt.Sprintf("memgraph-coordinator-%d", coordId)
logger.Info("Started reconciling", name)
coordStatefulSet := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: name, Namespace: memgraphha.Namespace}, coordStatefulSet)

if err == nil {
logger.Info("StatefulSet", name, "already exists.")
return false, nil
}

if errors.IsNotFound(err) {
coord := r.createStatefulSetForCoord(memgraphha, coordId)
logger.Info("Creating a new StatefulSet", "StatefulSet.Namespace", coord.Namespace, "StatefulSet.Name", coord.Name)
err := r.Create(ctx, coord)
if err != nil {
if errors.IsNotFound(err) {
coord := r.createStatefulSetForCoord(memgraphha, coordId)
logger.Info("Creating a new StatefulSet", "StatefulSet.Namespace", coord.Namespace, "StatefulSet.Name", coord.Name)
err := r.Create(ctx, coord)
if err != nil {
logger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", coord.Namespace, "StatefulSet.Name", coord.Name)
return false, err
}
status = true
logger.Info("StatefulSet for", name, "is created. Setting status flag to true and continuing to reconcile other coordinators. Caller should requeue.")
} else {
logger.Error(err, "Failed to fetch StatefulSet for", name, "Reconciliation loop is terminated.")
return false, err
}
} else {
logger.Info("StatefulSet for", name, "already exists. Keeping status flag as it was and continuing to reconcile other coordinators.")
logger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", coord.Namespace, "StatefulSet.Name", coord.Name)
return true, err
}
logger.Info("StatefulSet", name, "is created.")
return true, nil
}
return status, nil

logger.Error(err, "Failed to fetch StatefulSet", name)
return true, err
}

func (r *MemgraphHAReconciler) createStatefulSetForCoord(memgraphha *memgraphv1.MemgraphHA, coordId int) *appsv1.StatefulSet {
coordName := fmt.Sprintf("memgraph-coordinator-%d", coordId)
serviceName := coordName
labels := createCoordLabels(coordName)
replicas := int32(1)
containerName := "memgraph-coordinator"
Expand Down Expand Up @@ -405,7 +558,8 @@ func (r *MemgraphHAReconciler) createStatefulSetForCoord(memgraphha *memgraphv1.
Namespace: memgraphha.Namespace,
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
ServiceName: serviceName,
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Expand Down

0 comments on commit 50dd4e7

Please sign in to comment.