Skip to content

Commit

Permalink
Merge pull request #18 from numberly/rate-limit-vault-api
Browse files Browse the repository at this point in the history
feat: implement a new rate limit on the vault API to avoid 429 error
  • Loading branch information
SoulKyu authored Jun 10, 2024
2 parents 5eb599f + a537831 commit 2d3b69d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 23 deletions.
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config struct {
SyncTTLSecond int `yaml:"syncTTLSecond" envconfig:"sync_ttl_second"`
InjectorLabel string `yaml:"injectorLabel" envconfig:"injector_label"`
DefaultEngine string `yaml:"defaultEngine" envconfig:"default_engine"`
VaultRateLimit int `yaml:"vaultRateLimit" envconfig:"vault_rate_limit"`
}

func NewConfig(configFile string) (*Config, error) {
Expand All @@ -45,6 +46,7 @@ func NewConfig(configFile string) (*Config, error) {
SyncTTLSecond: 300,
InjectorLabel: "vault-db-injector",
DefaultEngine: "databases",
VaultRateLimit: 50,
}
if configFile != "" {
data, err := os.ReadFile(configFile)
Expand Down
2 changes: 1 addition & 1 deletion pkg/k8smutator/k8smutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func handlePodConfiguration(ctx context.Context, cfg *config.Config, dbConfs *[]
logger.Errorf("Their is an issue with the db Configuration")
return nil, "db-role not found", nil, err
}
vaultConn := vault.NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, vaultDbPath, dbConf.Role, tok)
vaultConn := vault.NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, vaultDbPath, dbConf.Role, tok, cfg.VaultRateLimit)
if err := vaultConn.Login(ctx); err != nil {
return nil, dbConf.Role, nil, errors.Newf("cannot authenticate vault role: %s", err.Error())
}
Expand Down
51 changes: 38 additions & 13 deletions pkg/vault/handle_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/numberly/vault-db-injector/pkg/config"
"github.com/numberly/vault-db-injector/pkg/k8s"
promInjector "github.com/numberly/vault-db-injector/pkg/prometheus"
"golang.org/x/time/rate"
)

type KeyInformation struct {
Expand Down Expand Up @@ -119,24 +120,35 @@ func (c *Connector) ListKeyInformations(ctx context.Context, path, prefix string
var wg sync.WaitGroup
keyInformationsChan := make(chan *KeyInformation, len(keys))

// Create a rate limiter
rateLimit := rate.Limit(c.VaultRateLimit) // requests per second
limiter := rate.NewLimiter(rateLimit, 1)

for _, k := range keys {
wg.Add(1)
go func(k interface{}) {
defer wg.Done()

// Wait for the rate limiter
if err := limiter.Wait(ctx); err != nil {
c.Log.Errorf("Rate limiter error: %v", err)
return
}

podName := strings.TrimSuffix(k.(string), "/")

// Utiliser le préfixe pour lire les données
dataPath := fmt.Sprintf("%s/data/%s/%s", path, prefix, podName)
podSecret, err := c.client.Logical().ReadWithContext(ctx, dataPath)
if err != nil {
c.Log.Errorf("Error while trying to recover data informations for : %s: %v", podName, err)
c.Log.Errorf("Error while trying to recover data informations for: %s: %v", podName, err)
return
}

if podSecret == nil || podSecret.Data == nil || podSecret.Data["data"] == nil {
status, err := c.DeleteData(ctx, podName, path, podName, "", prefix)
if err != nil {
c.Log.Errorf("Data for %s can't be deleted : %s with error : %s", podName, status, err.Error())
c.Log.Errorf("Data for %s can't be deleted: %s with error: %s", podName, status, err.Error())
}
return
}
Expand Down Expand Up @@ -189,61 +201,73 @@ func (c *Connector) HandleTokens(ctx context.Context, cfg *config.Config, keysIn
return false
}

// Créer une map pour une recherche rapide des podsInformations
// Create a map for quick lookup of pod information
podInfoMap := make(map[string]k8s.PodInformations)
for _, pi := range podsInformations {
for _, uuid := range pi.PodNameUUIDs {
podInfoMap[uuid] = pi
}

}

var KubePolicies []string
KubePolicies = append(KubePolicies, c.authRole)
_, err = c.CreateOrphanToken(ctx, "1h", KubePolicies)
if err != nil {
c.Log.Errorf("Can't create orphan ticket : %v", err)
c.Log.Errorf("Can't create orphan ticket: %v", err)
c.Log.Error("Token renew has been cancelled")
return false
}

// Create a rate limiter
rateLimit := rate.Limit(cfg.VaultRateLimit) // requests per second
limiter := rate.NewLimiter(rateLimit, 1)

var wg sync.WaitGroup
var isOk bool = true

for _, ki := range keysInformations {
wg.Add(1)
go func(ki *KeyInformation) {
defer wg.Done()

// Wait for the rate limiter
if err := limiter.Wait(ctx); err != nil {
c.Log.Errorf("Rate limiter error: %v", err)
isOk = false
return
}

if _, found := podInfoMap[ki.PodNameUID]; found {
err := c.RenewToken(ctx, ki.TokenId, ki.PodNameUID, ki.Namespace, SyncTTLSecond)
if err != nil {
c.Log.Errorf("Can't renew Token with pod UUID : %s", ki.PodNameUID)
c.Log.Errorf("Can't renew Token with pod UUID: %s", ki.PodNameUID)
isOk = false
return
}
err = c.RenewLease(ctx, ki.LeaseId, 86400*5, ki.PodNameUID, ki.Namespace) // Renew for 1week
err = c.RenewLease(ctx, ki.LeaseId, 86400*5, ki.PodNameUID, ki.Namespace) // Renew for 1 week
if err != nil {
c.Log.Errorf("Can't renew Lease with pod UUID : %s", ki.PodNameUID)
c.Log.Errorf("Can't renew Lease with pod UUID: %s", ki.PodNameUID)
isOk = false
return
}
} else {
leaseTooYoung, err := c.isLeaseTooYoung(ctx, ki.LeaseId)
if err != nil {
c.Log.Debug("error while trying to retrieve lease age, lease will be cleaned")
c.Log.Debug("Error while trying to retrieve lease age, lease will be cleaned")
}
if leaseTooYoung {
c.Log.Infof("This lease : %s is too young to be cleaned up.", ki.LeaseId)
c.Log.Infof("This lease: %s is too young to be cleaned up.", ki.LeaseId)
return
}
err = c.RevokeOrphanToken(ctx, ki.TokenId, ki.PodNameUID, ki.Namespace)
if err != nil {
c.Log.Errorf("Can't revok Token with UUID : %s", ki.PodNameUID)
c.Log.Errorf("Can't revoke Token with UUID: %s", ki.PodNameUID)
isOk = false
return
}
status, err := c.DeleteData(ctx, ki.PodNameUID, secretName, ki.PodNameUID, ki.Namespace, prefix)
if err != nil {
c.Log.Errorf("Data for %s can't be deleted : %s with error : %s", ki.PodNameUID, status, err.Error())
c.Log.Errorf("Data for %s can't be deleted: %s with error: %s", ki.PodNameUID, status, err.Error())
isOk = false
return
}
Expand All @@ -252,10 +276,11 @@ func (c *Connector) HandleTokens(ctx context.Context, cfg *config.Config, keysIn
promInjector.RenewLeaseCount.DeleteLabelValues(ki.PodNameUID, ki.Namespace)
promInjector.RenewTokenCount.DeleteLabelValues(ki.PodNameUID, ki.Namespace)
promInjector.DataDeletedCount.DeleteLabelValues(ki.PodNameUID, ki.Namespace)
c.Log.Infof("Token has been revoked and data deleted : %s", status)
c.Log.Infof("Token has been revoked and data deleted: %s", status)
}
}(ki)
}

wg.Wait()
c.RevokeSelfToken(ctx, c.client.Token(), "", "")
c.SetToken(c.K8sSaVaultToken)
Expand Down
20 changes: 11 additions & 9 deletions pkg/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Connector struct {
client *vault.Client
RenewalInterval time.Duration
Log logger.Logger
VaultRateLimit int
}

func (c *Connector) GetToken() string {
Expand All @@ -46,15 +47,16 @@ type DbCreds struct {
DbTokenId string
}

func NewConnector(address string, authPath string, authRole string, dbMountPath string, dbRole string, token string) *Connector {
func NewConnector(address string, authPath string, authRole string, dbMountPath string, dbRole string, token string, VaultRateLimit int) *Connector {
return &Connector{
address: address,
authPath: authPath,
dbRole: dbRole,
dbMountPath: dbMountPath,
k8sSaToken: token,
authRole: authRole,
Log: logger.GetLogger(),
address: address,
authPath: authPath,
dbRole: dbRole,
dbMountPath: dbMountPath,
k8sSaToken: token,
authRole: authRole,
Log: logger.GetLogger(),
VaultRateLimit: VaultRateLimit,
}
}

Expand All @@ -67,7 +69,7 @@ func ConnectToVault(ctx context.Context, cfg *config.Config) (*Connector, error)
return nil, errors.Newf("cannot get ServiceAccount token: %s", err.Error())
}
// Configure vault connection using serviceAccount token
vaultConn := NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, "random", "random", tok)
vaultConn := NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, "random", "random", tok, cfg.VaultRateLimit)
if err := vaultConn.Login(ctx); // Assuming Login is modified to accept a context
err != nil {
promInjector.ConnectVaultError.WithLabelValues().Inc()
Expand Down

0 comments on commit 2d3b69d

Please sign in to comment.