Skip to content

Commit

Permalink
downloader: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Mar 11, 2024
1 parent bef6483 commit 1880df3
Showing 1 changed file with 109 additions and 90 deletions.
199 changes: 109 additions & 90 deletions renterd/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"bytes"
"container/heap"
"context"
"errors"
"fmt"
"sync"
"time"

Expand All @@ -21,7 +21,7 @@ import (
)

const (
downloadPriorityLow = iota + 1
downloadPriorityLow downloadPriority = iota + 1
downloadPriorityMedium
downloadPriorityHigh
downloadPriorityMax
Expand Down Expand Up @@ -65,9 +65,10 @@ type (

ch chan struct{}

mu sync.Mutex // protects the fields below
cache *lru.TwoQueueCache[string, *blockResponse]
queue *priorityQueue
mu sync.Mutex // protects the fields below
inflight map[string]*blockResponse
queue *priorityQueue
dataCache *lru.TwoQueueCache[string, []byte]
}
)

Expand Down Expand Up @@ -131,80 +132,86 @@ func (br *blockResponse) block(ctx context.Context, c cid.Cid) (blocks.Block, er
return blocks.NewBlockWithCid(br.b, c)
}

func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger) {
log = log.Named("doDownloadTask").With(zap.Stringer("cid", task.cid), zap.Stringer("priority", task.priority))
blockBuf := bytes.NewBuffer(make([]byte, 0, 2<<20))

start := time.Now()
bucket, key, err := bd.store.BlockLocation(task.cid)
func (bd *BlockDownloader) downloadBlockData(ctx context.Context, c cid.Cid) ([]byte, error) {
bucket, key, err := bd.store.BlockLocation(c)
if err != nil {
log.Error("failed to get block location", zap.Error(err))
task.err = err
bd.cache.Remove(cidKey(task.cid))
close(task.ch)
return
return nil, fmt.Errorf("failed to get block location: %w", err)
}

log.Debug("downloading block", zap.String("bucket", bucket), zap.String("key", key))

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

blockBuf := bytes.NewBuffer(make([]byte, 0, 2<<20))
err = bd.workerClient.DownloadObject(ctx, blockBuf, bucket, key, api.DownloadObjectOptions{})
if err != nil {
log.Error("failed to download block", zap.Error(err))
task.err = err
bd.cache.Remove(cidKey(task.cid))
close(task.ch)
return
return nil, fmt.Errorf("failed to download block: %w", err)
}

c := task.cid
h, err := mh.Sum(blockBuf.Bytes(), c.Prefix().MhType, -1)
if err != nil {
log.Error("failed to hash block", zap.Error(err))
task.err = err
bd.cache.Remove(cidKey(task.cid))
close(task.ch)
return
return nil, fmt.Errorf("failed to verify block: %w", err)
} else if c.Hash().HexString() != h.HexString() {
log.Error("block hash mismatch", zap.String("expected", c.Hash().HexString()), zap.String("actual", h.HexString()))
task.err = errors.New("block hash mismatch")
bd.cache.Remove(cidKey(task.cid))
close(task.ch)
return
return nil, fmt.Errorf("block hash mismatch: expected %s, actual %s", c.Hash().HexString(), h.HexString())
}
log.Info("downloaded block", zap.Duration("elapsed", time.Since(start)))
task.b = blockBuf.Bytes()
close(task.ch)
return blockBuf.Bytes(), nil
}

func (bd *BlockDownloader) getResponse(c cid.Cid, priority downloadPriority) *blockResponse {
bd.mu.Lock()
defer bd.mu.Unlock()
key := cidKey(c)

if task, ok := bd.cache.Get(key); ok {
bd.log.Debug("cache hit", zap.String("key", key))
// update the priority if the task is still queued
if task.priority < priority && task.index != -1 {
task.priority = priority
heap.Fix(bd.queue, task.index)
}
return task
}
task := &blockResponse{
cid: c,
priority: priority,
timestamp: time.Now(),
ch: make(chan struct{}),
}
bd.cache.Add(key, task)
heap.Push(bd.queue, task)
func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger) {
start := time.Now()
log = log.Named("doDownloadTask").With(zap.Stringer("cid", task.cid), zap.Stringer("priority", task.priority))

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

go func() {
bd.ch <- struct{}{}
siblings, err := bd.store.BlockSiblings(task.cid, 10)
if err != nil {
log.Error("failed to get block siblings", zap.Error(err))
return
}

for _, sibling := range siblings {
bd.mu.Lock()
if bd.dataCache.Contains(cidKey(sibling)) {
bd.mu.Unlock()
continue
}

if _, ok := bd.queueBlock(sibling, downloadPriorityLow); ok {
bd.log.Debug("queued sibling", zap.Stringer("cid", sibling))
}
bd.mu.Unlock()
}

children, err := bd.store.BlockChildren(task.cid, 10)
if err != nil {
log.Error("failed to get block children", zap.Error(err))
return
}

for _, child := range children {
bd.mu.Lock()
if bd.dataCache.Contains(cidKey(child)) {
bd.mu.Unlock()
continue
}

if _, ok := bd.queueBlock(child, downloadPriorityLow); ok {
bd.log.Debug("queued sibling", zap.Stringer("cid", child))
}
bd.mu.Unlock()
}
}()
return task

buf, err := bd.downloadBlockData(ctx, task.cid)
if err != nil {
log.Error("failed to download block", zap.Error(err))
task.err = err
delete(bd.inflight, cidKey(task.cid))
} else {
log.Debug("block downloaded", zap.Int("size", len(buf)), zap.Duration("elapsed", time.Since(start)))
task.b = buf
bd.dataCache.Add(cidKey(task.cid), buf)
}
close(task.ch)

}

Check failure on line 215 in renterd/downloader/downloader.go

View workflow job for this annotation

GitHub Actions / test

unnecessary trailing newline (whitespace)

Check failure on line 215 in renterd/downloader/downloader.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, 1.21)

unnecessary trailing newline (whitespace)

func (bd *BlockDownloader) downloadWorker(ctx context.Context, n int) {
Expand All @@ -230,33 +237,43 @@ func (bd *BlockDownloader) downloadWorker(ctx context.Context, n int) {
}
}

// queueBlock returns the in-flight response for c, creating and enqueueing
// a new one when none exists. The boolean result reports whether a new
// download was actually queued. Callers must hold bd.mu.
func (bd *BlockDownloader) queueBlock(c cid.Cid, priority downloadPriority) (*blockResponse, bool) {
	key := cidKey(c)
	if existing, ok := bd.inflight[key]; ok {
		// the block is already queued or downloading; promote its
		// priority if this request is more urgent
		if priority > existing.priority {
			existing.priority = priority
			heap.Fix(bd.queue, existing.index)
		}
		return existing, false
	}

	created := &blockResponse{
		cid:       c,
		priority:  priority,
		timestamp: time.Now(),
		ch:        make(chan struct{}),
	}
	bd.inflight[key] = created
	heap.Push(bd.queue, created)
	bd.ch <- struct{}{} // wake a download worker
	return created, true
}

// Get returns a block by CID. It first consults the in-memory data cache;
// on a miss it queues (or joins) a download at maximum priority and blocks
// until the download completes or ctx is cancelled.
func (bd *BlockDownloader) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
	log := bd.log.Named("Get").With(zap.Stringer("cid", c))

	bd.mu.Lock()
	// fast path: the raw block data is already cached
	if blockData, ok := bd.dataCache.Get(cidKey(c)); ok {
		bd.mu.Unlock()
		log.Debug("cache hit")
		return blocks.NewBlockWithCid(blockData, c)
	}

	// slow path: enqueue a download (or attach to an in-flight one) at max
	// priority, then wait for it outside the lock.
	// NOTE(review): queueBlock performs a blocking send on bd.ch while bd.mu
	// is held; if the worker signal channel is full this stalls every caller
	// of Get — confirm the channel buffer (sized to the worker count) makes
	// this safe under load.
	resp, _ := bd.queueBlock(c, downloadPriorityMax)
	bd.mu.Unlock()
	return resp.block(ctx, c)
}

Expand All @@ -266,7 +283,7 @@ func cidKey(c cid.Cid) string {

// NewBlockDownloader creates a new BlockDownloader.
func NewBlockDownloader(store MetadataStore, bucket string, cacheSize, workers int, workerClient *worker.Client, log *zap.Logger) (*BlockDownloader, error) {
cache, err := lru.New2Q[string, *blockResponse](cacheSize)
cache, err := lru.New2Q[string, []byte](cacheSize)
if err != nil {
return nil, err
}
Expand All @@ -275,8 +292,10 @@ func NewBlockDownloader(store MetadataStore, bucket string, cacheSize, workers i
store: store,
workerClient: workerClient,
log: log,
cache: cache,
queue: &priorityQueue{},

inflight: make(map[string]*blockResponse),
queue: &priorityQueue{},
dataCache: cache,

ch: make(chan struct{}, workers),
bucket: bucket,
Expand Down

0 comments on commit 1880df3

Please sign in to comment.