Skip to content

Commit

Permalink
Make modify volume interval configurable
Browse files Browse the repository at this point in the history
We're having problems related to race conditions when several changes
are applied to volumes. We don't want those changes to be applied
independently, so we're extracting this param to a driverOption to make it
configurable from the outside.
  • Loading branch information
adolsalamanca committed Jan 25, 2024
1 parent 1511973 commit 5f6fc61
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 16 deletions.
6 changes: 4 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (

flag "github.com/spf13/pflag"

logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/logs/json"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics"
logsapi "k8s.io/component-base/logs/api/v1"
json "k8s.io/component-base/logs/json"

"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -73,6 +74,7 @@ func main() {
driver.WithUserAgentExtra(options.ControllerOptions.UserAgentExtra),
driver.WithOtelTracing(options.ServerOptions.EnableOtelTracing),
driver.WithBatching(options.ControllerOptions.Batching),
driver.WithCustomModifyVolumeInterval(options.ControllerOptions.ModifyVolumeInterval),
)
if err != nil {
klog.ErrorS(err, "failed to create driver")
Expand Down
5 changes: 5 additions & 0 deletions cmd/options/controller_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package options

import (
"time"

flag "github.com/spf13/pflag"

cliflag "k8s.io/component-base/cli/flag"
Expand All @@ -41,6 +43,8 @@ type ControllerOptions struct {
UserAgentExtra string
// flag to enable batching of API calls
Batching bool
// ModifyVolumeInterval is the interval of time that the controller waits to process a volume change, default is 2s.
ModifyVolumeInterval time.Duration
}

func (s *ControllerOptions) AddFlags(fs *flag.FlagSet) {
Expand All @@ -51,4 +55,5 @@ func (s *ControllerOptions) AddFlags(fs *flag.FlagSet) {
fs.BoolVar(&s.WarnOnInvalidTag, "warn-on-invalid-tag", false, "To warn on invalid tags, instead of returning an error")
fs.StringVar(&s.UserAgentExtra, "user-agent-extra", "", "Extra string appended to user agent.")
fs.BoolVar(&s.Batching, "batching", false, "To enable batching of API calls. This is especially helpful for improving performance in workloads that are sensitive to EC2 rate limits.")
fs.DurationVar(&s.ModifyVolumeInterval, "modify-volume-interval", 2*time.Second, "is the interval of time that the controller waits to process a volume change, default is 2s. This interval might be useful to avoid race conditions of subsequent changes")
}
5 changes: 3 additions & 2 deletions pkg/driver/controller_modify_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"time"

"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
)

const (
Expand Down Expand Up @@ -118,7 +119,7 @@ func (d *controllerService) processModifyVolumeRequests(h *modifyVolumeRequestHa
select {
case req := <-h.requestChan:
process(req)
case <-time.After(modifyVolumeRequestHandlerTimeout):
case <-time.After(d.driverOptions.modifyVolumeInterval):
d.modifyVolumeManager.requestHandlerMap.Delete(h.volumeID)
// At this point, no new requests can come in on the request channel because it has been removed from the map
// However, the request channel may still have requests waiting on it
Expand Down
33 changes: 21 additions & 12 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"context"
"fmt"
"net"
"time"

"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
csi "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"github.com/container-storage-interface/spec/lib/go/csi"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"k8s.io/klog/v2"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
)

// Mode is the operating mode of the CSI driver.
Expand Down Expand Up @@ -62,16 +64,17 @@ type Driver struct {
}

type DriverOptions struct {
endpoint string
extraTags map[string]string
mode Mode
volumeAttachLimit int64
kubernetesClusterID string
awsSdkDebugLog bool
batching bool
warnOnInvalidTag bool
userAgentExtra string
otelTracing bool
endpoint string
extraTags map[string]string
mode Mode
volumeAttachLimit int64
kubernetesClusterID string
awsSdkDebugLog bool
batching bool
warnOnInvalidTag bool
userAgentExtra string
otelTracing bool
modifyVolumeInterval time.Duration
}

func NewDriver(options ...func(*DriverOptions)) (*Driver, error) {
Expand Down Expand Up @@ -167,6 +170,12 @@ func WithEndpoint(endpoint string) func(*DriverOptions) {
}
}

func WithCustomModifyVolumeInterval(interval time.Duration) func(*DriverOptions) {
return func(o *DriverOptions) {
o.modifyVolumeInterval = interval
}
}

func WithExtraTags(extraTags map[string]string) func(*DriverOptions) {
return func(o *DriverOptions) {
o.extraTags = extraTags
Expand Down

0 comments on commit 5f6fc61

Please sign in to comment.