Skip to content

Commit

Permalink
ipfs: raise bulk send parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed May 8, 2024
1 parent 4dd7537 commit d1f34b2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
18 changes: 11 additions & 7 deletions ipfs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,18 @@ func NewNode(ctx context.Context, privateKey crypto.PrivKey, cfg config.IPFS, rs
return nil, fmt.Errorf("failed to create libp2p host: %w", err)
}

dhtOpts := []dht.Option{
dht.Mode(dht.ModeServer),
dht.BootstrapPeers(bootstrapPeers...),
dht.BucketSize(20),
dht.Concurrency(30),
dht.Datastore(ds),
fullRTOpts := []fullrt.Option{
fullrt.DHTOption([]dht.Option{
dht.Mode(dht.ModeServer),
dht.BootstrapPeers(bootstrapPeers...),
dht.BucketSize(40),
dht.Concurrency(60),
dht.Datastore(ds),
}...),
fullrt.WithBulkSendParallelism(256),
}
frt, err := fullrt.NewFullRT(host, dht.DefaultPrefix, fullrt.DHTOption(dhtOpts...))

frt, err := fullrt.NewFullRT(host, dht.DefaultPrefix, fullRTOpts...)
if err != nil {
return nil, fmt.Errorf("failed to create fullrt: %w", err)
}
Expand Down
44 changes: 21 additions & 23 deletions ipfs/provide.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,52 +71,50 @@ func (r *Reprovider) Run(ctx context.Context, interval time.Duration) {
continue
}

doProvide := func(ctx context.Context, keys []multihash.Multihash) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
return r.provider.ProvideMany(ctx, keys)
}

for {
start := time.Now()
cids, err := r.store.ProvideCIDs(5000)

cids, err := r.store.ProvideCIDs(1000)
if err != nil {
reprovideSleep = time.Minute
r.log.Error("failed to fetch CIDs to provide", zap.Error(err))
break
}

if len(cids) == 0 {
r.log.Debug("no CIDs to provide")
reprovideSleep = 15 * time.Minute // set a minimum sleep time even
} else if len(cids) == 0 {
reprovideSleep = 10 * time.Minute
r.log.Debug("reprovide complete")
break
}

nextAnnounce := time.Until(cids[0].LastAnnouncement.Add(interval))

if nextAnnounce > 0 {
r.log.Debug("sleeping until next reprovide time", zap.Duration("duration", nextAnnounce))
reprovideSleep = nextAnnounce
rem := time.Until(cids[0].LastAnnouncement.Add(interval))
if rem > 0 {
reprovideSleep = rem
r.log.Debug("reprovide complete")
break
}

announced := make([]cid.Cid, 0, len(cids))
keys := make([]multihash.Multihash, 0, len(cids))
for _, c := range cids {
if time.Since(c.LastAnnouncement) < interval {
reprovideSleep = time.Until(c.LastAnnouncement.Add(interval))
break
}

keys = append(keys, c.CID.Hash())
announced = append(announced, c.CID)
}

ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

if err := r.provider.ProvideMany(ctx, keys); err != nil {
if err := doProvide(ctx, keys); err != nil {
reprovideSleep = time.Minute
r.log.Error("failed to provide CIDs", zap.Error(err))
break
} else if err := r.store.SetLastAnnouncement(announced, time.Now()); err != nil {
r.log.Error("failed to update last announcement", zap.Error(err))
reprovideSleep = time.Minute
r.log.Error("failed to update last announcement time", zap.Error(err))
break
}

r.log.Debug("announced CIDs", zap.Int("count", len(announced)), zap.Duration("elapsed", time.Since(start)))
r.log.Debug("provided CIDs", zap.Int("count", len(announced)), zap.Duration("elapsed", time.Since(start)))
}
}
})
Expand Down

0 comments on commit d1f34b2

Please sign in to comment.