Brain dump #6
base: main
```diff
@@ -85,7 +85,8 @@ final class Network[F[_]: Async] private (
     Sync[F] delay {
       val bootstrap = new ServerBootstrap
       bootstrap.group(parent, child)
-        .option(JChannelOption.AUTO_READ.asInstanceOf[JChannelOption[Any]], false) // backpressure
+        // TODO: does netty override this? `ServerBootstrap.init` via `bind` override schedules a future to set autoread to false
+        .option(JChannelOption.AUTO_READ.asInstanceOf[JChannelOption[Any]], false) // backpressure
         .channel(serverChannelClazz)
         .childHandler(initializer(disp)(sockets.offer))
```
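For context, the `AUTO_READ = false` strategy in the hunk above only disables Netty's automatic reads; some handler must then request reads explicitly once the previous message has been processed, which is what propagates backpressure down to the TCP socket. A minimal sketch of that pattern (a hypothetical handler, not code from this PR):

```scala
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

// With autoRead disabled, Netty only delivers data after an explicit read().
// Requesting the next read only after the current message is handled means a
// slow consumer naturally slows the sender via TCP flow control.
final class ManualReadHandler(process: Any => Unit) extends ChannelInboundHandlerAdapter {
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    ctx.read() // request the first message
    super.channelActive(ctx)
  }

  override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
    process(msg) // handle the message (synchronously here, for simplicity)
    ctx.read()   // only now ask the event loop for more data
  }
}
```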
```diff
@@ -163,8 +164,8 @@ object Network {
     (instantiateR("server"), instantiateR("client")) mapN { (server, client) =>
       try {
-        val meth = eventLoopClazz.getDeclaredMethod("setIoRatio", classOf[Int])
-        meth.invoke(server, new Integer(90)) // TODO tweak this a bit more; 100 was worse than 50 and 90 was a dramatic step up from both
+        val meth = eventLoopClazz.getDeclaredMethod("setIoRatio", classOf[Int]) // TODO: this method is set to be deprecated in future releases
+        meth.invoke(server, new Integer(90)) // TODO tweak this a bit more; 100 was worse than 50 and 90 was a dramatic step up from both
         meth.invoke(client, new Integer(90))
       } catch {
         case _: Exception => ()
```

**Comment:** Seriously? What's replacing it? The impact this had was substantial.

**Reply:** According to the Epoll source this doesn't even do anything. For KQueue and NIO ELGs it has a meaningful impact. As to what's replacing Epoll's method… 🤷‍♂️ ha. So it's not really a universal interface for ELGs, but Netty makes it seem like it is. One takeaway is that this creates a discrepancy between tuning on Linux vs. other OSes.

**Reply:** Oh, this is really interesting! I'm not that fussed about the tuning being a bit different on different OSes, though if it starts getting to the point where people might actually take that into account and build their applications and profiling to those kinds of parameters, then it might be an issue. At some point, differences between platforms are very much unavoidable. Like if you're profiling …

**Reply:** 👍
```diff
@@ -174,6 +175,7 @@ object Network {
     }
   }

+  //TODO: Why not use the Netty methods
   private[this] def uring() =
     try {
       if (sys.props.get("fs2.netty.use.io_uring").map(_.toBoolean).getOrElse(false)) {
```

**Comment:** Are there Netty methods for doing this kind of detection?

**Reply:** For example, … see the Epoll source code for more details.

**Reply:** So. Much. Nicer. Let's do this.
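Regarding the TODO above: Netty does ship per-transport availability checks (`Epoll.isAvailable`, `KQueue.isAvailable`), so the reflective probing could plausibly be replaced with something like the following sketch (the fallback order and return encoding are illustrative, not a decision of this PR):

```scala
import io.netty.channel.epoll.Epoll
import io.netty.channel.kqueue.KQueue

// Pick the most specific transport the platform supports, falling back to NIO.
// Epoll.unavailabilityCause() explains *why* a native transport failed to load.
def detectTransport(): String =
  if (Epoll.isAvailable) "epoll"
  else if (KQueue.isAvailable) "kqueue"
  else {
    Option(Epoll.unavailabilityCause()).foreach(c => println(s"epoll unavailable: $c"))
    "nio"
  }
```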
@@ -0,0 +1,179 @@ (new file)
# FS2-Netty Developer Docs

## What is FS2-Netty
A general networking library for TCP- and UDP-based protocols built on the Netty framework using typesafe FS2 streams.
It should be general enough to support protocols such as DNS, FTP, MQTT, Redis, HTTP/1.1, HTTP/2, HTTP/3, QUIC, etc.

(M. Mienko - I don't know FS2, so cannot comment on its role in detail. I will just write "stream" or "queue" wherever makes sense :) )
## Key Questions for FS2-Netty
1) (API) How much of Netty should this library expose to users?
   - How to expose Netty's pipeline, event-loop scheduler, channel, channel handlers, etc. such that users get both the flexibility of Netty's pipeline and typesafe FP abstractions.
   - At a minimum, stop exposing the Netty pipeline after we produce `Stream[Socket]`.

**Comment** (lines +10 to +13): IMO we should have a …

**Reply:** Yup, that all makes sense. This requirement is too strong since we need to allow pipeline mutations, e.g. HTTP to WS. We're back to exposing all of Netty again since users have access to handlers. One thing to think about is Netty's custom events, specifically IdleState events, for detecting lack of inbound or outbound channel activity. So this would imply another type …
2) (Safety) When Netty constructs are exposed, how can the library do so safely? Safety is defined as:
   - preventing users from crashing clients or servers
   - preventing leaking resources such as connections or memory
   - mitigating negative performance, namely around the socket operations
   - preventing unintentional protocol violations, e.g. if the connection uses the websocket protocol, then the app should never send an HTTP or Redis message. Likewise, TCP and UDP shouldn't mix.

**Comment** (lines +14 to +19): This is a tricky one. I'm not sure we can without really messing up the way that pipelines work. One thing we could do is, rather than having a big opaque … Pipelines are super-unsafe, and that tends to make them fast, but it also makes them… super unsafe. I think the main way we can help people is by giving them the tools to get out of Netty as quickly as possible. Like the idea should be that you would use native Netty pipeline handlers for any protocol implementation that you find useful and performant (e.g. HTTP or WebSockets), but then all actual user code should be off in Fs2. In other words, users would be discouraged from defining new handlers, but using existing ones should be fine.

**Reply:** Yup, agree with you here too. This part of the definition of safety should be weakened: "Using streams provided by FS2-Netty should prevent unintentional protocol violations", or something like that.
3) (Liveness) When an app creates a client or server and the "world" is happy (the networking layer is working and peer(s) are up), then how does it ensure messages are always received or sent?

**Comment** (lines +20 to +21): I'm not sure I understand. You mean just within the local state itself? How do we separate this from Netty, or the kernel for that matter?

**Reply:** Trying to capture the idea that "'something' good will happen" and/or "there are no bugs". Haven't really thought this through 😅, but definitely within the local state itself. So I guess: Given that the network layer is working, And Netty is working, And FS2 is working, Then the socket will send and receive packets…
4) (Robustness) When a peer goes down, there's a malicious peer, the network fails, the protocol is violated, or machine resource usage is high, then how does the framework:
   - gracefully handle these scenarios itself (auto-recover if possible and it makes sense), or
   - permit the user to handle them gracefully, or
   - notify the user of this non-fatal error?

**Comment** (lines +22 to +26): Ah, this I've thought about! So in the event that errors are encountered, they will be propagated into the stream, where they can be handled by the user. Users can then implement recovery semantics on the streams and attempt to reconnect, or they might have some other semantic, but either way the semantics of … Within Netty itself is kind of a different story. Basically anything that happens in pipeline handlers, you're on your own. :-P

**Reply:** Yup, essentially what …
5) (Visibility) …
6) (Efficiency + Performance) …

**Comment:** To this particular point, something I've thought about a lot is both efficiency and backpressure. Staying within Fs2's assumptions regarding upstream production (i.e. that we don't have some hidden unbounded queue that we're filling up while we wait to feed it things) basically gives us a ton of this stuff for free. Fs2 itself is kind of shockingly efficient, and the guarantees it offers around backpressure and resource safety are extremely strong out of the box.
## Use Cases
Below are the use cases FS2-Netty needs to support.

### A Pub-Sub-like System
This system uses the websocket protocol to integrate with clients. On the backend, messages can be sent via HTTP/1.1. Published events from clients are arbitrarily processed. The following are requirements of a networking framework from the point of view of developers building the pub-sub system.
#### High Level Requirements
HTTP:
- We want a simple (GET & POST) but robust (handles malformed requests) HTTP server that handles payloads on the order of KBs.
- We want HTTP connections to be efficient for client use, i.e. a client can pool connections and reuse them for multiple HTTP requests.
- We want the HTTP server to avoid leaking connections; it should detect and clean up dropped connections.
- We want the HTTP server to rotate client connections to load balance across multiple instances.

**Comment** (lines +40 to +43): How much of this can we just get for free out of Netty's default HTTP handler? I've never used it.

**Reply:** The above is a mix of …

- We don't want to allow HTTP pipelining.
- We want HTTP logging (and metrics?).
  - Access log with URI path, HTTP method, HTTP headers, payload size in bytes, response time, response code, and response headers.
  - Metrics on how often the TCP connection is closed by the server and the client. How often does the client drop connections?

**Comment:** This could actually be a fairly trivial …

**Reply:** Yup

- We don't need SSL since this service will be behind a Load Balancer that will terminate SSL connections.
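Much of the basic HTTP handling listed above can likely come from Netty's stock codecs rather than custom code; a sketch of a candidate pipeline (the handler choice and the 64 KB limit are illustrative assumptions, not decisions from this PR):

```scala
import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec, HttpServerKeepAliveHandler}
import io.netty.handler.logging.LoggingHandler

// HttpServerCodec decodes requests / encodes responses (and flags malformed
// requests), HttpObjectAggregator caps payloads at KB-scale sizes, and
// HttpServerKeepAliveHandler manages connection reuse for client pooling.
final class SimpleHttpInitializer extends ChannelInitializer[SocketChannel] {
  override def initChannel(ch: SocketChannel): Unit =
    ch.pipeline()
      .addLast(new LoggingHandler())                // access-log style visibility
      .addLast(new HttpServerCodec())
      .addLast(new HttpObjectAggregator(64 * 1024)) // 64 KB max payload (illustrative)
      .addLast(new HttpServerKeepAliveHandler())
}
```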
WebSocket (at a minimum):
- We want the HTTP connection to transition to an arbitrary WebSocket connection, i.e. there's not a single WS controller with a single path for the whole server.

**Comment:** Yeah, this is fun. :-P I think that we basically just don't try to suppress handlers' ability to mutate the pipeline. We can't really prevent it anyway, and by allowing it, provided that handlers are able to … within the …

- We want to receive websocket frames.
- We want to backpressure the sender. For context, say our service proxies frames to other TCP servers, and one of those servers is slow. We want the client to slow down its rate without an explicit higher-layer protocol message. However, we can optimize this so that a single slow downstream does not backpressure all other frames going to other destinations: we maintain a queue of frames and only backpressure when the queue is full. (Without explicit higher-layer flow control or a protocol other than TCP, this will only mitigate the problem. Frames for different downstreams are multiplexed over a single TCP connection, so once the queue is full because of a single slow consumer, we block all other frame processing, a.k.a. Head-of-Line (HOL) blocking. This is partly a fundamental limitation of TCP, i.e. there are no virtual streams. Such limitations are overcome in QUIC and HTTP/3.)
  Either of these 2 approaches may work:
  - Only read the socket when we finish processing a message. The Pub-Sub app may queue up the frame for later processing, or it may immediately process it.

**Comment:** This is the approach currently taken in …

**Reply:** Yeah, going to presume this will be the way going forward. Curious if we'll need to interact with …
  - Give FS2-Netty a queue to fill and only backpressure when the queue is full (or above a high watermark; start reading again when below a low watermark).

**Comment:** Technically we could emulate this directly in … The one thing though is that backpressure can only be applied in this model to one stream at a time. By which I mean, downstream backpressure on one stream will result in that socket getting slower …

**Reply:** Yeah, you're right, this is really a concern for this library.
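The second approach (hand FS2-Netty a queue and backpressure only when it fills) maps naturally onto a bounded cats-effect queue: `offer` on a full bounded queue semantically blocks, which is exactly where backpressure toward the socket would kick in. A self-contained sketch of that shape (the `Frame` type and sizes are placeholders):

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Queue
import fs2.Stream

object BoundedFrameQueue extends IOApp.Simple {
  // Placeholder for a decoded websocket frame
  final case class Frame(payload: String)

  val run: IO[Unit] =
    for {
      // offer() blocks (semantically) once 1024 frames are buffered; a real
      // integration would stop calling ctx.read() at a high watermark instead.
      q <- Queue.bounded[IO, Frame](1024)
      producer = Stream.range(0, 10).evalMap(i => q.offer(Frame(s"frame-$i")))
      consumer = Stream.fromQueueUnterminated(q).take(10).evalMap(f => IO.println(f.payload))
      _ <- consumer.concurrently(producer).compile.drain
    } yield ()
}
```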
- We want to send WS frames.
- When sending any frame, we want to know if it was written to the connection.
- When sending a close frame, the frame itself should be sent, then the connection should be closed.

**Comment:** This one's fun. Right now, …

**Reply:** Yup

- We want to know if we're writing too fast and respond to backpressure (namely TCP backpressure).

**Comment:** As long as we don't toss the frame into a queue and forget about it, this should happen basically for free, since the next write (from downstream) won't happen until we're done writing the previous frame.

**Reply:** Here I was alluding to … I think I know what you mean, but have a few more questions. Will circle back later.

- We want to know why a websocket connection closed, and what the close code was, if any.
WebSocket (extras that will save us dev time if the framework can do these):
- We want to group connections, label an individual connection, write to a single connection, broadcast to a group, close a single connection, and close all connections.

**Comment:** These sound like really easy things to implement on top of the …

**Reply:** Could definitely be done in FS2. Was thinking that we could also expose ChannelGroup just to leverage Netty, but this is an impl detail. I haven't played with …
- We want to know if a connection is still alive.
- Close a connection if it's unresponsive.

**Comment** (lines +72 to +73): More annoying. Downstream can implement a heartbeating check mechanism to detect… things that you detect with a heartbeat. If TCP informs us that the socket is gone then the streams will be closed proactively and any associated resources cleaned up.

**Reply:** Yup, also can leverage Netty's …
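For liveness checks, one option is Netty's `IdleStateHandler` (presumably what the truncated reply above alludes to): it fires user events when no reads or writes occur within a window, and a following handler can react. A sketch of closing reader-idle connections (handler names and the 60-second window are illustrative):

```scala
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter, ChannelPipeline}
import io.netty.handler.timeout.{IdleState, IdleStateEvent, IdleStateHandler}

// Reacts to IdleStateEvent fired by an upstream IdleStateHandler and closes
// connections that have produced no reads within the configured window.
final class CloseOnIdle extends ChannelInboundHandlerAdapter {
  override def userEventTriggered(ctx: ChannelHandlerContext, evt: AnyRef): Unit =
    evt match {
      case e: IdleStateEvent if e.state() == IdleState.READER_IDLE => ctx.close()
      case other                                                   => super.userEventTriggered(ctx, other)
    }
}

def installIdleCheck(pipeline: ChannelPipeline, seconds: Int): ChannelPipeline =
  pipeline
    .addLast(new IdleStateHandler(seconds, 0, 0)) // reader-idle timeout only
    .addLast(new CloseOnIdle)
```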
- We want metrics for the WS server:
  - Count of different frame types
  - The size of data frames
  - Count of different close codes

**Comment** (lines +74 to +77): I would definitely do this on top of the streams that we produce from fs2-netty. Basically all of those semantics work really nicely and composably downstream because of how Fs2 works.
General server configs:
- We want to limit the number of TCP connections? To protect the server?
- Backpressure accepting connections (and reads) if memory is getting high? E.g. the Netty thread pool is queuing too many tasks in the compute thread pool, and the queue will hit OOM.

**Comment** (lines +80 to +82): The way this is set up right now is the …

**Reply:** … and/or … I now understand these, so yeah, that'll do the trick. :)
## Implementation thoughts for above use cases
Based on what currently exists and past discussions, with additional modifications.
## Package Structure
- fs2-netty-core
  - Netty
    - wrappers
    - ops
- fs2-netty-server
  - tcp
  - udp
- fs2-netty-client
  - tcp
  - udp

**Comment** (lines +88 to +97): I actually kinda wanna squeeze these three together. My inspiration here is fs2-io: https://github.com/typelevel/fs2/tree/main/io/src/main/scala/fs2/io/net — there's some nice API design in here, but one thing that's really cool is that it's just dead-simple to get a server or client stood up, using the … Otherwise I agree though. Also, I have literally never written a program that does anything with UDP, and I feel deep shame.

**Reply:** It makes a lot of sense to keep them as simple functions.
- fs2-netty-all
- fs2-netty-simple-http-server
  - websockets

**Comment** (lines +99 to +100): I like the idea of having some nicer wrappers around the existing pipeline handlers in these areas. So exactly what you're saying here, I think. :-) We should still allow users to wrap their own arbitrary handlers if they need to, but nice experiences out of the box are important.

**Reply:** Yeah exactly, there's the core and some user-friendly default pipelines to make people's lives easier.
### API
We'll look at how intermediate library/module authors will use FS2-Netty to build protocols for the above use cases.

#### TCP Server
The idea is that FS2-Netty users provide a custom Netty pipeline and probably some configs to `FS2TcpServer`, which builds a server resource that exposes a Stream of `Socket`s. Users can then map over that to produce their desired api. `Socket` should be the bridge out of Netty, i.e. no more Netty is exposed beyond it (maybe this isn't practical; there are also more things Netty does besides sockets, so the api will be refined).
```scala
// similar to SocketHandler, a bridge out of Netty that exposes streams for I/O types and other TCP connection api methods
class NettyPipeline[I, O](handlers: List[NettyHandler])

object FS2TcpServer {
  def make[F[_], I, O](pipeline: NettyPipeline[I, O]): Resource[F, Fs2Stream[F, ServerSocket[F, I, O]]] = ???
}

object MySimpleHttpServer {

  private trait HttpSocket[F[_]] {
    def read(f: HttpRequest => F[HttpResponse]): F[Unit]
  }

  private[this] val httpPipeline = new NettyPipeline[HttpRequest, HttpResponse](handlers = Nil) // This would contain actual handlers

  def make[F[_]](): Resource[F, Fs2Stream[F, HttpSocket[F]]] =
    FS2TcpServer.make(httpPipeline)
      .map(stream => stream.map(tcpServerSocket => new HttpSocket[F] {
        override def read(f: HttpRequest => F[HttpResponse]): F[Unit] = ??? // interface with tcpServerSocket
      }))
}
```

**Comment** (lines +112 to +116): Pretty much exactly this!

**Comment** (lines +120 to +122): I think it can actually be a bit more general than this! It would be something like

```scala
val server: Stream[F, Socket[F, HttpRequest, HttpResponse]] = Network[F].server(host, port, httpPipeline) // or something
val handler: HttpRequest => F[HttpResponse] = ???

val handles = server map { (socket: Socket[F, HttpRequest, HttpResponse]) =>
  socket.reads
    .evalMap(handler)
    .through(socket.writes)
    .onError(e => logF("caught exception during request", e))
    .attempt
    .void
}

handles.parJoin(8196) // limit to 8196 concurrent connections
```

Does that kind of make sense? Basically we're getting a lot from …

**Reply:** Yes it does, I think. … and under the hood, the overly specific …
# Random extras
```scala
abstract class WebSocketFrame(underlying: NettyWebSocketFrame)
trait DataFrame extends WebSocketFrame
trait ControlFrame extends WebSocketFrame

final case class BinaryFrame() extends DataFrame
final case class TextFrame() extends DataFrame
// Pipeline Framework may also handle this, but we should expose for advanced usage
final case class ContinuationFrame() extends DataFrame

final case class PingFrame() extends ControlFrame
final case class PongFrame() extends ControlFrame
final case class CloseFrame() extends ControlFrame

trait CloseReason
final case class ConnectionError(cause: Throwable) extends CloseReason
case object TcpConnectionDropped extends CloseReason
final case class FrameworkInitiated(closeFrame: CloseFrame) extends CloseReason
final case class UserInitiated(closeFrame: CloseFrame) extends CloseReason
```

**Comment** (lines +136 to +153): 😳 This is cool.

**Reply:** haha and to clarify, "user" is the higher layer sending a close frame via the FS2-Netty streams.

```scala
// Or Observer
trait WebSocketListener[F[_]] {
  def connectionOpened(
      subprotocol: Option[String],
      headers: NettyHttpHeaders)(
      implicit context: NettyWebSocketContext
  ): F[Unit]

  def connectionHandshakeError(
      cause: Throwable
  ): F[Unit]

  def received(frame: WebSocketFrame)(
      implicit context: NettyWebSocketContext
  ): F[Unit]

  def receivedAggregated(frame: DataFrame)(
      implicit context: NettyWebSocketContext
  ): F[Unit]

  def connectionBackPressured(): F[Unit]

  def connectionClosed(reason: CloseReason): F[Unit]
}
```

**Comment** (on `// Or Observer`): So this is actually where things will get SUPER awesome, because for websockets, I think we'll just expose a …

**Reply:** Yup
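With a `CloseReason` ADT like the one sketched above, downstream code can branch on why a connection ended; a small illustrative match (the type names come from the sketch, not a real fs2-netty API):

```scala
// Assumes the CloseReason hierarchy sketched in the "Random extras" section
def describe(reason: CloseReason): String =
  reason match {
    case ConnectionError(cause)    => s"transport error: ${cause.getMessage}"
    case TcpConnectionDropped      => "peer dropped the TCP connection"
    case FrameworkInitiated(frame) => s"fs2-netty sent close frame $frame"
    case UserInitiated(frame)      => s"application sent close frame $frame"
  }
```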
**Comment** (on the AUTO_READ option): Ah! Hmm. We may need to explicitly set this, then. If we don't have auto read set to `false` on the server socket, then we won't be able to backpressure new connections.

**Reply:** Yeah, for sure. I discovered it by chance when digging through the Netty source and saw a scheduled task for autoread. Thought that was weird, so made a note to investigate. We can just print the Netty config during runtime. Best case, our setting is always accepted and this is a false concern. Worst case, we have to schedule an autoread override at the right time so that it executes after Netty's 🙃.

**Reply:** That would be abjectly horrible. :-D Like complain to Norman on Twitter horrible.

**Reply:** haha will test it out :)