Skip to content

Commit

Permalink
fix: reserve size to return chunks with po greater than and equal to …
Browse files Browse the repository at this point in the history
…storage radius (#3322)
  • Loading branch information
istae authored and alok committed Sep 23, 2022
1 parent 577e2ec commit e868754
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 4 deletions.
37 changes: 37 additions & 0 deletions pkg/localstore/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,40 @@ func withinRadius(db *DB, item shed.Item) bool {
po := db.po(swarm.NewAddress(item.Address))
return po >= item.Radius
}

// ComputeReserveSize iterates on the pull index to count all chunks
// starting at some proximity order with an generated address whose PO
// is used as a starting prefix by the index.
func (db *DB) ComputeReserveSize(startPO uint8) (uint64, error) {

var count uint64

err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
count++
return false, nil
}, &shed.IterateOptions{
StartFrom: &shed.Item{
Address: generateAddressAt(db.baseKey, int(startPO)),
},
})

return count, err
}

func generateAddressAt(baseBytes []byte, prox int) []byte {

addr := make([]byte, 32)

for po := 0; po < prox; po++ {
index := po % 8
if baseBytes[po/8]&(1<<(7-index)) > 0 { // if baseBytes bit is 1
addr[po/8] |= 1 << (7 - index) // set addr bit to 1
}
}

if baseBytes[prox/8]&(1<<(7-(prox%8))) == 0 { // if baseBytes PO bit is zero
addr[prox/8] |= 1 << (7 - (prox % 8)) // set addr bit to 1
}

return addr
}
38 changes: 38 additions & 0 deletions pkg/localstore/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,44 @@ func TestReserveSize(t *testing.T) {
})
}

func TestComputeReserveSize(t *testing.T) {
t.Parallel()

const chunkCountPerPO = 10
const maxPO = 10
var chs []swarm.Chunk

db := newTestDB(t, &Options{
Capacity: 1000,
ReserveCapacity: 1000,
})

for po := 0; po < maxPO; po++ {
for i := 0; i < chunkCountPerPO; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), po).WithBatch(0, 3, 2, false)
chs = append(chs, ch)
}
}

_, err := db.Put(context.Background(), storage.ModePutSync, chs...)
if err != nil {
t.Fatal(err)
}

t.Run("reserve size", reserveSizeTest(db, chunkCountPerPO*maxPO))

for po := 0; po < maxPO; po++ {
got, err := db.ComputeReserveSize(uint8(po))
if err != nil {
t.Fatal(err)
}
want := (maxPO - po) * chunkCountPerPO
if got != uint64(want) {
t.Fatalf("compute reserve size mismatch, po %d, got %d, want %d", po, got, want)
}
}
}

func TestDB_ReserveGC_BatchedUnreserve(t *testing.T) {
chunkCount := 100

Expand Down
8 changes: 5 additions & 3 deletions pkg/topology/depthmonitor/depthmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
// pledged by the node to the network.
type ReserveReporter interface {
// Current size of the reserve.
ReserveSize() (uint64, error)
ComputeReserveSize(uint8) (uint64, error)
// Capacity of the reserve that is configured.
ReserveCapacity() uint64
}
Expand Down Expand Up @@ -115,14 +115,16 @@ func (s *Service) manage(warmupTime time.Duration) {
case <-time.After(manageWait):
}

currentSize, err := s.reserve.ReserveSize()
reserveState := s.bs.GetReserveState()

currentSize, err := s.reserve.ComputeReserveSize(reserveState.StorageRadius)
if err != nil {
s.logger.Error(err, "depthmonitor: failed reading reserve size")
continue
}

rate := s.syncer.Rate()
s.logger.Debug("depthmonitor size and rate", "current size", currentSize, "chunks/sec rate", rate)
s.logger.Debug("depthmonitor: state", "current size", currentSize, "radius", reserveState.StorageRadius, "chunks/sec rate", rate)

// we have crossed 50% utilization
if currentSize > halfCapacity {
Expand Down
2 changes: 1 addition & 1 deletion pkg/topology/depthmonitor/depthmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ type mockReserveReporter struct {
size uint64
}

func (m *mockReserveReporter) ReserveSize() (uint64, error) {
func (m *mockReserveReporter) ComputeReserveSize(uint8) (uint64, error) {
m.Lock()
defer m.Unlock()
return m.size, nil
Expand Down

0 comments on commit e868754

Please sign in to comment.