Skip to content

Commit

Permalink
add fluent bit component (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
Muzry authored Nov 23, 2021
1 parent 0c7fa02 commit 43e9863
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 13 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.4
1.5-alpha.1
12 changes: 4 additions & 8 deletions pkg/cluster/daemonset/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,21 @@ func CreateOrUpdate(
func Delete(
client kubernetes.Interface,
dicesvcname string,
dicesvc *diceyml.Service,
clus *spec.DiceCluster,
ownerRefs []metav1.OwnerReference) error {
generatedDS, err := BuildDaemonSet(dicesvcname, dicesvc, clus, ownerRefs)
if err != nil {
return err
}
return client.AppsV1().DaemonSets(clus.Namespace).Delete(context.Background(), generatedDS.Name, metav1.DeleteOptions{})
clus *spec.DiceCluster) error {
return client.AppsV1().DaemonSets(clus.Namespace).Delete(context.Background(), GenName(dicesvcname, clus), metav1.DeleteOptions{})
}

func BuildDaemonSet(
dicesvcname string,
dicesvc *diceyml.Service,
clus *spec.DiceCluster,
ownerRefs []metav1.OwnerReference) (*appsv1.DaemonSet, error) {

vols, volmounts, err := deployment.Volumes(dicesvc)
if err != nil {
return nil, err
}

livenessProbe, err := deployment.LivenessProbe(dicesvcname, dicesvc)
if err != nil {
return nil, err
Expand Down
61 changes: 58 additions & 3 deletions pkg/cluster/diff/dicediff.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

const (
fdpAgent = "fdp-agent"
fluentbit = "fluent-bit"
filebeat = "filebeat"
soldier = "soldier"
telegraf = "telegraf"
Expand Down Expand Up @@ -174,6 +175,15 @@ type SpecDiff struct {
currentSpotMonitorServices map[string]*diceyml.Service
targetSpotMonitorServices map[string]*diceyml.Service

// spec.fluentBit
fluentBitGlobalEnvDiff bool
currentFluentBitGlobalEnv map[string]string
targetFluentBitGlobalEnv map[string]string

fluentBitServiceDiff bool
currentFluentBitServices map[string]*diceyml.Service
targetFluentBitServices map[string]*diceyml.Service

// spec.fdp
fdpGlobalEnvDiff bool
currentFdpGlobalEnv map[string]string
Expand Down Expand Up @@ -278,6 +288,8 @@ func NewSpecDiff(current, target *spec.DiceCluster) *SpecDiff {
targetFdpGlobalEnv: make(map[string]string),
currentMeshControllerGlobalEnv: make(map[string]string),
targetMeshControllerGlobalEnv: make(map[string]string),
currentFluentBitGlobalEnv: make(map[string]string),
targetFluentBitGlobalEnv: make(map[string]string),

currentDiceServices: make(map[string]*diceyml.Service),
targetDiceServices: make(map[string]*diceyml.Service),
Expand Down Expand Up @@ -313,6 +325,8 @@ func NewSpecDiff(current, target *spec.DiceCluster) *SpecDiff {
targetFdpServices: make(map[string]*diceyml.Service),
currentMeshControllerServices: make(map[string]*diceyml.Service),
targetMeshControllerServices: make(map[string]*diceyml.Service),
currentFluentBitServices: make(map[string]*diceyml.Service),
targetFluentBitServices: make(map[string]*diceyml.Service),
}

if current == nil {
Expand All @@ -339,6 +353,7 @@ func NewSpecDiff(current, target *spec.DiceCluster) *SpecDiff {
diffSpotMonitor(*diceyml.CopyObj(&current.Spec.SpotMonitor), *diceyml.CopyObj(&target.Spec.SpotMonitor), &r)
diffFdp(*diceyml.CopyObj(&current.Spec.Fdp), *diceyml.CopyObj(&target.Spec.Fdp), &r)
diffMeshController(*diceyml.CopyObj(&current.Spec.MeshController), *diceyml.CopyObj(&target.Spec.MeshController), &r)
diffFluentBit(*diceyml.CopyObj(&current.Spec.FluentBit), *diceyml.CopyObj(&target.Spec.FluentBit), &r)
if len(target.Spec.MainPlatform) > 0 {
r.filterEdgeClusterServices()
}
Expand All @@ -351,6 +366,7 @@ func (d *SpecDiff) filterEdgeClusterServices() {
fdpAgent,
telegrafApp,
telegrafAppEdge,
fluentbit,
filebeat,
telegraf,
telegrafEdge,
Expand Down Expand Up @@ -403,6 +419,8 @@ func (d *SpecDiff) filterEdgeClusterServices() {
f(d.targetFdpServices)
f(d.currentMeshControllerServices)
f(d.targetMeshControllerServices)
f(d.currentFluentBitServices)
f(d.targetFluentBitServices)

}

Expand Down Expand Up @@ -545,6 +563,21 @@ func (d *SpecDiff) GetActions() *Actions {
mergemap(r.UpdatedServices, differentServices)
}

// spec.fluentBit
missingInSet1, missingInSet2, shared = diffServiceset(d.currentFluentBitServices, d.targetFluentBitServices)
differentServices = getDifferentServices(d.currentFluentBitServices, d.targetFluentBitServices, shared)
expandGlobalEnv(d.targetFluentBitGlobalEnv, missingInSet1)
expandGlobalEnv(d.targetFluentBitGlobalEnv, missingInSet2)
expandGlobalEnv(d.targetFluentBitGlobalEnv, shared)

mergemap(r.AddedDaemonSet, missingInSet1)
mergemap(r.DeletedDaemonSet, missingInSet2)
if d.fluentBitGlobalEnvDiff {
mergemap(r.UpdatedDaemonSet, shared)
} else {
mergemap(r.UpdatedDaemonSet, differentServices)
}

// spec.spotFilebeat (Daemonset)
missingInSet1, missingInSet2, shared = diffServiceset(d.currentSpotFilebeatServices, d.targetSpotFilebeatServices)
differentServices = getDifferentServices(d.currentSpotFilebeatServices, d.targetSpotFilebeatServices, shared)
Expand Down Expand Up @@ -583,14 +616,14 @@ func (d *SpecDiff) GetActions() *Actions {
expandGlobalEnv(d.targetSpotTelegrafGlobalEnv, shared)

for name, dicesvc := range missingInSet1 {
if name == telegraf || name == telegrafApp || name == telegrafEdge || name == telegrafAppEdge {
if name == telegraf || name == telegrafApp || name == telegrafEdge || name == telegrafAppEdge || name == fluentbit {
mergemap(r.AddedDaemonSet, map[string]*diceyml.Service{name: dicesvc})
} else {
mergemap(r.AddedServices, map[string]*diceyml.Service{name: dicesvc})
}
}
for name, dicesvc := range missingInSet2 {
if name == telegraf || name == telegrafApp || name == telegrafEdge || name == telegrafAppEdge {
if name == telegraf || name == telegrafApp || name == telegrafEdge || name == telegrafAppEdge || name == fluentbit {
mergemap(r.DeletedDaemonSet, map[string]*diceyml.Service{name: dicesvc})
} else {
mergemap(r.DeletedServices, map[string]*diceyml.Service{name: dicesvc})
Expand All @@ -601,7 +634,7 @@ func (d *SpecDiff) GetActions() *Actions {
updatedSet = shared
}
for name, dicesvc := range updatedSet {
if name == telegraf || name == telegrafApp || name == telegrafEdge || name == telegrafAppEdge {
if name == telegraf || name == telegrafApp || name == telegrafEdge || name == telegrafAppEdge || name == fluentbit {
mergemap(r.UpdatedDaemonSet, map[string]*diceyml.Service{name: dicesvc})
} else {
mergemap(r.UpdatedServices, map[string]*diceyml.Service{name: dicesvc})
Expand Down Expand Up @@ -720,6 +753,7 @@ func diffFromBlank(target *spec.DiceCluster, specdiff *SpecDiff) {
specdiff.spotMonitorGlobalEnvDiff = true
specdiff.fdpGlobalEnvDiff = true
specdiff.meshControllerGlobalEnvDiff = true
specdiff.fluentBitGlobalEnvDiff = true

dice := diceyml.CopyObj(&target.Spec.Dice)
addonplatform := diceyml.CopyObj(&target.Spec.AddonPlatform)
Expand All @@ -738,6 +772,7 @@ func diffFromBlank(target *spec.DiceCluster, specdiff *SpecDiff) {
spotMonitor := diceyml.CopyObj(&target.Spec.SpotMonitor)
fdp := diceyml.CopyObj(&target.Spec.Fdp)
meshController := diceyml.CopyObj(&target.Spec.MeshController)
fluentBit := diceyml.CopyObj(&target.Spec.FluentBit)

specdiff.targetDiceGlobalEnv = dice.Envs
specdiff.targetAddonPlatformGlobalEnv = addonplatform.Envs
Expand All @@ -756,6 +791,7 @@ func diffFromBlank(target *spec.DiceCluster, specdiff *SpecDiff) {
specdiff.targetSpotMonitorGlobalEnv = spotMonitor.Envs
specdiff.targetFdpGlobalEnv = fdp.Envs
specdiff.targetMeshControllerGlobalEnv = meshController.Envs
specdiff.targetFluentBitGlobalEnv = fluentBit.Envs

specdiff.diceServiceDiff = true
specdiff.addonPlatformServiceDiff = true
Expand All @@ -774,6 +810,7 @@ func diffFromBlank(target *spec.DiceCluster, specdiff *SpecDiff) {
specdiff.spotMonitorServiceDiff = true
specdiff.fdpServiceDiff = true
specdiff.meshControllerServiceDiff = true
specdiff.fluentBitServiceDiff = true

specdiff.targetDiceServices = dice.Services
specdiff.targetAddonPlatformServices = addonplatform.Services
Expand All @@ -792,6 +829,7 @@ func diffFromBlank(target *spec.DiceCluster, specdiff *SpecDiff) {
specdiff.targetSpotMonitorServices = spotMonitor.Services
specdiff.targetFdpServices = fdp.Services
specdiff.targetMeshControllerServices = meshController.Services
specdiff.targetFluentBitServices = fluentBit.Services

}

Expand Down Expand Up @@ -873,6 +911,11 @@ func diffMeshController(current, target diceyml.Object, specdiff *SpecDiff) {
diffMeshControllerServices(current.Services, target.Services, specdiff)
}

func diffFluentBit(current, target diceyml.Object, specdiff *SpecDiff) {
diffFluentBitGlobalEnv(current.Envs, target.Envs, specdiff)
diffFluentBitServices(current.Services, target.Services, specdiff)
}

func diffDiceGlobalEnv(current, target map[string]string, specdiff *SpecDiff) {
auxDiffGlobalEnv(current, target,
&specdiff.currentDiceGlobalEnv, &specdiff.targetDiceGlobalEnv, &specdiff.diceGlobalEnvDiff)
Expand Down Expand Up @@ -963,6 +1006,12 @@ func diffMeshControllerGlobalEnv(current, target map[string]string, specdiff *Sp
&specdiff.meshControllerGlobalEnvDiff)
}

func diffFluentBitGlobalEnv(current, target map[string]string, specdiff *SpecDiff) {
auxDiffGlobalEnv(current, target,
&specdiff.currentFluentBitGlobalEnv, &specdiff.currentFluentBitGlobalEnv,
&specdiff.fluentBitGlobalEnvDiff)
}

func auxDiffGlobalEnv(current, target map[string]string, specCurrent, specTarget *map[string]string, diff *bool) {
if len(current) != len(target) {
*diff = true
Expand Down Expand Up @@ -1071,6 +1120,12 @@ func diffMeshControllerServices(current, target map[string]*diceyml.Service, spe
&specdiff.meshControllerServiceDiff)
}

func diffFluentBitServices(current, target map[string]*diceyml.Service, specdiff *SpecDiff) {
auxDiffServices(current, target,
&specdiff.currentFluentBitServices, &specdiff.targetFluentBitServices,
&specdiff.fluentBitServiceDiff)
}

func auxDiffServices(current, target map[string]*diceyml.Service, specCurrent, specTarget *map[string]*diceyml.Service, diff *bool) {
if len(current) != len(target) {
*diff = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/launch/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (l *Launcher) launchUpdatedDS(c chan result, svcName string, diceSvc *dicey
}

func (l *Launcher) launchDeletedDS(c chan result, svcName string, diceSvc *diceyml.Service) {
if err := daemonset.Delete(l.client, svcName, diceSvc, l.targetspec, l.ownerRefs); err != nil {
if err := daemonset.Delete(l.client, svcName, l.targetspec); err != nil {
msg := fmt.Sprintf("Failed to delete daemonset: dicesvc: %s, err: %v", svcName, err)
c <- result{svcName, msg, false, spec.ClusterPhaseFailed}
return
Expand Down
1 change: 1 addition & 0 deletions pkg/envs/envs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func GetAllServices(cluster *spec.DiceCluster) []diceyml.Services {
cluster.Spec.SpotCollector.Services,
cluster.Spec.SpotDashboard.Services,
cluster.Spec.SpotFilebeat.Services,
cluster.Spec.FluentBit.Services,
cluster.Spec.SpotStatus.Services,
cluster.Spec.SpotTelegraf.Services,
cluster.Spec.Tmc.Services,
Expand Down
1 change: 1 addition & 0 deletions pkg/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type ClusterSpec struct {
SpotStatus diceyml.Object `json:"spotStatus"`
SpotTelegraf diceyml.Object `json:"spotTelegraf"`
Tmc diceyml.Object `json:"tmc"`
FluentBit diceyml.Object `json:"fluentBit"`
Hepa diceyml.Object `json:"hepa"`
SpotMonitor diceyml.Object `json:"spotMonitor"`
Fdp diceyml.Object `json:"fdp"`
Expand Down

0 comments on commit 43e9863

Please sign in to comment.