Skip to content

Commit

Permalink
gd2 plugin: added a plugin for block volume management
Browse files Browse the repository at this point in the history
 - added APIs for creation,deleting and listing block volumes.
 - added pluggable interface for block volume providers.

Refer Design Doc: gluster#1319

Signed-off-by: Oshank Kumar <[email protected]>
  • Loading branch information
Oshank Kumar committed Jan 4, 2019
1 parent 0839909 commit fe42d9e
Show file tree
Hide file tree
Showing 17 changed files with 1,360 additions and 71 deletions.
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

[[constraint]]
name = "github.com/sirupsen/logrus"
version = "~1.0.3"
version = "~1.2.0"

[[constraint]]
name = "github.com/cockroachdb/cmux"
Expand Down
94 changes: 45 additions & 49 deletions glusterd2/commands/volumes/volume-create.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package volumecommands

import (
"context"
"errors"
"net/http"
"path/filepath"
Expand Down Expand Up @@ -102,9 +103,6 @@ func registerVolCreateStepFuncs() {
func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {

ctx := r.Context()
ctx, span := trace.StartSpan(ctx, "/volumeCreateHandler")
defer span.End()

logger := gdctx.GetReqLogger(ctx)
var err error

Expand All @@ -114,45 +112,68 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
return
}

if err := validateVolCreateReq(&req); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, err)
if status, err := CreateVolume(ctx, req); err != nil {
restutils.SendHTTPError(ctx, w, status, err)
return
}

if containsReservedGroupProfile(req.Options) {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, gderrors.ErrReservedGroupProfile)
volinfo, err := volume.GetVolume(req.Name)
if err != nil {
// FIXME: If volume was created successfully in the txn above and
// then the store goes down by the time we reach here, what do
// we return to the client ?
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
}

logger.WithField("volume-name", volinfo.Name).Info("new volume created")
events.Broadcast(volume.NewEvent(volume.EventVolumeCreated, volinfo))

resp := createVolumeCreateResp(volinfo)
restutils.SetLocationHeader(r, w, volinfo.Name)
restutils.SendHTTPResponse(ctx, w, http.StatusCreated, resp)
}

func createVolumeCreateResp(v *volume.Volinfo) *api.VolumeCreateResp {
return (*api.VolumeCreateResp)(volume.CreateVolumeInfoResp(v))
}

// CreateVolume creates a volume
func CreateVolume(ctx context.Context, req api.VolCreateReq) (status int, err error) {
ctx, span := trace.StartSpan(ctx, "/volumeCreateHandler")
defer span.End()

if err := validateVolCreateReq(&req); err != nil {
return http.StatusBadRequest, err
}

if containsReservedGroupProfile(req.Options) {
return http.StatusBadRequest, gderrors.ErrReservedGroupProfile
}

if req.Size > 0 {
applyDefaults(&req)

if req.SnapshotReserveFactor < 1 {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest,
errors.New("invalid snapshot reserve factor"))
return
return http.StatusBadRequest, errors.New("invalid snapshot reserve factor")
}

if err := bricksplanner.PlanBricks(&req); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return http.StatusInternalServerError, err
}
} else {
if err := checkDupBrickEntryVolCreate(req); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, err)
return
return http.StatusBadRequest, err
}
}

req.Options, err = expandGroupOptions(req.Options)
if err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return http.StatusInternalServerError, err
}

if err := validateOptions(req.Options, req.VolOptionFlags); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, err)
return
return http.StatusBadRequest, err
}

// Include default Volume Options profile
Expand All @@ -171,21 +192,17 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {

nodes, err := req.Nodes()
if err != nil {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, err)
return
return http.StatusBadRequest, err
}

txn, err := transactionv2.NewTxnWithLocks(ctx, req.Name)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
return restutils.ErrToStatusCode(err)
}
defer txn.Done()

if volume.Exists(req.Name) {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, gderrors.ErrVolExists)
return
return http.StatusBadRequest, gderrors.ErrVolExists
}

txn.Steps = []*transaction.Step{
Expand Down Expand Up @@ -219,8 +236,7 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
}

if err := txn.Ctx.Set("req", &req); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return http.StatusInternalServerError, err
}

// Add attributes to the span with info that can be viewed along with traces.
Expand All @@ -231,28 +247,8 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
)

if err := txn.Do(); err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
return restutils.ErrToStatusCode(err)
}

volinfo, err := volume.GetVolume(req.Name)
if err != nil {
// FIXME: If volume was created successfully in the txn above and
// then the store goes down by the time we reach here, what do
// we return to the client ?
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
}

logger.WithField("volume-name", volinfo.Name).Info("new volume created")
events.Broadcast(volume.NewEvent(volume.EventVolumeCreated, volinfo))

resp := createVolumeCreateResp(volinfo)
restutils.SetLocationHeader(r, w, volinfo.Name)
restutils.SendHTTPResponse(ctx, w, http.StatusCreated, resp)
}

func createVolumeCreateResp(v *volume.Volinfo) *api.VolumeCreateResp {
return (*api.VolumeCreateResp)(volume.CreateVolumeInfoResp(v))
return http.StatusCreated, nil
}
45 changes: 25 additions & 20 deletions glusterd2/commands/volumes/volume-start.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,6 @@ func registerVolStartStepFuncs() {
func volumeStartHandler(w http.ResponseWriter, r *http.Request) {

ctx := r.Context()
ctx, span := trace.StartSpan(ctx, "/volumeStartHandler")
defer span.End()

logger := gdctx.GetReqLogger(ctx)
volname := mux.Vars(r)["volname"]
var req api.VolumeStartReq

Expand All @@ -142,24 +138,39 @@ func volumeStartHandler(w http.ResponseWriter, r *http.Request) {
return
}

txn, err := transactionv2.NewTxnWithLocks(ctx, volname)
volinfo, status, err := StartVolume(ctx, volname, req)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
}

events.Broadcast(volume.NewEvent(volume.EventVolumeStarted, volinfo))

resp := createVolumeStartResp(volinfo)
restutils.SendHTTPResponse(ctx, w, http.StatusOK, resp)
}

// StartVolume starts a volume
func StartVolume(ctx context.Context, volname string, req api.VolumeStartReq) (volInfo *volume.Volinfo, status int, err error) {
logger := gdctx.GetReqLogger(ctx)
ctx, span := trace.StartSpan(ctx, "/volumeStartHandler")
defer span.End()

txn, err := transactionv2.NewTxnWithLocks(ctx, volname)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
return nil, status, err
}
defer txn.Done()

volinfo, err := volume.GetVolume(volname)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
return nil, status, err
}

if volinfo.State == volume.VolStarted && !req.ForceStartBricks {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, errors.ErrVolAlreadyStarted)
return
return nil, http.StatusBadRequest, errors.ErrVolAlreadyStarted
}

txn.Steps = []*transaction.Step{
Expand All @@ -182,15 +193,13 @@ func volumeStartHandler(w http.ResponseWriter, r *http.Request) {
}

if err := txn.Ctx.Set("oldvolinfo", volinfo); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return nil, http.StatusInternalServerError, err
}

volinfo.State = volume.VolStarted

if err := txn.Ctx.Set("volinfo", volinfo); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return nil, http.StatusInternalServerError, err
}

span.AddAttributes(
Expand All @@ -201,14 +210,10 @@ func volumeStartHandler(w http.ResponseWriter, r *http.Request) {
if err := txn.Do(); err != nil {
logger.WithError(err).WithField(
"volume", volname).Error("transaction to start volume failed")
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return nil, http.StatusInternalServerError, err
}

events.Broadcast(volume.NewEvent(volume.EventVolumeStarted, volinfo))

resp := createVolumeStartResp(volinfo)
restutils.SendHTTPResponse(ctx, w, http.StatusOK, resp)
return volinfo, http.StatusOK, nil
}

func createVolumeStartResp(v *volume.Volinfo) *api.VolumeStartResp {
Expand Down
2 changes: 2 additions & 0 deletions glusterd2/plugin/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package plugin

import (
"github.com/gluster/glusterd2/plugins/bitrot"
"github.com/gluster/glusterd2/plugins/blockvolume"
"github.com/gluster/glusterd2/plugins/device"
"github.com/gluster/glusterd2/plugins/events"
"github.com/gluster/glusterd2/plugins/georeplication"
Expand All @@ -25,4 +26,5 @@ var PluginsList = []GlusterdPlugin{
&glustershd.Plugin{},
&device.Plugin{},
&rebalance.Plugin{},
&blockvolume.BlockVolume{},
}
51 changes: 51 additions & 0 deletions glusterd2/volume/filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package volume

const (
// BlockHosted is plugin name for FilterBlockHostedVolumes
BlockHosted = "block-hosted"
)

// Filter will receive a slice of *Volinfo and filters out the undesired one and return slice of desired one only
type Filter func([]*Volinfo) []*Volinfo

var filters = make(map[string]Filter)

// InstallFilter will register a custom Filter
func InstallFilter(name string, f Filter) {
filters[name] = f
}

// ApplyFilters applies all registered filters passed in the args to a slice of *Volinfo
func ApplyFilters(volumes []*Volinfo, names ...string) []*Volinfo {
for _, name := range names {
if filter, found := filters[name]; found {
volumes = filter(volumes)
}
}
return volumes
}

// ApplyCustomFilters applies all custom filter to a slice of *Volinfo
func ApplyCustomFilters(volumes []*Volinfo, filters ...Filter) []*Volinfo {
for _, filter := range filters {
volumes = filter(volumes)
}

return volumes
}

// FilterBlockHostedVolumes filters out volume which are suitable for hosting block volume
func FilterBlockHostedVolumes(volumes []*Volinfo) []*Volinfo {
var volInfos []*Volinfo
for _, volume := range volumes {
val, found := volume.Metadata["block-hosting"]
if found && val == "yes" {
volInfos = append(volInfos, volume)
}
}
return volInfos
}

func init() {
InstallFilter(BlockHosted, FilterBlockHostedVolumes)
}
Loading

0 comments on commit fe42d9e

Please sign in to comment.