Brain dump #6

Draft, wants to merge 1 commit into main
8 changes: 5 additions & 3 deletions core/src/main/scala/fs2/netty/Network.scala
@@ -85,7 +85,8 @@ final class Network[F[_]: Async] private (
Sync[F] delay {
val bootstrap = new ServerBootstrap
bootstrap.group(parent, child)
.option(JChannelOption.AUTO_READ.asInstanceOf[JChannelOption[Any]], false) // backpressure
// TODO: does netty override this? `ServerBootstrap.init` (via `bind`) schedules a future that sets autoread to false
Member:

Ah! Hmm. We may need to explicitly set this, then. If we don't have auto read set to false on the server socket, then we won't be able to backpressure new connections.

Collaborator Author:

Yeah for sure. I discovered this by chance when digging through the Netty source and saw a scheduled task for autoread. That seemed weird, so I made a note to investigate. We can just print the Netty config at runtime. Best case, our setting is always accepted and this is a false concern. Worst case, we have to schedule an autoread override at the right time so that it executes after Netty's 🙃 .
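A minimal sketch of that runtime check (names illustrative, not part of this PR):

```scala
// Sketch of the "print Netty config during runtime" idea: after bind,
// inspect the resolved config to see whether our AUTO_READ = false
// setting survived Netty's own scheduled override.
import io.netty.channel.Channel

def logAutoRead(serverChannel: Channel): Unit =
  // expect false if backpressure on accepts is in effect
  println(s"autoRead = ${serverChannel.config.isAutoRead}")
```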

Member:

That would be abjectly horrible. :-D Like complain to Norman on Twitter horrible.

Collaborator Author:

haha will test it out :)

.option(JChannelOption.AUTO_READ.asInstanceOf[JChannelOption[Any]], false) // backpressure
.channel(serverChannelClazz)
.childHandler(initializer(disp)(sockets.offer))

@@ -163,8 +164,8 @@ object Network {

(instantiateR("server"), instantiateR("client")) mapN { (server, client) =>
try {
val meth = eventLoopClazz.getDeclaredMethod("setIoRatio", classOf[Int])
meth.invoke(server, new Integer(90)) // TODO tweak this a bit more; 100 was worse than 50 and 90 was a dramatic step up from both
val meth = eventLoopClazz.getDeclaredMethod("setIoRatio", classOf[Int]) // TODO: this method is set to be deprecated in future releases
Member:

Seriously? What's replacing it? The impact this had was substantial.

Collaborator Author:

According to the Epoll source this doesn't even do anything. For the KQueue and NIO ELGs it has a meaningful impact. As to what's replacing Epoll's method...🤷‍♂️ ha. So it's not really a universal interface for ELGs, but Netty makes it seem like it is.

One takeaway is that this creates a tuning discrepancy between Linux and other OSes.

Member:

Oh this is really interesting!

I'm not that fussed about the tuning being a bit different on different OSes, though if it starts getting to the point where people might actually take that into account and build their applications and profiling to those kinds of parameters, then it might be an issue. At some point, differences between platforms are very much unavoidable. Like if you're profiling kqueue and using it to tune a server running against epoll or (even worse) io_uring, you're going to have a bad time with or without our meddling.

Collaborator Author:

👍

meth.invoke(server, new Integer(90)) // TODO tweak this a bit more; 100 was worse than 50 and 90 was a dramatic step up from both
meth.invoke(client, new Integer(90))
} catch {
case _: Exception => ()
@@ -174,6 +175,7 @@ object Network {
}
}

// TODO: Why not use the Netty methods?
Member:

Are there Netty methods for doing this kind of detection?

Collaborator Author:

For example,

```scala
import io.netty.channel.epoll.Epoll
import io.netty.channel.kqueue.KQueue

// epollElg / kqueueElg / nioElg stand in for the respective EventLoopGroup constructors
if (Epoll.isAvailable) epollElg
else if (KQueue.isAvailable) kqueueElg
else nioElg

// Don't see the incubator (io_uring) package in the current Netty version,
// but presumably it would look similar to the above
```

See the Epoll source code for more details.

Member:

So. Much. Nicer. Let's do this.

private[this] def uring() =
try {
if (sys.props.get("fs2.netty.use.io_uring").map(_.toBoolean).getOrElse(false)) {
Expand Down
179 changes: 179 additions & 0 deletions notes.md
@@ -0,0 +1,179 @@
# FS2-Netty Developer Docs

## What is FS2-Netty
A general networking library for TCP- and UDP-based protocols built on the Netty framework, using typesafe FS2 streams.
It should be general enough to support protocols such as DNS, FTP, MQTT, Redis, HTTP/1.1, HTTP/2, HTTP/3, QUIC, etc.

(M. Mienko - I don't know FS2, so cannot comment on its role in detail. I will just write "stream" or "queue" wherever it makes sense :) )
Member:

👍


## Key Questions for FS2-Netty
1) (API) How much of Netty should this library expose to users?
- How to expose Netty's pipeline, event-loop scheduler, channel, channel handlers, etc. such that users get both the
flexibility of Netty's pipeline and typesafe FP abstractions.
- At a minimum, stop exposing Netty pipeline after we produce `Stream[Socket]`
Member (on lines +10 to +13):

IMO we should have a Pipeline[I, O] which basically just consists of a bunch of handlers which will be added to the pipeline together with a final stage which just takes the messages and shovels them into a Queue using Dispatcher, the same way the current SocketHandler does with ByteBuf. We then generalize Socket to something like Socket[F, I, O], where the default is Socket[F, Byte, Byte] if you don't supply a pipeline.
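A rough sketch of that shape (hypothetical names, not the current fs2-netty API):

```scala
import fs2.{Pipe, Stream}
import io.netty.channel.ChannelHandler

// A Pipeline is just the handlers to install, typed by the messages that
// fall out of / feed into the final stage.
final case class Pipeline[I, O](handlers: List[ChannelHandler])

// Generalized Socket: the final pipeline stage shovels inbound messages
// into a Queue via Dispatcher, the same way SocketHandler does with ByteBuf.
trait Socket[F[_], I, O] {
  def reads: Stream[F, I]
  def writes: Pipe[F, O, Unit]
}

// Default when no pipeline is supplied: Socket[F, Byte, Byte]
```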

Collaborator Author:

Yup, that all makes sense.

This requirement is too strong since we need to allow pipeline mutations, e.g. HTTP to WS. We're back to exposing all of Netty again since users have access to handlers.

One thing to think about is Netty's custom events. Specifically IdleState events, for detecting lack of inbound or outbound channel activity. So this would imply another type Pipeline[I, O, U] where U is some Netty User triggered event. Or it's a custom ADT and users would also need to supply a mapping Any => Either[Throwable, U]. Basically this would interface with override def userEventTriggered(ctx: ChannelHandlerContext, evt: Any): Unit in Netty handlers. Then it could be another stream on Socket, i.e. events: Stream[F, U].

2) (Safety) When Netty constructs are exposed, how can it do so safely? Safety is defined as:
- preventing users from crashing clients or servers
- preventing leaking resources such as connections or memory
- mitigating negative performance impacts, namely around socket operations
- preventing unintentional protocol violations, e.g. if the connection uses the websocket protocol, then the app should never
send an HTTP or Redis message. Likewise, TCP and UDP shouldn't mix.
Member (on lines +14 to +19):

This is a tricky one. I'm not sure we can without really messing up the way that pipelines work. One thing we could do is, rather than having a big opaque Pipeline container which just specifies the final types, we could build a type-aligned sequence. Something like: Handler[InFrom, InTo, OutFrom, OutTo] where you can compose Handlers together in a type safe fashion. That sounds really awkward though, and it doesn't really prevent leaking resources or violating protocols.

Pipelines are super-unsafe, and that tends to make them fast, but it also makes them… super unsafe. I think the main way we can help people is by giving them the tools to get out of Netty as quickly as possible. Like the idea should be that you would use native Netty pipeline handlers for any protocol implementation that you find useful and performant (e.g. HTTP or WebSockets), but then all actual user code should be off in Fs2. In other words, users would be discouraged from defining new handlers, but using existing ones should be fine.

Collaborator Author:

Yup, agree with you here too. This part of the definition of safety should be weakened: "Using streams provided by FS2-Netty should prevent unintentional protocol violations" or something like that.

3) (Liveness) When an app creates a client or server and the "world" is happy (the networking layer is working and
peer(s) are up), how does it ensure messages are always received or sent?
Member (on lines +20 to +21):

I'm not sure I understand. You mean just within the local state itself? How do we separate this from Netty, or the kernel for that matter?

Collaborator Author:

Trying to capture the idea that "'something' good will happen" and/or "there are no bugs". Haven't really thought this through 😅, but definitely within the local state itself. So I guess: Given that the network layer is working, And Netty is working, And FS2 is working, Then the socket will send and receive packets...
I'll get back with more details or remove this.

4) (Robustness) When a peer goes down, there's a malicious peer, network fails, protocol is violated,
or machine resource usage is high, then how does the framework:
- gracefully handle these scenarios itself (auto-recover if possible and makes sense), or
- permit the user to handle them gracefully, or
- notify the user of this non-fatal error?
Member (on lines +22 to +26):

Ah this I've thought about! So in the event that errors are encountered, they will be propagated into the stream, where they can be handled by the user. Users can then implement recovery semantics on the streams and attempt to reconnect, or they might have some other semantic, but either way the semantics of Stream itself will ensure that any resources downstream of fs2-netty will be gracefully cleaned up and any handlers shut down. This is the super-nice thing about Stream: it's safe for both producers and consumers to do crazy stuff with resources and processing and concurrency and stuff, and everything will get neatly tucked away if the stream ends at any point or if errors are raised.

Within Netty itself is kind of a different story. Basically anything that happens in pipeline handlers, you're on your own. :-P
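User-side recovery on top of that propagation could be as small as (a sketch, assuming errors surface in the connection stream; names illustrative):

```scala
import scala.concurrent.duration._
import cats.effect.Temporal
import fs2.Stream

// Any error raised in the stream tears down downstream resources first
// (Stream's bracketing guarantees this); here the user simply waits,
// then re-acquires a fresh connection.
def withReconnect[F[_]: Temporal, A](connect: Stream[F, A]): Stream[F, A] =
  connect.handleErrorWith { _ =>
    Stream.sleep_[F](1.second) ++ withReconnect(connect)
  }
```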

Collaborator Author:

Yup, essentially what SocketHandler does, which is offer the exception to the queue. This raises an error on the stream then?
Some errors we'd likely catch and report as channel close/inactive. This happens around Epoll, but it's a technicality.

5) (Visibility) ...
6) (Efficiency + Performance) ...
Member:

To this particular point, something I've thought about a lot is both efficiency and backpressure. Staying within Fs2's assumptions regarding upstream production (i.e. that we don't have some hidden unbounded queue that we're filling up while we wait to feed it things) basically gives us a ton of this stuff for free. Fs2 itself is kind of shockingly efficient, and the guarantees it offers around backpressure and resource safety are extremely strong out of the box.


## Use Cases
Below are the use-cases FS2-Netty needs to support.

### A Pub-Sub like System
This system uses the websocket protocol to integrate with clients. On the backend, messages can be sent via HTTP/1.1.
Published events from clients are arbitrarily processed. The following are requirements of a networking framework from
the point of view of developers building the pub-sub system.

#### High Level Requirements
HTTP:
- We want a simple (GET & POST) but robust (handles malformed requests) HTTP server that handles payloads on the order of KBs.
- We want HTTP connections to be efficient for client use, i.e. a client can pool connections and reuse them for multiple HTTP requests.
- We want the HTTP server to avoid leaking connections; it should detect and clean up dropped connections.
- We want the HTTP server to rotate client connections to load balance across multiple instances.
Member (on lines +40 to +43):

How much of this can we just get for free out of Netty's default HTTP handler? I've never used it.

Collaborator Author:

The above is a mix of ReadTimeoutHandler and custom logic to respond to Netty's idle events, namely closing the connection, but this would be outside Netty handlers.
I presume this would be custom logic all outside of Netty, just scheduling a single event. Could also be done with Netty of course, but might as well use CE or whatever for this.
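A sketch of the stock Netty handlers mentioned here (timeout values illustrative; pipeline wiring is an assumption, not this PR's code):

```scala
import io.netty.channel.socket.SocketChannel
import io.netty.handler.timeout.{IdleStateHandler, ReadTimeoutHandler}

def installIdleDetection(ch: SocketChannel): Unit = {
  // fires a ReadTimeoutException if no inbound data arrives for 60s
  ch.pipeline.addLast("readTimeout", new ReadTimeoutHandler(60))
  // or emit IdleStateEvents (readerIdle 60s, writerIdle 30s) so the
  // close-the-connection logic can live outside the handler
  ch.pipeline.addLast("idle", new IdleStateHandler(60, 30, 0))
}
```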

- We don't want to allow HTTP pipelining.
- We want HTTP logging (and metrics?).
- Access log with URI path, HTTP method, HTTP headers, payload size in bytes, response time, response code, and response headers.
- Metrics on how often the TCP connection is closed by the server and the client. How often client drops connections?
Member:

This could actually be a fairly trivial Pipe we apply to the stream, I suspect, giving the downstream full control.
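That Pipe could be as small as (a sketch; `log` is a user-supplied effect):

```scala
import fs2.Pipe

// Pass-through stage that records each element before handing it
// downstream; applied with `stream.through(logged(log))`.
def logged[F[_], A](log: A => F[Unit]): Pipe[F, A, A] =
  _.evalTap(log)
```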

Collaborator Author:

Yup

- We don't need SSL since this service will be behind a Load Balancer that will terminate SSL connections.

WebSocket (at a minimum):
- We want the HTTP connection to transition to an arbitrary WebSocket connection, i.e. there's not a single WS controller with a single path for the whole server.
Member:

Yeah this is fun. :-P I think that we basically just don't try to suppress handlers' ability to mutate the pipeline. We can't really prevent it anyway, and by allowing it, provided that handlers stay within the Pipeline[In, Out] types, we get to keep type safety and get the efficiency that this feature allows.

- We want to receive websocket frames.
- We want to backpressure the sender. For context, say our service proxies frames to other TCP servers, and one of
those servers is slow. We want the client to slow down its rate without an explicit higher-layer protocol message.
However, we can optimize this so that a single slow downstream doesn't backpressure all other frames going to other
destinations: we maintain a queue of frames and only backpressure when the queue is full. (Without explicit
higher-layer flow control or a protocol other than TCP, this will only mitigate the problem. Frames for different
downstreams are multiplexed over a single TCP connection, so once the queue is full because of a single slow
consumer, we block all other frame processing, a.k.a. Head-of-Line (HOL) blocking. This is partly a fundamental
limitation of TCP, i.e. there are no virtual streams. Such limitations are overcome in QUIC and HTTP/3.)
Either of these 2 approaches may work:
- Only read the socket when we finish processing a message. The Pub-Sub app may queue up the frame for later processing, or it may immediately process it.
Member:

This is the approach currently taken in SocketHandler

Collaborator Author:

Yeah, going to presume this will be the way going forward. Curious if we'll need to interact with channelReadComplete for pipelines that automatically aggregate, e.g. an HTTP server with HttpObjectAggregator in the pipeline. Basically, might have to call read until channelReadComplete is hit. I never messed with this method. Will write tests around it, but think it's worth calling out.

- Give FS2-Netty a queue to fill and only backpressure when queue is full (or above a high watermark, start reading when below a low watermark).
Member:

Technically we could emulate this directly in SocketHandler by widening the queue, or we can just allow downstreams to use prefetchN.

The one thing though is that backpressure can only be applied in this model to one stream at a time. By which I mean, downstream backpressure on one stream will result in that socket getting slower read() frequencies, while the other sockets might continue read()ing just as fast as normal. If I understand you correctly regarding HoL blockage, this isn't really a concern, but I figured I would call it out.
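The prefetchN variant could be sketched as (buffer size illustrative):

```scala
import cats.effect.Concurrent
import fs2.Stream

// Pull up to n chunks ahead into an internal buffer; the socket's
// read()s only slow down once that buffer is full.
def buffered[F[_]: Concurrent, A](reads: Stream[F, A]): Stream[F, A] =
  reads.prefetchN(1024)
```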

Collaborator Author:

yeah, you're right, this isn't really a concern for this library.

- We want to send ws frames.
- When sending any frame, we want to know if it was written to the connection.
- When sending a close frame, then the frame itself should be sent, then the connection should be closed.
Member:

This one's fun. Right now, SocketHandler only terminates streams when the upstream Socket is closed. This is actually pretty graceful, though there are race conditions with writes (as you would expect). It wouldn't be hard to do better with websockets, though we would need to make the endpoint handler (which jumps from the netty pipeline into Fs2) aware of what a close frame is.

Collaborator Author:

though we would need to make the endpoint handler (which jumps from the netty pipeline into Fs2) aware of what a close frame is.

yup

- We want to know if we're writing too fast and respond to backpressure (namely TCP backpressure).
Member:

As long as we don't toss the frame into a queue and forget about it, this should happen basically for free since the next write (from downstream) won't happen until we're done writing the previous frame.

Collaborator Author:

Here I was alluding to `override def channelWritabilityChanged(ctx: ChannelHandlerContext): Unit` in Netty handlers, which is a proxy for TCP's backpressure (the OS's buffer stops accepting bytes). Netty invokes that method when the bytes in its internal queue cross the high or low watermarks.

I think I know what you mean, but have a few more questions. Will circle back later.
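For reference, a sketch of that hook as a logging handler (illustrative, not this PR's code):

```scala
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

// Netty flips channel.isWritable when its outbound buffer crosses the
// configured high/low write watermarks, proxying TCP backpressure.
final class WritabilityLogger extends ChannelInboundHandlerAdapter {
  override def channelWritabilityChanged(ctx: ChannelHandlerContext): Unit = {
    if (ctx.channel.isWritable)
      println("outbound buffer drained below low watermark; resume writes")
    else
      println("outbound buffer above high watermark; slow down writes")
    ctx.fireChannelWritabilityChanged() // keep propagating the event
  }
}
```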

- We want to know why a websocket connection closed, what was the close code if any.

WebSocket (extras that will save us dev time if the framework can do these):
- We want to group connections, label an individual connection, write to a single connection, broadcast to group, close a single connection, and close all connections.
Member:

These sound like really easy things to implement on top of the Stream/Pipe pair produced by fs2-netty, using other Fs2 primitives.

Collaborator Author:

Could definitely be done in FS2. Was thinking that we could also expose ChannelGroup just to leverage Netty, but this is an impl detail. I haven't played with ChannelGroup, so this is just a note.

- We want to know if connection is still alive.
- Close connection if it's unresponsive.
Member (on lines +72 to +73):

More annoying. Downstream can implement a heartbeating check mechanism to detect… things that you detect with a heartbeat. If TCP informs us that the socket is gone then the streams will be closed proactively and any associated resources cleaned up.

Collaborator Author:

Yup, also can leverage Netty's ReadTimeoutHandler.

- We want metrics for WS server
- Count of different frame types
- The size of data frames
- Count of different close codes
Member (on lines +74 to +77):

I would definitely do this on top of the streams that we produce from fs2-netty. Basically all of those semantics work really nicely and composably downstream because of how Fs2 works.


General Server configs:
- We want to limit the number of TCP connections? To protect the server?
- Backpressure accepting connections (and reads) if memory is getting high?
E.g. Netty thread pool is queuing too many tasks in compute thread pool. Queue will hit OOM.
Member (on lines +80 to +82):

The way this is set up right now is the read() on the server socket will happen less frequently if the server is overloaded (naturally as it slows down). In general, most downstream consumers of the Network interface would be expected to use the bounded variant of parJoin. In the echo server I lazy-moded parJoinUnbounded because I knew we wouldn't get overwhelmed, but generally I think we would do something like parJoin(65536) or something like that. This functions as a hard TCP connection bound.

Collaborator Author:

parJoin(65536)

and/or

parJoinUnbounded

I now understand these, so yeah that'll do the trick. :)


## Implementation thoughts for above use cases
Based on what currently exists and past discussions, with additional modifications.

## Package Structure
- fs2-netty-core
- Netty
- wrappers
- ops
- fs2-netty-server
- tcp
- udp
- fs2-netty-client
- tcp
- udp
Member (on lines +88 to +97):

I actually kinda wanna squeeze these three together. My inspiration here is fs2-io: https://github.com/typelevel/fs2/tree/main/io/src/main/scala/fs2/io/net There's some nice API design in here, but one thing that's really cool is that it's just dead-simple to get a server or client stood up, using the Network[IO].client(...) or Network[IO].server(...) call. The only reason this can be made so simple is the fact that Stream is so powerful that a lot of the stuff you would normally have to worry about in the Network layer (e.g. backpressure or connection limiting) is just… handled for you.

Otherwise I agree though. Also I have literally never written a program that does anything with UDP and I feel deep shame.

Collaborator Author:

It makes a lot of sense to keep them as simple functions.

- fs2-netty-all
- fs2-netty-simple-http-server
- websockets
Member (on lines +99 to +100):

I like the idea of having some nicer wrappers around the existing pipeline handlers in these areas. So exactly what you're saying here, I think. :-) We should still allow users to wrap their own arbitrary handlers if they need to, but nice experiences out of the box are important.

Collaborator Author:

yeah exactly, there's the core and some user-friendly default pipelines to make people's lives easier.


### API
We'll look at how intermediate library/module authors will use FS2-Netty to build protocols for the above use-cases.

#### TCP Server
The idea is that FS2-Netty users provide a custom Netty pipeline and probably some configs to `FS2TcpServer`, which
builds the server resource that exposes a Stream of `Socket`s. Then users can map over that to produce their desired
API. `Socket` should be the bridge out of Netty, i.e. no more Netty is exposed beyond it (maybe this isn't practical;
there are also more things Netty does besides sockets, so the API will be refined).
```scala
// similar to SocketHandler, a bridge out of Netty that exposes streams for I/O types and other TCP connection api methods
class NettyPipeline[I, O](handlers: List[NettyHandler])

object FS2TcpServer {
def make[F[_], I, O](pipeline: NettyPipeline[I, O]): Resource[F, Fs2Stream[F, ServerSocket[F, I, O]]] = ???
}
Member (on lines +112 to +116):

Pretty much exactly this!


object MySimpleHttpServer {

private trait HttpSocket[F[_]] {
def read(f: HttpRequest => F[HttpResponse]): F[Unit]
}
Member (on lines +120 to +122):

I think it can actually be a bit more general than this! It would be something like Socket[F, HttpRequest, HttpResponse]. Which really just boils down to a pair of two things: Stream[F, HttpRequest] and Pipe[F, HttpResponse, Unit]. So to write to the socket, you need to supply a Stream[F, HttpResponse], which you then feed to that pipe. To read from the socket, you simply consume from the stream of requests. Note that your Stream[F, HttpResponse] is probably a transformation of Stream[F, HttpRequest]. Thus, if you have a function HttpRequest => F[HttpResponse], you can get the obvious semantic really easily:

val server: Stream[F, Socket[F, HttpRequest, HttpResponse]] = Network[F].server(host, port, httpPipeline) // or something

val handler: HttpRequest => F[HttpResponse] = ???

val handles = server map { (socket: Socket[F, HttpRequest, HttpResponse]) =>
  socket.reads
    .evalMap(handler)
    .through(socket.writes)
    .onError(e => logF("caught exception during request", e))
    .attempt
    .void
}

handles.parJoin(8196)     // limit to 8196 concurrent connections

Does that kind of make sense? Basically we're getting a lot from Stream for free here.

Collaborator Author:

Yes it does, I think.
So at a high level?

object ExampleHttpServer extends IOApp {
  private[this] val router = Kleisli[IO, FullHttpRequest, FullHttpResponse] {
    request => IO {
          new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1,
            HttpResponseStatus.NOT_FOUND
          )
        }
  }
  
  override def run(args: List[String]): IO[ExitCode] =
    HttpServer.start[IO](HttpServer.HttpConfigs(
      requestTimeoutPeriod = 500.milliseconds,
      HttpServer.HttpConfigs.Parsing.default
    ))
      .evalMap { httpClientConnections =>
        httpClientConnections
          .map(_.successfullyDecodedReads(router))
          .parJoin(65536)
          .compile
          .drain
      }
      .useForever
      .as(ExitCode.Success)
    
}

and under the hood, the overly specific successfullyDecodedReads method is the handles in your example. The HttpServerConnection in this PR (which I'll now call HttpClientConnection) will handle piping Stream[F, HttpRequest] through Pipe[F, HttpResponse, Unit] via a Kleisli[F, FullHttpRequest, FullHttpResponse] transform.


  private[this] val httpPipeline =
    new NettyPipeline[HttpRequest, HttpResponse](handlers = Nil) // This would contain actual handlers

  def make[F[_]](): Resource[F, Fs2Stream[F, HttpSocket[F]]] =
    FS2TcpServer.make(httpPipeline)
      .map(_.map(tcpServerSocket => new HttpSocket[F] {
        override def read(f: HttpRequest => F[HttpResponse]): F[Unit] =
          ??? // interface with tcpServerSocket
      }))
}
```

# Random extras
```scala
abstract class WebSocketFrame(underlying: NettyWebSocketFrame)
trait DataFrame extends WebSocketFrame
trait ControlFrame extends WebSocketFrame

final case class BinaryFrame() extends DataFrame
final case class TextFrame() extends DataFrame
// Pipeline Framework may also handle this, but we should expose for advanced usage
final case class ContinuationFrame() extends DataFrame

final case class PingFrame() extends ControlFrame
final case class PongFrame() extends ControlFrame
final case class CloseFrame() extends ControlFrame

trait CloseReason
final case class ConnectionError(cause: Throwable) extends CloseReason
case object TcpConnectionDropped extends CloseReason
final case class FrameworkInitiated(closeFrame: CloseFrame) extends CloseReason
final case class UserInitiated(closeFrame: CloseFrame) extends CloseReason
Member (on lines +136 to +153):

😳 This is cool.

Collaborator Author:

haha and to clarify, "user" is the higher layer sending a close frame via the FS2-Netty Streams.


// Or Observer
Member:

So this is actually where things will get SUPER awesome, because for websockets, I think we'll just expose a Socket[F, WebSocketFrame, WebSocketFrame]. So in other words, a socket where you can send and receive websocket frames, just as how without any pipeline handlers we get a socket where you can send and receive bytes, and with the http handler we get a socket where you can send and receive HttpRequest/HttpResponse, respectively.

Collaborator Author:

Yup

trait WebSocketListener[F[_]] {
  def connectionOpened(
      subprotocol: Option[String],
      headers: NettyHttpHeaders)(
      implicit context: NettyWebSocketContext
  ): F[Unit]

  def connectionHandshakeError(cause: Throwable): F[Unit]

  def received(frame: WebSocketFrame)(
      implicit context: NettyWebSocketContext
  ): F[Unit]

  def receivedAggregated(frame: DataFrame)(
      implicit context: NettyWebSocketContext
  ): F[Unit]

  def connectionBackPressured(): F[Unit]

  def connectionClosed(reason: CloseReason): F[Unit]
}
```