Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: add ReadIterate implementing rbd_read_iterate2 #624

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions rbd/rbd.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,15 +706,13 @@ func (image *Image) BreakLock(client string, cookie string) error {
return getError(C.rbd_break_lock(image.image, cClient, cCookie))
}

// ssize_t rbd_read(rbd_image_t image, uint64_t ofs, size_t len, char *buf);
// TODO: int64_t rbd_read_iterate(rbd_image_t image, uint64_t ofs, size_t len,
// int (*cb)(uint64_t, size_t, const char *, void *), void *arg);
// TODO: int rbd_read_iterate2(rbd_image_t image, uint64_t ofs, uint64_t len,
// int (*cb)(uint64_t, size_t, const char *, void *), void *arg);
// TODO: int rbd_diff_iterate(rbd_image_t image,
// const char *fromsnapname,
// uint64_t ofs, uint64_t len,
// int (*cb)(uint64_t, size_t, int, void *), void *arg);
// Read data from the image. The length of the read is determined by the length
// of the buffer slice. The position of the read is determined by an internal
// offset which is not safe in concurrent code. Prefer ReadAt when possible.
//
// Implements:
// ssize_t rbd_read(rbd_image_t image, uint64_t ofs, size_t len,
// char *buf);
func (image *Image) Read(data []byte) (int, error) {
if err := image.validate(imageIsOpen); err != nil {
return 0, err
Expand Down Expand Up @@ -742,7 +740,13 @@ func (image *Image) Read(data []byte) (int, error) {
return ret, nil
}

// ssize_t rbd_write(rbd_image_t image, uint64_t ofs, size_t len, const char *buf);
// Write data to an image. The length of the write is determined by the length of
// the buffer slice. The position of the write is determined by an internal
// offset which is not safe in concurrent code. Prefer WriteAt when possible.
//
// Implements:
// ssize_t rbd_write(rbd_image_t image, uint64_t ofs, size_t len,
// const char *buf);
func (image *Image) Write(data []byte) (n int, err error) {
if err := image.validate(imageIsOpen); err != nil {
return 0, err
Expand Down
94 changes: 94 additions & 0 deletions rbd/read_iterate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package rbd

/*
#cgo LDFLAGS: -lrbd
#undef _GNU_SOURCE
#include <errno.h>
#include <stdlib.h>
#include <rbd/librbd.h>

extern int readIterateCallback(uint64_t, size_t, char*, uintptr_t);

// inline wrapper to cast uintptr_t to void*
static inline int wrap_rbd_read_iterate2(
rbd_image_t image, uint64_t ofs, uint64_t len, uintptr_t arg) {
return rbd_read_iterate2(image, ofs, len,
(void*)readIterateCallback, (void*)arg);
};
*/
import "C"

import (
"unsafe"

"github.com/ceph/go-ceph/internal/callbacks"
)

var readIterateCallbacks = callbacks.New()

// ReadIterateCallback defines the function signature needed for the
// ReadIterate callback function.
//
// The function will be called with the arguments: offset, length, buffer, and
// data. The offset and length correspond to a region of the image. The buffer
// will contain the data read from the image, or be nil if the region in the
// image is zeroed (a hole). The data value is an extra private parameter that
// can be set in the ReadIterateConfig and is meant to be used for passing
// arbitrary user-defined items to the callback function.
type ReadIterateCallback func(uint64, uint64, []byte, interface{}) int

// ReadIterateConfig is used to define the parameters of a ReadIterate call.
// Callback, Offset, and Length should always be specified when passed to
// ReadIterate. The data value is optional.
type ReadIterateConfig struct {
Offset uint64
Length uint64
Callback ReadIterateCallback
Data interface{}
}

// ReadIterate walks over regions in the image, calling the callback function
// for each region. Typically, the size of each region is the stripe size of
// the image.
//
// Implements:
// int rbd_read_iterate2(rbd_image_t image,
// uint64_t ofs,
// uint64_t len,
// int (*cb)(uint64_t, size_t, const char *, void *),
// void *arg);
func (image *Image) ReadIterate(config ReadIterateConfig) error {
if err := image.validate(imageIsOpen); err != nil {
return err
}
// the provided callback must be a real function
if config.Callback == nil {
return rbdError(C.EINVAL)
}

cbIndex := readIterateCallbacks.Add(config)
defer readIterateCallbacks.Remove(cbIndex)

ret := C.wrap_rbd_read_iterate2(
image.image,
C.uint64_t(config.Offset),
C.uint64_t(config.Length),
C.uintptr_t(cbIndex))

return getError(ret)
}

//export readIterateCallback
func readIterateCallback(
offset C.uint64_t, length C.size_t, cbuf *C.char, index uintptr) C.int {

var gbuf []byte
v := readIterateCallbacks.Lookup(index)
config := v.(ReadIterateConfig)
if cbuf != nil {
// should we assert than length is < max val C.int?
gbuf = C.GoBytes(unsafe.Pointer(cbuf), C.int(length))
}
return C.int(config.Callback(
uint64(offset), uint64(length), gbuf, config.Data))
}
74 changes: 74 additions & 0 deletions rbd/read_iterate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package rbd

import (
"fmt"
_ "sync"
"testing"
_ "time"

"github.com/ceph/go-ceph/rados"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestReadIterate(t *testing.T) {
conn := radosConnect(t)
defer conn.Shutdown()

poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
defer conn.DeletePool(poolname)

ioctx, err := conn.OpenIOContext(poolname)
require.NoError(t, err)
defer ioctx.Destroy()

t.Run("basic", func(t *testing.T) {
testReadIterateBasic(t, ioctx)
})
}

func testReadIterateBasic(t *testing.T, ioctx *rados.IOContext) {
name := GetUUID()
isize := uint64(1 << 23) // 8MiB
iorder := 20 // 1MiB
options := NewRbdImageOptions()
defer options.Destroy()
assert.NoError(t,
options.SetUint64(RbdImageOptionOrder, uint64(iorder)))
err := CreateImage(ioctx, name, isize, options)
assert.NoError(t, err)

img, err := OpenImage(ioctx, name, NoSnapshot)
assert.NoError(t, err)
defer func() {
assert.NoError(t, img.Close())
assert.NoError(t, img.Remove())
}()

_, err = img.WriteAt([]byte("mary had a little lamb"), 0)
assert.NoError(t, err)
_, err = img.WriteAt([]byte("it's fleece was white as #FFFFF"), 2048)
assert.NoError(t, err)

//_, err = img.Discard(0, 1<<23)
//assert.NoError(t, err)
assert.NoError(t, img.Close())
img, err = OpenImage(ioctx, name, NoSnapshot)

err = img.ReadIterate(ReadIterateConfig{
Offset: 0,
Length: isize,
Callback: func(o, l uint64, b []byte, d interface{}) int {
if b == nil {
fmt.Println("ZZZ", o, l)
return 0
}
fmt.Println("QQQ", o, l, b)
return 0
},
})
assert.NoError(t, err)
}