Skip to content

Commit

Permalink
downloader: check if cid is in store before queueing
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Mar 11, 2024
1 parent 1880df3 commit 651f60c
Showing 1 changed file with 45 additions and 26 deletions.
71 changes: 45 additions & 26 deletions renterd/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type (
err error

cid cid.Cid
bucket string
key string
priority downloadPriority
index int
timestamp time.Time
Expand Down Expand Up @@ -132,14 +134,9 @@ func (br *blockResponse) block(ctx context.Context, c cid.Cid) (blocks.Block, er
return blocks.NewBlockWithCid(br.b, c)
}

func (bd *BlockDownloader) downloadBlockData(ctx context.Context, c cid.Cid) ([]byte, error) {
bucket, key, err := bd.store.BlockLocation(c)
if err != nil {
return nil, fmt.Errorf("failed to get block location: %w", err)
}

func (bd *BlockDownloader) downloadBlockData(ctx context.Context, c cid.Cid, bucket, key string) ([]byte, error) {
blockBuf := bytes.NewBuffer(make([]byte, 0, 2<<20))
err = bd.workerClient.DownloadObject(ctx, blockBuf, bucket, key, api.DownloadObjectOptions{})
err := bd.workerClient.DownloadObject(ctx, blockBuf, bucket, key, api.DownloadObjectOptions{})
if err != nil {
return nil, fmt.Errorf("failed to download block: %w", err)
}
Expand Down Expand Up @@ -168,16 +165,22 @@ func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger)
}

for _, sibling := range siblings {
bd.mu.Lock()
if bd.dataCache.Contains(cidKey(sibling)) {
bd.mu.Unlock()
// check if the block exists in the store
bucket, key, err := bd.store.BlockLocation(sibling)
if err != nil {
continue
}

if _, ok := bd.queueBlock(sibling, downloadPriorityLow); ok {
bd.log.Debug("queued sibling", zap.Stringer("cid", sibling))
}
bd.mu.Unlock()
func() {
bd.mu.Lock()
defer bd.mu.Unlock()

if bd.dataCache.Contains(cidKey(sibling)) {
return
} else if _, ok := bd.queueBlock(sibling, bucket, key, downloadPriorityLow); ok {
bd.log.Debug("queued sibling", zap.Stringer("cid", sibling))
}
}()
}

children, err := bd.store.BlockChildren(task.cid, 10)
Expand All @@ -187,20 +190,26 @@ func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger)
}

for _, child := range children {
bd.mu.Lock()
if bd.dataCache.Contains(cidKey(child)) {
bd.mu.Unlock()
// check if the block exists in the store
bucket, key, err := bd.store.BlockLocation(child)
if err != nil {
continue
}

if _, ok := bd.queueBlock(child, downloadPriorityLow); ok {
bd.log.Debug("queued sibling", zap.Stringer("cid", child))
}
bd.mu.Unlock()
func() {
bd.mu.Lock()
defer bd.mu.Unlock()

if bd.dataCache.Contains(cidKey(child)) {
return
} else if _, ok := bd.queueBlock(child, bucket, key, downloadPriorityLow); ok {
bd.log.Debug("queued child", zap.Stringer("cid", child))
}
}()
}
}()

buf, err := bd.downloadBlockData(ctx, task.cid)
buf, err := bd.downloadBlockData(ctx, task.cid, task.bucket, task.key)
if err != nil {
log.Error("failed to download block", zap.Error(err))
task.err = err
Expand Down Expand Up @@ -237,7 +246,7 @@ func (bd *BlockDownloader) downloadWorker(ctx context.Context, n int) {
}
}

func (bd *BlockDownloader) queueBlock(c cid.Cid, priority downloadPriority) (*blockResponse, bool) {
func (bd *BlockDownloader) queueBlock(c cid.Cid, bucket, key string, priority downloadPriority) (*blockResponse, bool) {
resp, ok := bd.inflight[cidKey(c)]
if ok {
if resp.priority < priority {
Expand All @@ -248,10 +257,14 @@ func (bd *BlockDownloader) queueBlock(c cid.Cid, priority downloadPriority) (*bl
}

resp = &blockResponse{
cid: c,
cid: c,
bucket: bucket,
key: key,

priority: priority,
timestamp: time.Now(),
ch: make(chan struct{}),

ch: make(chan struct{}),
}
bd.inflight[cidKey(c)] = resp
heap.Push(bd.queue, resp)
Expand All @@ -261,6 +274,12 @@ func (bd *BlockDownloader) queueBlock(c cid.Cid, priority downloadPriority) (*bl

// Get returns a block by CID.
func (bd *BlockDownloader) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
// check if the block exists in the store
bucket, key, err := bd.store.BlockLocation(c)
if err != nil {
return nil, err
}

bd.mu.Lock()

log := bd.log.Named("Get").With(zap.Stringer("cid", c))
Expand All @@ -272,7 +291,7 @@ func (bd *BlockDownloader) Get(ctx context.Context, c cid.Cid) (blocks.Block, er
return blocks.NewBlockWithCid(blockData, c)
}

resp, _ := bd.queueBlock(c, downloadPriorityMax)
resp, _ := bd.queueBlock(c, bucket, key, downloadPriorityMax)
bd.mu.Unlock()
return resp.block(ctx, c)
}
Expand Down

0 comments on commit 651f60c

Please sign in to comment.