Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(mixnet): enable transport adapter #1220

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libp2p.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ requires "nim >= 1.6.0",
"nimcrypto >= 0.6.0 & < 0.7.0", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5",
"chronicles >= 0.10.2", "chronos >= 4.0.3", "metrics", "secp256k1", "stew#head",
"websock", "unittest2",
"https://github.com/status-im/nim-quic.git#ddcb31ffb74b5460ab37fd13547eca90594248bc"
"https://github.com/status-im/nim-quic.git#0e4677b3e8cafdaaaba52de59164a8e64ed3906e"

let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)
Expand Down
38 changes: 33 additions & 5 deletions libp2p/dialer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,29 @@ proc dialAndUpgrade(
address: MultiAddress,
dir = Direction.Out,
): Future[Muxer] {.async.} =
echo "\n\n> Dialer::dialAndUpgrade"
for transport in self.transports: # for each transport
if transport.handles(address): # check if it can dial it
trace "Dialing address", address, peerId = peerId.get(default(PeerId)), hostname
let dialed =
try:
libp2p_total_dial_attempts.inc()
await transport.dial(hostname, address, peerId)
echo "> Dialer::dialAndUpgrade::0"
# echo hostname
# echo address
# echo peerId
echo transport.log()
echo type(transport)
echo type(hostname)
echo type(address)
echo type(peerId)
let dialFut = transport.dial(hostname, address, peerId)
echo "> Dialer::dialAndUpgrade::after-dial"
let myD = await dialFut
echo "< Dialer::dialAndUpgrade::after-dial"
echo myD.shortLog()
echo "<<<< Dialer::dialAndUpgrade::after-dial"
myD
except CancelledError as exc:
trace "Dialing canceled",
description = exc.msg, peerId = peerId.get(default(PeerId))
Expand All @@ -70,7 +86,8 @@ proc dialAndUpgrade(
description = exc.msg, peerId = peerId.get(default(PeerId))
libp2p_failed_dials.inc()
return nil # Try the next address

echo "> Dialer::dialAndUpgrade::1"
echo dialed.shortLog()
libp2p_successful_dials.inc()

let mux =
Expand All @@ -80,6 +97,11 @@ proc dialAndUpgrade(
# The if below is more general and might handle other use cases in the future.
if dialed.dir != dir:
dialed.dir = dir
echo "> Dialer::dialAndUpgrade::2"
echo transport.log()
echo dialed.shortLog()
# TODO: dialed should be MixnetConnectionAdapter
echo "< Dialer::dialAndUpgrade::2"
await transport.upgrade(dialed, peerId)
except CancelledError as exc:
await dialed.close()
Expand All @@ -97,7 +119,7 @@ proc dialAndUpgrade(

# Try other address
return nil

echo "> Dialer::dialAndUpgrade::3"
doAssert not isNil(mux), "connection died after upgrade " & $dialed.dir
debug "Dial successful", peerId = mux.connection.peerId
return mux
Expand Down Expand Up @@ -170,6 +192,7 @@ proc internalConnect(
reuseConnection = true,
dir = Direction.Out,
): Future[Muxer] {.async.} =
echo "> Dialer::internalConnect"
if Opt.some(self.localPeerId) == peerId:
raise newException(CatchableError, "can't dial self!")

Expand All @@ -184,16 +207,17 @@ proc internalConnect(
return mux

let slot = self.connManager.getOutgoingSlot(forceDial)
echo "> Dialer::internalConnect::0"
let muxed =
try:
await self.dialAndUpgrade(peerId, addrs, dir)
except CatchableError as exc:
slot.release()
raise exc
echo "> Dialer::internalConnect::1"
slot.trackMuxer(muxed)
if isNil(muxed): # None of the addresses connected
raise newException(DialFailedError, "Unable to establish outgoing link")

try:
self.connManager.storeMuxer(muxed)
await self.peerStore.identify(muxed)
Expand Down Expand Up @@ -302,7 +326,7 @@ method dial*(
## create a protocol stream and establish
## a connection if one doesn't exist already
##

echo "> Dialer::dial"
var
conn: Muxer
stream: Connection
Expand All @@ -315,14 +339,18 @@ method dial*(
await conn.close()

try:
echo "> Dialer::0"
trace "Dialing (new)", peerId, protos
conn = await self.internalConnect(Opt.some(peerId), addrs, forceDial)
echo "> Dialer::1"
trace "Opening stream", conn
stream = await self.connManager.getStream(conn)
echo "> Dialer::2"

if isNil(stream):
raise newException(DialFailedError, "Couldn't get muxed stream")

echo "< Dialer::dial"
return await self.negotiateStream(stream, protos)
except CancelledError as exc:
trace "Dial canceled", conn
Expand Down
4 changes: 4 additions & 0 deletions libp2p/multistream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ proc select*(
): Future[string] {.async: (raises: [CancelledError, LPStreamError, MultiStreamError]).} =
trace "initiating handshake", conn, codec = Codec
## select a remote protocol
echo "> MultiStreamSelect::select"
await conn.writeLp(Codec & "\n") # write handshake
echo "> MultiStreamSelect::select - 0"
if proto.len() > 0:
trace "selecting proto", conn, proto = proto[0]
await conn.writeLp((proto[0] & "\n")) # select proto
Expand All @@ -67,6 +69,7 @@ proc select*(
trace "multistream handshake success", conn

if proto.len() == 0: # no protocols, must be a handshake call
echo "< MultiStreamSelect::select - Handshake"
return Codec
else:
s = string.fromBytes(await conn.readLp(MsgSize)) # read the first proto
Expand All @@ -75,6 +78,7 @@ proc select*(
if s == proto[0]:
trace "successfully selected ", conn, proto = proto[0]
conn.protocol = proto[0]
echo "< MultiStreamSelect::select - ", proto[0]
return proto[0]
elif proto.len > 1:
# Try to negotiate alternatives
Expand Down
9 changes: 8 additions & 1 deletion libp2p/protocols/ping.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ proc new*(

method init*(p: Ping) =
proc handle(conn: Connection, proto: string) {.async.} =
echo "######### Before Ping #########"
try:
trace "handling ping", conn
var buf: array[PingSize, byte]
Expand All @@ -64,13 +65,15 @@ method init*(p: Ping) =
raise exc
except CatchableError as exc:
trace "exception in ping handler", description = exc.msg, conn
echo "######### After Ping #########"

p.handler = handle
p.codec = PingCodec

proc ping*(p: Ping, conn: Connection): Future[Duration] {.async, public.} =
## Sends ping to `conn`, returns the delay

echo "######### Pinging #########"
echo conn.shortLog()
trace "initiating ping", conn

var
Expand All @@ -82,9 +85,12 @@ proc ping*(p: Ping, conn: Connection): Future[Duration] {.async, public.} =
let startTime = Moment.now()

trace "sending ping", conn
echo "# Before Write. Is conn closed? ", conn.isClosed, conn.isEof
await conn.write(@randomBuf)
echo "# After Write. Is conn closed? ", conn.isClosed, conn.isEof

await conn.readExactly(addr resultBuf[0], PingSize)
echo "# After Read. Is conn closed? ", conn.isClosed

let responseDur = Moment.now() - startTime

Expand All @@ -95,4 +101,5 @@ proc ping*(p: Ping, conn: Connection): Future[Duration] {.async, public.} =
raise newException(WrongPingAckError, "Incorrect ping data from peer!")

trace "valid ping response", conn
echo "######### Pinged #########"
return responseDur
2 changes: 2 additions & 0 deletions libp2p/protocols/secure/secure.nim
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ method init*(s: Secure) =
try:
# We don't need the result but we
# definitely need to await the handshake
echo "Secure::handle"
discard await s.handleConn(conn, false, Opt.none(PeerId))
trace "connection secured", conn
except CancelledError as exc:
Expand All @@ -165,6 +166,7 @@ method secure*(
): Future[Connection] {.
async: (raises: [CancelledError, LPStreamError], raw: true), base
.} =
echo "> Secure::secure"
s.handleConn(conn, conn.dir == Direction.Out, peerId)

method readOnce*(
Expand Down
13 changes: 5 additions & 8 deletions libp2p/stream/connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,11 @@ type

proc timeoutMonitor(s: Connection) {.async: (raises: []).}

func shortLog*(conn: Connection): string =
try:
if conn == nil:
"Connection(nil)"
else:
&"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
raiseAssert(exc.msg)
method shortLog*(conn: Connection): string {.raises: [].} =
if conn == nil:
"Connection(nil)"
else:
&"{shortLog(conn.peerId)}:{conn.oid}:{conn.protocol}"

chronicles.formatIt(Connection):
shortLog(it)
Expand Down
18 changes: 10 additions & 8 deletions libp2p/stream/lpstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ method initStream*(s: LPStream) {.base.} =
trackCounter(s.objName)
trace "Stream created", s, objName = s.objName, dir = $s.dir

proc join*(
method join*(
s: LPStream
): Future[void] {.async: (raises: [CancelledError], raw: true), public.} =
## Wait for the stream to be closed
Expand All @@ -134,9 +134,10 @@ method readOnce*(
## available
raiseAssert("Not implemented!")

proc readExactly*(
method readExactly*(
s: LPStream, pbytes: pointer, nbytes: int
): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# echo "readExactly. Is conn closed? ", s.isClosed
## Waits for `nbytes` to be available, then read
## them and return them
if s.atEof:
Expand All @@ -159,9 +160,9 @@ proc readExactly*(

if read == 0:
doAssert s.atEof()
trace "couldn't read all bytes, stream EOF", s, nbytes, read
# Re-readOnce to raise a more specific error than EOF
# Raise EOF if it doesn't raise anything(shouldn't happen)
# echo "readExactly3. Is conn closed? ", s.isClosed
discard await s.readOnce(addr pbuffer[read], nbytes - read)
warn "Read twice while at EOF"
raise newLPStreamEOFError()
Expand All @@ -170,7 +171,7 @@ proc readExactly*(
trace "couldn't read all bytes, incomplete data", s, nbytes, read
raise newLPStreamIncompleteError()

proc readLine*(
method readLine*(
s: LPStream, limit = 0, sep = "\r\n"
): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} =
## Reads up to `limit` bytes are read, or a `sep` is found
Expand Down Expand Up @@ -198,7 +199,7 @@ proc readLine*(
if len(result) == lim:
break

proc readVarint*(
method readVarint*(
conn: LPStream
): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} =
var buffer: array[10, byte]
Expand All @@ -217,7 +218,7 @@ proc readVarint*(
if true: # can't end with a raise apparently
raise (ref InvalidVarintError)(msg: "Cannot parse varint")

proc readLp*(
method readLp*(
s: LPStream, maxSize: int
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]), public.} =
## read length prefixed msg, with the length encoded as a varint
Expand All @@ -243,7 +244,7 @@ method write*(
# Write `msg` to stream, waiting for the write to be finished
raiseAssert("Not implemented!")

proc writeLp*(
method writeLp*(
s: LPStream, msg: openArray[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
## Write `msg` with a varint-encoded length prefix
Expand All @@ -253,7 +254,7 @@ proc writeLp*(
buf[vbytes.len ..< buf.len] = msg
s.write(buf)

proc writeLp*(
method writeLp*(
s: LPStream, msg: string
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
writeLp(s, msg.toOpenArrayByte(0, msg.high))
Expand Down Expand Up @@ -305,6 +306,7 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async: (raises: []), public.} =
##

trace "Closing with EOF", s
echo "> Closing with EOF: ", s.shortLog()
if s.closedWithEOF:
trace "Already closed"
return
Expand Down
9 changes: 8 additions & 1 deletion libp2p/transports/tcptransport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ method accept*(self: TcpTransport): Future[Connection] =
proc impl(
self: TcpTransport
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
echo "> TcpTransport::accept"
proc cancelAcceptFuts() =
for fut in self.acceptFuts:
if not fut.completed():
Expand All @@ -239,19 +240,23 @@ method accept*(self: TcpTransport): Future[Connection] =
elif self.acceptFuts.len == 0:
# Holds futures representing ongoing accept calls on multiple servers.
self.acceptFuts = self.servers.mapIt(it.accept())

echo "> TcpTransport::accept - 0"
let
finished =
try:
# Waits for any one of these futures to complete, indicating that a new connection has been accepted on one of the servers.
echo "###############################################################"
await one(self.acceptFuts)
except ValueError:
echo "> TcpTransport::accept - 02"
raiseAssert "Accept futures should not be empty"
except CancelledError as exc:
echo "> TcpTransport::accept - 03"
cancelAcceptFuts()
raise exc
index = self.acceptFuts.find(finished)

echo "> TcpTransport::accept - 1"
# A new connection has been accepted. The corresponding server should immediately start accepting another connection.
# Thus we replace the completed future with a new one by calling accept on the same server again.
self.acceptFuts[index] = self.servers[index].accept()
Expand All @@ -274,6 +279,7 @@ method accept*(self: TcpTransport): Future[Connection] =
cancelAcceptFuts()
raise exc

echo "> TcpTransport::accept - 2"
if not self.running: # Stopped while waiting
await transp.closeWait()
raise newTransportClosedError()
Expand All @@ -289,6 +295,7 @@ method accept*(self: TcpTransport): Future[Connection] =

let observedAddr =
MultiAddress.init(remote).expect("Can initialize from remote address")
echo "- TcpTransport::accept"
self.connHandler(transp, Opt.some(observedAddr), Direction.In)

impl(self)
Expand Down
8 changes: 6 additions & 2 deletions libp2p/transports/transport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type
upgrader*: Upgrade
networkReachability*: NetworkReachability

method log*(self: Transport): string {.base, gcsafe.} =
"<Transport>"

proc newTransportClosedError*(parent: ref Exception = nil): ref TransportError =
newException(TransportClosedError, "Transport closed, no more connections!", parent)

Expand Down Expand Up @@ -69,10 +72,10 @@ method dial*(
): Future[Connection] {.base, gcsafe.} =
## dial a peer
##

echo "Transport::dial"
doAssert(false, "Not implemented!")

proc dial*(
method dial*(
self: Transport, address: MultiAddress, peerId: Opt[PeerId] = Opt.none(PeerId)
): Future[Connection] {.gcsafe.} =
self.dial("", address)
Expand All @@ -83,6 +86,7 @@ method upgrade*(
## base upgrade method that the transport uses to perform
## transport specific upgrades
##
echo "> Transport::upgrade"
self.upgrader.upgrade(conn, peerId)

method handles*(self: Transport, address: MultiAddress): bool {.base, gcsafe.} =
Expand Down
Loading
Loading