Skip to content

Commit

Permalink
fix: concurrent map read and writes
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod authored and GrapeBaBa committed Oct 30, 2024
1 parent a679321 commit 8a5c2d5
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 8 deletions.
2 changes: 1 addition & 1 deletion p2p/discover/portal_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (p *PortalProtocol) setupUDPListening() error {
func(buf []byte, addr *net.UDPAddr) (int, error) {
p.Log.Info("will send to target data", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf))

if n, ok := p.DiscV5.cachedAddrNode[addr.String()]; ok {
if n, ok := p.DiscV5.GetCachedNode(addr.String()); ok {
//_, err := p.DiscV5.TalkRequestToID(id, addr, string(portalwire.UTPNetwork), buf)
req := &v5wire.TalkRequest{Protocol: string(portalwire.Utp), Message: buf}
p.DiscV5.sendFromAnotherThreadWithNode(n, netip.AddrPortFrom(netutil.IPToAddr(addr.IP), uint16(addr.Port)), req)
Expand Down
25 changes: 19 additions & 6 deletions p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type UDPv5 struct {
// static fields
conn UDPConn
tab *Table
cachedIds map[enode.ID]*enode.Node
nodeMu sync.Mutex
cachedAddrNode map[string]*enode.Node
netrestrict *netutil.Netlist
priv *ecdsa.PrivateKey
Expand Down Expand Up @@ -155,7 +155,6 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
// static fields
conn: newMeteredConn(conn),
cachedAddrNode: make(map[string]*enode.Node),
cachedIds: make(map[enode.ID]*enode.Node),
localNode: ln,
db: ln.Database(),
netrestrict: cfg.NetRestrict,
Expand Down Expand Up @@ -729,8 +728,7 @@ func (t *UDPv5) send(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet,
return nonce, err
}
if c != nil && c.Node != nil {
t.cachedIds[toID] = c.Node
t.cachedAddrNode[toAddr.String()] = c.Node
t.putCache(toAddr.String(), c.Node)
}

_, err = t.conn.WriteToUDPAddrPort(enc, toAddr)
Expand Down Expand Up @@ -793,8 +791,7 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr netip.AddrPort) error {
if fromNode != nil {
// Handshake succeeded, add to table.
t.tab.addInboundNode(fromNode)
t.cachedIds[fromID] = fromNode
t.cachedAddrNode[fromAddr.String()] = fromNode
t.putCache(fromAddr.String(), fromNode)
}
if packet.Kind() != v5wire.WhoareyouPacket {
// WHOAREYOU logged separately to report errors.
Expand Down Expand Up @@ -999,3 +996,19 @@ func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes {
}
return resp
}

func (t *UDPv5) putCache(addr string, node *enode.Node) {
t.nodeMu.Lock()
defer t.nodeMu.Unlock()
if n, ok := t.cachedAddrNode[addr]; ok {
t.log.Debug("Update cached node", "old", n.ID(), "new", node.ID())
}
t.cachedAddrNode[addr] = node
}

func (t *UDPv5) GetCachedNode(addr string) (*enode.Node, bool) {
t.nodeMu.Lock()
defer t.nodeMu.Unlock()
n, ok := t.cachedAddrNode[addr]
return n, ok
}
35 changes: 34 additions & 1 deletion portalnetwork/history/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
deleteSql = "DELETE FROM kvstore WHERE key = (?1);"
containSql = "SELECT 1 FROM kvstore WHERE key = (?1);"
getAllOrderedByDistanceSql = "SELECT key, length(value), xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC;"
getFarthestDistanceSql = "SELECT key, xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC Limit 1;"
deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE greater(xor(key, (?1)), (?2)) = 1"
XorFindFarthestQuery = `SELECT
xor(key, (?1)) as distance
Expand Down Expand Up @@ -117,8 +118,8 @@ func NewHistoryStorage(config storage.PortalStorageConfig) (storage.ContentStora
}

err = hs.initStmts()

// Check whether we already have data, and use it to set radius
hs.setRadiusToFarthestDistance()

// necessary to test NetworkName==history because state also initialize HistoryStorage
if strings.ToLower(config.NetworkName) == "history" {
Expand Down Expand Up @@ -376,6 +377,38 @@ func (p *ContentStorage) EstimateNewRadius(currentRadius *uint256.Int) (*uint256
return currentRadius, nil
}

func (p *ContentStorage) setRadiusToFarthestDistance() {
rows, err := p.sqliteDB.Query(getFarthestDistanceSql, p.nodeId[:])
if err != nil {
p.log.Error("failed to query farthest distance ", "err", err)
return
}
defer func(rows *sql.Rows) {
if rows != nil {
return
}
err = rows.Close()
if err != nil {
p.log.Error("failed to close rows", "err", err)
}
}(rows)

if rows.Next() {
var contentId []byte
var distance []byte
err = rows.Scan(&contentId, &distance)
if err != nil {
p.log.Error("failed to scan rows for farthest distance", "err", err)
}
dis := uint256.NewInt(0)
err = dis.UnmarshalSSZ(distance)
if err != nil {
p.log.Error("failed to unmarshal ssz for farthest distance", "err", err)
}
p.radius.Store(dis)
}
}

func (p *ContentStorage) deleteContentFraction(fraction float64) (deleteCount int, err error) {
if fraction <= 0 || fraction >= 1 {
return deleteCount, errors.New("fraction should be between 0 and 1")
Expand Down

0 comments on commit 8a5c2d5

Please sign in to comment.