Skip to content

Commit

Permalink
feat: add dns discovery to sensor logic (#409)
Browse files Browse the repository at this point in the history
* add dns discovery to sensor logic

* touch ups

* fix: discovery dns (#410)

* fix discovery dns

* update gen-doc

* fix error

* fix typo

---------

Co-authored-by: Minh Vu <[email protected]>
  • Loading branch information
rebelArtists and minhd-vu authored Oct 24, 2024
1 parent c07405b commit 43b4e81
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 7 deletions.
75 changes: 69 additions & 6 deletions cmd/p2p/sensor/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
ethp2p "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/rpc"
Expand Down Expand Up @@ -68,6 +69,7 @@ type (
NAT string
QuickStart bool
TTL time.Duration
DiscoveryDNS string

bootnodes []*enode.Node
nodes []*enode.Node
Expand Down Expand Up @@ -249,21 +251,27 @@ var SensorCmd = &cobra.Command{
sub := server.SubscribeEvents(events)
defer sub.Unsubscribe()

ticker := time.NewTicker(2 * time.Second)
ticker := time.NewTicker(2 * time.Second) // Ticker for recurring tasks every 2 seconds
hourlyTicker := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour
defer ticker.Stop()
defer hourlyTicker.Stop()

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

peers := make(map[enode.ID]string)
var peersMutex sync.Mutex

for _, node := range inputSensorParams.nodes {
// Because the node URLs can change, map them to the node ID to prevent
// duplicates.
// Map node URLs to node IDs to avoid duplicates
peers[node.ID()] = node.URLv4()
}

go handleAPI(&server, msgCounter)

// Run DNS discovery immediately at startup
go handleDNSDiscovery(&server, peers, &peersMutex)

for {
select {
case <-ticker.C:
Expand All @@ -274,14 +282,19 @@ var SensorCmd = &cobra.Command{

db.WritePeers(context.Background(), server.Peers())
case peer := <-opts.Peers:
// Update the peer list and the nodes file.
// Lock the peers map before modifying it
peersMutex.Lock()
// Update the peer list and the nodes file
if _, ok := peers[peer.ID()]; !ok {
peers[peer.ID()] = peer.URLv4()

if err := p2p.WritePeers(inputSensorParams.NodesFile, peers); err != nil {
log.Error().Err(err).Msg("Failed to write nodes to file")
}
}
peersMutex.Unlock()
case <-hourlyTicker.C:
go handleDNSDiscovery(&server, peers, &peersMutex)
case <-signals:
// This gracefully stops the sensor so that the peers can be written to
// the nodes file.
Expand Down Expand Up @@ -373,6 +386,51 @@ func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) {
}
}

// handleDNSDiscovery performs DNS-based peer discovery and adds new peers to
// the p2p server. It syncs the DNS discovery tree and adds any newly discovered
// peers not already in the peers map.
func handleDNSDiscovery(server *ethp2p.Server, peers map[enode.ID]string, peersMutex *sync.Mutex) {
if len(inputSensorParams.DiscoveryDNS) == 0 {
return
}

log.Info().
Str("discovery-dns", inputSensorParams.DiscoveryDNS).
Msg("Starting DNS discovery sync")

client := dnsdisc.NewClient(dnsdisc.Config{})
tree, err := client.SyncTree(inputSensorParams.DiscoveryDNS)
if err != nil {
log.Error().Err(err).Msg("Failed to sync DNS discovery tree")
return
}

// Log the number of nodes in the tree
log.Info().
Int("unique_nodes", len(tree.Nodes())).
Msg("Successfully synced DNS discovery tree")

// Lock the peers map and server operations
peersMutex.Lock()
defer peersMutex.Unlock()

// Add DNS-discovered peers
for _, node := range tree.Nodes() {
if _, ok := peers[node.ID()]; ok {
continue
}

log.Debug().
Str("enode", node.URLv4()).
Msg("Discovered new peer through DNS")

// Instruct server to connect to the new peer
server.AddPeer(node)
}

log.Info().Msg("Finished adding DNS discovery peers")
}

// getPeerMessages retrieves the count of various types of eth packets sent by a
// peer.
func getPeerMessages(url string, counter *prometheus.CounterVec) p2p.MessageCount {
Expand Down Expand Up @@ -426,8 +484,12 @@ func removePeerMessages(counter *prometheus.CounterVec, peers []*ethp2p.Peer) er
}
}

// During DNS-discovery or when the server is taking a while to discover
// peers and has yet to receive a message, the sensor_messages prometheus
// metric may not exist yet.
if family == nil {
return errors.New("could not find sensor_messages metric family")
log.Trace().Msg("Could not find sensor_messages metric family")
return nil
}

for _, metric := range family.GetMetric() {
Expand Down Expand Up @@ -473,7 +535,7 @@ func init() {
if err := SensorCmd.MarkFlagRequired("sensor-id"); err != nil {
log.Error().Err(err).Msg("Failed to mark sensor-id as required persistent flag")
}
SensorCmd.Flags().IntVarP(&inputSensorParams.MaxPeers, "max-peers", "m", 200, "Maximum number of peers to connect to")
SensorCmd.Flags().IntVarP(&inputSensorParams.MaxPeers, "max-peers", "m", 2000, "Maximum number of peers to connect to")
SensorCmd.Flags().IntVarP(&inputSensorParams.MaxDatabaseConcurrency, "max-db-concurrency", "D", 10000,
`Maximum number of concurrent database operations to perform. Increasing this
will result in less chance of missing data (i.e. broken pipes) but can
Expand Down Expand Up @@ -510,4 +572,5 @@ This produces faster development cycles but can prevent the sensor from being to
connect to new peers if the nodes.json file is large.`)
SensorCmd.Flags().StringVar(&inputSensorParams.TrustedNodesFile, "trusted-nodes", "", "Trusted nodes file")
SensorCmd.Flags().DurationVar(&inputSensorParams.TTL, "ttl", 14*24*time.Hour, "Time to live")
SensorCmd.Flags().StringVar(&inputSensorParams.DiscoveryDNS, "discovery-dns", "", "DNS discovery ENR tree url")
}
3 changes: 2 additions & 1 deletion doc/polycli_p2p_sensor.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ If no nodes.json file exists, it will be created.
-d, --database-id string Datastore database ID
--dial-ratio int Ratio of inbound to dialed connections. A dial ratio of 2 allows 1/2 of
connections to be dialed. Setting this to 0 defaults it to 3.
--discovery-dns string DNS discovery ENR tree url
--discovery-port int UDP P2P discovery port (default 30303)
--fork-id bytesHex The hex encoded fork id (omit the 0x) (default F097BC13)
--genesis-hash string The genesis block hash (default "0xa9c28ce2141b56c474f1dc504bee9b01eb1bd7d1a507580d5519d4437a97de1b")
Expand All @@ -37,7 +38,7 @@ If no nodes.json file exists, it will be created.
-D, --max-db-concurrency int Maximum number of concurrent database operations to perform. Increasing this
will result in less chance of missing data (i.e. broken pipes) but can
significantly increase memory usage. (default 10000)
-m, --max-peers int Maximum number of peers to connect to (default 200)
-m, --max-peers int Maximum number of peers to connect to (default 2000)
--nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:<IP>|extip:<IP>) (default "any")
-n, --network-id uint Filter discovered nodes by this network ID
--port int TCP network listening port (default 30303)
Expand Down

0 comments on commit 43b4e81

Please sign in to comment.