diff --git a/ipfs/node_test.go b/ipfs/node_test.go index ca07dc7..8464a66 100644 --- a/ipfs/node_test.go +++ b/ipfs/node_test.go @@ -237,54 +237,54 @@ func TestDownload2(t *testing.T) { } /* -func TestPin(t *testing.T) { - log := zaptest.NewLogger(t) + func TestPin(t *testing.T) { + log := zaptest.NewLogger(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - privateKey, _, err := crypto.GenerateEd25519Key(frand.Reader) - if err != nil { - t.Fatal(err) - } + privateKey, _, err := crypto.GenerateEd25519Key(frand.Reader) + if err != nil { + t.Fatal(err) + } - store, err := badger.OpenDatabase(filepath.Join(t.TempDir(), "fsd.badgerdb"), log.Named("badger")) - if err != nil { - log.Fatal("failed to open badger database", zap.Error(err)) - } - defer store.Close() + store, err := badger.OpenDatabase(filepath.Join(t.TempDir(), "fsd.badgerdb"), log.Named("badger")) + if err != nil { + log.Fatal("failed to open badger database", zap.Error(err)) + } + defer store.Close() - memBlockStore := &memoryBlockStore{ - blocks: make(map[cid.Cid][]byte), - } + memBlockStore := &memoryBlockStore{ + blocks: make(map[cid.Cid][]byte), + } - node, err := ipfs.NewNode(ctx, privateKey, config.IPFS{}, memBlockStore) - if err != nil { - t.Fatal(err) - } - defer node.Close() + node, err := ipfs.NewNode(ctx, privateKey, config.IPFS{}, memBlockStore) + if err != nil { + t.Fatal(err) + } + defer node.Close() - time.Sleep(time.Second) + time.Sleep(time.Second) - c := cid.MustParse("QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm") + c := cid.MustParse("QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm") - _, err = node.PinCID(ctx, c, func(c cid.Cid, path string) error { - r, err := node.DownloadCID(ctx, c, nil) + _, err = node.PinCID(ctx, c, func(c cid.Cid, path string) error { + r, err := node.DownloadCID(ctx, c, nil) + if err != nil { + return err + } + defer r.Close() + h := sha256.New() + if _, err := io.Copy(h, r); err != nil { + return err + } + shaChecksum := hex.EncodeToString(h.Sum(nil)) + t.Log("pinned", c, path, shaChecksum) + return nil + }) if err != nil { - return err - } - defer r.Close() - h := sha256.New() - if _, err := io.Copy(h, r); err != nil { - return err + t.Fatal(err) } - shaChecksum := hex.EncodeToString(h.Sum(nil)) - t.Log("pinned", c, path, shaChecksum) - return nil - }) - if err != nil { - t.Fatal(err) + t.Fail() } - t.Fail() -} */ diff --git a/renterd/downloader/downloader.go b/renterd/downloader/downloader.go index 9c62932..7302a81 100644 --- a/renterd/downloader/downloader.go +++ b/renterd/downloader/downloader.go @@ -210,17 +210,18 @@ func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger) }() buf, err := bd.downloadBlockData(ctx, task.cid, task.bucket, task.key) + bd.mu.Lock() if err != nil { log.Error("failed to download block", zap.Error(err)) task.err = err - delete(bd.inflight, cidKey(task.cid)) } else { log.Debug("block downloaded", zap.Int("size", len(buf)), zap.Duration("elapsed", time.Since(start))) task.b = buf bd.dataCache.Add(cidKey(task.cid), buf) } close(task.ch) - + delete(bd.inflight, cidKey(task.cid)) + bd.mu.Unlock() } func (bd *BlockDownloader) downloadWorker(ctx context.Context, n int) { @@ -241,7 +242,7 @@ func (bd *BlockDownloader) downloadWorker(ctx context.Context, n int) { task := heap.Pop(bd.queue).(*blockResponse) log := log.With(zap.Stringer("cid", task.cid), zap.Stringer("priority", task.priority)) log.Debug("popped task from queue") - bd.mu.Unlock() + bd.mu.Unlock() // unlock prior to downloading to prevent blocking other workers bd.doDownloadTask(task, log) } }