Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add replicator retry #3107

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type DB interface {
// Peerstore returns the peerstore where known host information is stored.
//
// It sits within the rootstore returned by [Root].
Peerstore() datastore.DSBatching
Peerstore() datastore.DSReaderWriter

// Headstore returns the headstore where the current heads of the database are stored.
//
Expand Down
12 changes: 6 additions & 6 deletions client/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 19 additions & 3 deletions client/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,26 @@

package client

import "github.com/libp2p/go-libp2p/core/peer"
import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
)

// Replicator is a peer that a set of local collections are replicated to.
type Replicator struct {
Info peer.AddrInfo
Schemas []string
Info peer.AddrInfo
Schemas []string
Status ReplicatorStatus
LastStatusChange time.Time
}

// ReplicatorStatus is the status of a Replicator.
type ReplicatorStatus uint8

const (
// ReplicatorStatusActive is the status of a Replicator that is actively replicating.
ReplicatorStatusActive ReplicatorStatus = iota
// ReplicatorStatusInactive is the status of a Replicator that is inactive/offline.
ReplicatorStatusInactive
)
12 changes: 6 additions & 6 deletions datastore/mocks/txn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions datastore/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package datastore

import (
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
)

var (
Expand All @@ -31,7 +30,7 @@ type multistore struct {
data DSReaderWriter
enc Blockstore
head DSReaderWriter
peer DSBatching
peer DSReaderWriter
system DSReaderWriter
dag Blockstore
}
Expand All @@ -46,7 +45,7 @@ func MultiStoreFrom(rootstore ds.Datastore) MultiStore {
data: prefix(rootRW, dataStoreKey),
enc: newBlockstore(prefix(rootRW, encStoreKey)),
head: prefix(rootRW, headStoreKey),
peer: namespace.Wrap(rootstore, peerStoreKey),
peer: prefix(rootRW, peerStoreKey),
system: prefix(rootRW, systemStoreKey),
dag: newBlockstore(prefix(rootRW, blockStoreKey)),
}
Expand All @@ -70,7 +69,7 @@ func (ms multistore) Headstore() DSReaderWriter {
}

// Peerstore implements MultiStore.
func (ms multistore) Peerstore() DSBatching {
func (ms multistore) Peerstore() DSReaderWriter {
return ms.peer
}

Expand Down
7 changes: 1 addition & 6 deletions datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type MultiStore interface {

// Peerstore is a wrapped root DSReaderWriter as a ds.Batching, embedded into a DSBatching
// under the /peers namespace
Peerstore() DSBatching
Peerstore() DSReaderWriter

// Blockstore is a wrapped root DSReaderWriter as a Blockstore, embedded into a Blockstore
// under the /blocks namespace
Expand Down Expand Up @@ -81,8 +81,3 @@ type IPLDStorage interface {
storage.ReadableStorage
storage.WritableStorage
}

// DSBatching wraps the Batching interface from go-datastore
type DSBatching interface {
ds.Batching
}
9 changes: 9 additions & 0 deletions docs/website/references/http/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,20 @@
},
"type": "object"
},
"LastStatusChange": {
"format": "date-time",
"type": "string"
},
"Schemas": {
"items": {
"type": "string"
},
"type": "array"
},
"Status": {
"maximum": 255,
"minimum": 0,
"type": "integer"
}
},
"type": "object"
Expand Down
18 changes: 16 additions & 2 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
PeerInfoName = Name("peer-info")
// ReplicatorName is the name of the replicator event.
ReplicatorName = Name("replicator")
// ReplicatorFailureName is the name of the replicator failure event.
ReplicatorFailureName = Name("replicator-failure")
// P2PTopicCompletedName is the name of the network p2p topic update completed event.
P2PTopicCompletedName = Name("p2p-topic-completed")
// ReplicatorCompletedName is the name of the replicator completed event.
Expand Down Expand Up @@ -68,8 +70,12 @@ type Update struct {
// also formed this update.
Block []byte

// IsCreate is true if this update is the creation of a new document.
IsCreate bool
// IsRetry is true if this update is a retry of a previously failed update.
IsRetry bool

// Success is a channel that will receive a boolean value indicating if the update was successful.
// It is used during retries.
Success chan bool
}

// Merge is a notification that a merge can be performed up to the provided CID.
Expand Down Expand Up @@ -137,3 +143,11 @@ type Replicator struct {
// and those collections have documents to be replicated.
Docs <-chan Update
}

// ReplicatorFailure is an event that is published when a replicator fails to replicate a document.
type ReplicatorFailure struct {
// PeerID is the id of the peer that failed to replicate the document.
PeerID peer.ID
// DocID is the unique immutable identifier of the document that failed to replicate.
DocID string
}
2 changes: 1 addition & 1 deletion http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@
panic("client side database")
}

func (c *Client) Peerstore() datastore.DSBatching {
func (c *Client) Peerstore() datastore.DSReaderWriter {

Check warning on line 496 in http/client.go

View check run for this annotation

Codecov / codecov/patch

http/client.go#L496

Added line #L496 was not covered by tests
panic("client side database")
}

Expand Down
2 changes: 1 addition & 1 deletion http/client_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
panic("client side transaction")
}

func (c *Transaction) Peerstore() datastore.DSBatching {
func (c *Transaction) Peerstore() datastore.DSReaderWriter {

Check warning on line 102 in http/client_tx.go

View check run for this annotation

Codecov / codecov/patch

http/client_tx.go#L102

Added line #L102 was not covered by tests
panic("client side transaction")
}

Expand Down
81 changes: 80 additions & 1 deletion internal/core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@
FIELD_ID_SEQ = "/seq/field"
PRIMARY_KEY = "/pk"
DATASTORE_DOC_VERSION_FIELD_ID = "v"
REPLICATOR = "/replicator/id"
P2P_COLLECTION = "/p2p/collection"
REPLICATOR = "/rep/id"
REPLICATOR_RETRY_ID = "/rep/retry/id"
REPLICATOR_RETRY_DOC = "/rep/retry/doc"
)

// Key is an interface that represents a key in the database.
Expand Down Expand Up @@ -946,3 +948,80 @@
// maximal byte string (i.e. already \xff...).
return b
}

type ReplicatorRetryIDKey struct {
PeerID string
}

var _ Key = (*ReplicatorRetryIDKey)(nil)

func NewReplicatorRetryIDKey(peerID string) ReplicatorRetryIDKey {
return ReplicatorRetryIDKey{
PeerID: peerID,
}
}

// NewReplicatorRetryIDKeyFromString creates a new [ReplicatorRetryIDKey] from a string.
//
// It expects the input string to be in the format `/rep/retry/id/[PeerID]`.
func NewReplicatorRetryIDKeyFromString(key string) (ReplicatorRetryIDKey, error) {
peerID := strings.TrimPrefix(key, REPLICATOR_RETRY_ID+"/")
if peerID == "" {
return ReplicatorRetryIDKey{}, errors.WithStack(ErrInvalidKey, errors.NewKV("Key", key))

Check warning on line 970 in internal/core/key.go

View check run for this annotation

Codecov / codecov/patch

internal/core/key.go#L970

Added line #L970 was not covered by tests
}
return NewReplicatorRetryIDKey(peerID), nil
}

func (k ReplicatorRetryIDKey) ToString() string {
return REPLICATOR_RETRY_ID + "/" + k.PeerID
}

func (k ReplicatorRetryIDKey) Bytes() []byte {
return []byte(k.ToString())

Check warning on line 980 in internal/core/key.go

View check run for this annotation

Codecov / codecov/patch

internal/core/key.go#L979-L980

Added lines #L979 - L980 were not covered by tests
}

func (k ReplicatorRetryIDKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

type ReplicatorRetryDocIDKey struct {
PeerID string
DocID string
}

var _ Key = (*ReplicatorRetryDocIDKey)(nil)

func NewReplicatorRetryDocIDKey(peerID, docID string) ReplicatorRetryDocIDKey {
return ReplicatorRetryDocIDKey{
PeerID: peerID,
DocID: docID,
}
}

// NewReplicatorRetryDocIDKeyFromString creates a new [ReplicatorRetryDocIDKey] from a string.
//
// It expects the input string to be in the format `/rep/retry/doc/[PeerID]/[DocID]`.
func NewReplicatorRetryDocIDKeyFromString(key string) (ReplicatorRetryDocIDKey, error) {
trimmedKey := strings.TrimPrefix(key, REPLICATOR_RETRY_DOC+"/")
keyArr := strings.Split(trimmedKey, "/")
if len(keyArr) != 2 {
return ReplicatorRetryDocIDKey{}, errors.WithStack(ErrInvalidKey, errors.NewKV("Key", key))

Check warning on line 1008 in internal/core/key.go

View check run for this annotation

Codecov / codecov/patch

internal/core/key.go#L1008

Added line #L1008 was not covered by tests
}
return NewReplicatorRetryDocIDKey(keyArr[0], keyArr[1]), nil
}

func (k ReplicatorRetryDocIDKey) ToString() string {
keyString := REPLICATOR_RETRY_DOC + "/" + k.PeerID
if k.DocID != "" {
keyString += "/" + k.DocID
}
return keyString
}

func (k ReplicatorRetryDocIDKey) Bytes() []byte {
return []byte(k.ToString())

Check warning on line 1022 in internal/core/key.go

View check run for this annotation

Codecov / codecov/patch

internal/core/key.go#L1021-L1022

Added lines #L1021 - L1022 were not covered by tests
}

func (k ReplicatorRetryDocIDKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}
1 change: 0 additions & 1 deletion internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ func (c *collection) save(
Cid: link.Cid,
SchemaRoot: c.Schema().Root,
Block: headNode,
IsCreate: isCreate,
}
txn.OnSuccess(func() {
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
Expand Down
29 changes: 28 additions & 1 deletion internal/db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package db

import (
"time"

"github.com/sourcenetwork/immutable"
)

Expand All @@ -20,7 +22,24 @@ const (
)

type dbOptions struct {
maxTxnRetries immutable.Option[int]
maxTxnRetries immutable.Option[int]
RetryIntervals []time.Duration
}

// defaultOptions returns the default db options.
func defaultOptions() *dbOptions {
return &dbOptions{
RetryIntervals: []time.Duration{
// exponential backoff retry intervals
time.Second * 30,
time.Minute,
time.Minute * 2,
time.Minute * 4,
time.Minute * 8,
time.Minute * 16,
time.Minute * 32,
Comment on lines +33 to +40
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Interesting strat! Do we want the retry intervals to be configurable?

Copy link
Collaborator Author

@fredcarle fredcarle Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a ticket to make the retry configurable #3073. It is already configurable if devs use the go api though.

},
}
}

// Option is a funtion that sets a config value on the db.
Expand All @@ -32,3 +51,11 @@ func WithMaxRetries(num int) Option {
opts.maxTxnRetries = immutable.Some(num)
}
}

func WithRetryInterval(interval []time.Duration) Option {
return func(opt *dbOptions) {
if len(interval) > 0 {
opt.RetryIntervals = interval
}
}
}
Loading
Loading