Skip to content

Commit

Permalink
les, les/lespay: implement new server pool (ethereum#20758)
Browse files Browse the repository at this point in the history
This PR reimplements the light client server pool. It is also a first step
to move certain logic into a new lespay package. This package will contain
the implementation of the lespay token sale functions, the token buying and
selling logic and other components related to peer selection/prioritization
and service quality evaluation. Over the long term this package will be
reusable for incentivizing future protocols.

Since the LES peer logic is now based on enode.Iterator, it can now use
DNS-based fallback discovery to find servers.

This document describes the function of the new components:
https://gist.github.com/zsfelfoldi/3c7ace895234b7b345ab4f71dab102d4
  • Loading branch information
zsfelfoldi authored May 22, 2020
1 parent 65ce550 commit b4a2681
Show file tree
Hide file tree
Showing 30 changed files with 2,884 additions and 1,008 deletions.
21 changes: 14 additions & 7 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,19 +1563,19 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
cfg.NetworkId = 3
}
cfg.Genesis = core.DefaultRopstenGenesisBlock()
setDNSDiscoveryDefaults(cfg, params.KnownDNSNetworks[params.RopstenGenesisHash])
setDNSDiscoveryDefaults(cfg, params.RopstenGenesisHash)
case ctx.GlobalBool(RinkebyFlag.Name):
if !ctx.GlobalIsSet(NetworkIdFlag.Name) {
cfg.NetworkId = 4
}
cfg.Genesis = core.DefaultRinkebyGenesisBlock()
setDNSDiscoveryDefaults(cfg, params.KnownDNSNetworks[params.RinkebyGenesisHash])
setDNSDiscoveryDefaults(cfg, params.RinkebyGenesisHash)
case ctx.GlobalBool(GoerliFlag.Name):
if !ctx.GlobalIsSet(NetworkIdFlag.Name) {
cfg.NetworkId = 5
}
cfg.Genesis = core.DefaultGoerliGenesisBlock()
setDNSDiscoveryDefaults(cfg, params.KnownDNSNetworks[params.GoerliGenesisHash])
setDNSDiscoveryDefaults(cfg, params.GoerliGenesisHash)
case ctx.GlobalBool(DeveloperFlag.Name):
if !ctx.GlobalIsSet(NetworkIdFlag.Name) {
cfg.NetworkId = 1337
Expand Down Expand Up @@ -1604,18 +1604,25 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
}
default:
if cfg.NetworkId == 1 {
setDNSDiscoveryDefaults(cfg, params.KnownDNSNetworks[params.MainnetGenesisHash])
setDNSDiscoveryDefaults(cfg, params.MainnetGenesisHash)
}
}
}

// setDNSDiscoveryDefaults configures DNS discovery with the given URL if
// no URLs are set.
func setDNSDiscoveryDefaults(cfg *eth.Config, url string) {
func setDNSDiscoveryDefaults(cfg *eth.Config, genesis common.Hash) {
if cfg.DiscoveryURLs != nil {
return
return // already set through flags/config
}

protocol := "eth"
if cfg.SyncMode == downloader.LightSync {
protocol = "les"
}
if url := params.KnownDNSNetwork(genesis, protocol); url != "" {
cfg.DiscoveryURLs = []string{url}
}
cfg.DiscoveryURLs = []string{url}
}

// RegisterEthService adds an Ethereum client to the stack.
Expand Down
18 changes: 15 additions & 3 deletions core/forkid/forkid.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)
Expand All @@ -44,6 +44,18 @@ var (
ErrLocalIncompatibleOrStale = errors.New("local incompatible or needs update")
)

// Blockchain defines all necessary method to build a forkID.
type Blockchain interface {
// Config retrieves the chain's fork configuration.
Config() *params.ChainConfig

// Genesis retrieves the chain's genesis block.
Genesis() *types.Block

// CurrentHeader retrieves the current head header of the canonical chain.
CurrentHeader() *types.Header
}

// ID is a fork identifier as defined by EIP-2124.
type ID struct {
Hash [4]byte // CRC32 checksum of the genesis block and passed fork block numbers
Expand All @@ -54,7 +66,7 @@ type ID struct {
type Filter func(id ID) error

// NewID calculates the Ethereum fork ID from the chain config and head.
func NewID(chain *core.BlockChain) ID {
func NewID(chain Blockchain) ID {
return newID(
chain.Config(),
chain.Genesis().Hash(),
Expand Down Expand Up @@ -85,7 +97,7 @@ func newID(config *params.ChainConfig, genesis common.Hash, head uint64) ID {

// NewFilter creates a filter that returns if a fork ID should be rejected or not
// based on the local chain's status.
func NewFilter(chain *core.BlockChain) Filter {
func NewFilter(chain Blockchain) Filter {
return newFilter(
chain.Config(),
chain.Genesis().Hash(),
Expand Down
6 changes: 3 additions & 3 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Ethereum struct {
blockchain *core.BlockChain
protocolManager *ProtocolManager
lesServer LesServer
dialCandiates enode.Iterator
dialCandidates enode.Iterator

// DB interfaces
chainDb ethdb.Database // Block chain database
Expand Down Expand Up @@ -226,7 +226,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)

eth.dialCandiates, err = eth.setupDiscovery(&ctx.Config.P2P)
eth.dialCandidates, err = eth.setupDiscovery(&ctx.Config.P2P)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -523,7 +523,7 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
for i, vsn := range ProtocolVersions {
protos[i] = s.protocolManager.makeProtocol(vsn)
protos[i].Attributes = []enr.Entry{s.currentEthEntry()}
protos[i].DialCandidates = s.dialCandiates
protos[i].DialCandidates = s.dialCandidates
}
if s.lesServer != nil {
protos = append(protos, s.lesServer.Protocols()...)
Expand Down
54 changes: 30 additions & 24 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,17 @@ import (
type LightEthereum struct {
lesCommons

peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
relay *lesTxRelay
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *serverPool
valueTracker *lpc.ValueTracker
peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
relay *lesTxRelay
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *serverPool
valueTracker *lpc.ValueTracker
dialCandidates enode.Iterator

bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
Expand Down Expand Up @@ -104,11 +105,19 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, false, chainDb),
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
serverPool: newServerPool(chainDb, config.UltraLightServers),
valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
}
peers.subscribe((*vtSubscription)(leth.valueTracker))
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)

dnsdisc, err := leth.setupDiscovery(&ctx.Config.P2P)
if err != nil {
return nil, err
}
leth.serverPool = newServerPool(lespayDb, []byte("serverpool:"), leth.valueTracker, dnsdisc, time.Second, nil, &mclock.System{}, config.UltraLightServers)
peers.subscribe(leth.serverPool)
leth.dialCandidates = leth.serverPool.dialIterator

leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.getTimeout)
leth.relay = newLesTxRelay(peers, leth.retriever)

leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
Expand Down Expand Up @@ -140,11 +149,6 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
leth.chtIndexer.Start(leth.blockchain)
leth.bloomIndexer.Start(leth.blockchain)

leth.handler = newClientHandler(config.UltraLightServers, config.UltraLightFraction, checkpoint, leth)
if leth.handler.ulc != nil {
log.Warn("Ultra light client is enabled", "trustedNodes", len(leth.handler.ulc.keys), "minTrustedFraction", leth.handler.ulc.fraction)
leth.blockchain.DisableCheckFreq()
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
Expand All @@ -159,6 +163,11 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
}
leth.ApiBackend.gpo = gasprice.NewOracle(leth.ApiBackend, gpoParams)

leth.handler = newClientHandler(config.UltraLightServers, config.UltraLightFraction, checkpoint, leth)
if leth.handler.ulc != nil {
log.Warn("Ultra light client is enabled", "trustedNodes", len(leth.handler.ulc.keys), "minTrustedFraction", leth.handler.ulc.fraction)
leth.blockchain.DisableCheckFreq()
}
return leth, nil
}

Expand Down Expand Up @@ -260,30 +269,29 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
return p.Info()
}
return nil
})
}, s.dialCandidates)
}

// Start implements node.Service, starting all internal goroutines needed by the
// light ethereum protocol implementation.
func (s *LightEthereum) Start(srvr *p2p.Server) error {
log.Warn("Light client mode is an experimental feature")

s.serverPool.start()
// Start bloom request workers.
s.wg.Add(bloomServiceThreads)
s.startBloomHandlers(params.BloomBitsBlocksClient)

s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.config.NetworkId)

// clients are searching for the first advertised protocol in the list
protocolVersion := AdvertiseProtocolVersions[0]
s.serverPool.start(srvr, lesTopic(s.blockchain.Genesis().Hash(), protocolVersion))
return nil
}

// Stop implements node.Service, terminating all internal goroutines used by the
// Ethereum protocol.
func (s *LightEthereum) Stop() error {
close(s.closeCh)
s.serverPool.stop()
s.valueTracker.Stop()
s.peers.close()
s.reqDist.close()
s.odr.Stop()
Expand All @@ -295,8 +303,6 @@ func (s *LightEthereum) Stop() error {
s.txPool.Stop()
s.engine.Close()
s.eventMux.Stop()
s.serverPool.stop()
s.valueTracker.Stop()
s.chainDb.Close()
s.wg.Wait()
log.Info("Light ethereum stopped")
Expand Down
11 changes: 1 addition & 10 deletions les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.T
if checkpoint != nil {
height = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
}
handler.fetcher = newLightFetcher(handler)
handler.fetcher = newLightFetcher(handler, backend.serverPool.getTimeout)
handler.downloader = downloader.New(height, backend.chainDb, nil, backend.eventMux, nil, backend.blockchain, handler.removePeer)
handler.backend.peers.subscribe((*downloaderPeerNotify)(handler))
return handler
Expand All @@ -85,14 +85,9 @@ func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter)
}
peer := newServerPeer(int(version), h.backend.config.NetworkId, trusted, p, newMeteredMsgWriter(rw, int(version)))
defer peer.close()
peer.poolEntry = h.backend.serverPool.connect(peer, peer.Node())
if peer.poolEntry == nil {
return p2p.DiscRequested
}
h.wg.Add(1)
defer h.wg.Done()
err := h.handle(peer)
h.backend.serverPool.disconnect(peer.poolEntry)
return err
}

Expand Down Expand Up @@ -129,10 +124,6 @@ func (h *clientHandler) handle(p *serverPeer) error {

h.fetcher.announce(p, &announceData{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td})

// pool entry can be nil during the unit test.
if p.poolEntry != nil {
h.backend.serverPool.registered(p.poolEntry)
}
// Mark the peer starts to be served.
atomic.StoreUint32(&p.serving, 1)
defer atomic.StoreUint32(&p.serving, 0)
Expand Down
5 changes: 3 additions & 2 deletions les/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type NodeInfo struct {
}

// makeProtocols creates protocol descriptors for the given LES versions.
func (c *lesCommons) makeProtocols(versions []uint, runPeer func(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error, peerInfo func(id enode.ID) interface{}) []p2p.Protocol {
func (c *lesCommons) makeProtocols(versions []uint, runPeer func(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error, peerInfo func(id enode.ID) interface{}, dialCandidates enode.Iterator) []p2p.Protocol {
protos := make([]p2p.Protocol, len(versions))
for i, version := range versions {
version := version
Expand All @@ -93,7 +93,8 @@ func (c *lesCommons) makeProtocols(versions []uint, runPeer func(version uint, p
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
return runPeer(version, peer, rw)
},
PeerInfo: peerInfo,
PeerInfo: peerInfo,
DialCandidates: dialCandidates,
}
}
return protos
Expand Down
11 changes: 5 additions & 6 deletions les/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,11 @@ func (d *requestDistributor) loop() {
type selectPeerItem struct {
peer distPeer
req *distReq
weight int64
weight uint64
}

// Weight implements wrsItem interface
func (sp selectPeerItem) Weight() int64 {
return sp.weight
func selectPeerWeight(i interface{}) uint64 {
return i.(selectPeerItem).weight
}

// nextRequest returns the next possible request from any peer, along with the
Expand Down Expand Up @@ -220,9 +219,9 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
wait, bufRemain := peer.waitBefore(cost)
if wait == 0 {
if sel == nil {
sel = utils.NewWeightedRandomSelect()
sel = utils.NewWeightedRandomSelect(selectPeerWeight)
}
sel.Update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
sel.Update(selectPeerItem{peer: peer, req: req, weight: uint64(bufRemain*1000000) + 1})
} else {
if bestWait == 0 || wait < bestWait {
bestWait = wait
Expand Down
12 changes: 12 additions & 0 deletions les/enr_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package les

import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)

Expand All @@ -30,3 +33,12 @@ type lesEntry struct {
func (e lesEntry) ENRKey() string {
return "les"
}

// setupDiscovery creates the node discovery source for the eth protocol.
func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) {
if /*cfg.NoDiscovery || */ len(eth.config.DiscoveryURLs) == 0 {
return nil, nil
}
client := dnsdisc.NewClient(dnsdisc.Config{})
return client.NewIterator(eth.config.DiscoveryURLs...)
}
Loading

0 comments on commit b4a2681

Please sign in to comment.