Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Brain dump #6

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

Brain dump #6

wants to merge 1 commit into from

Conversation

mmienko
Copy link
Collaborator

@mmienko mmienko commented Feb 9, 2021

No description provided.

Copy link
Member

@djspiewak djspiewak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry this took me so long to dig through! This is awesome.

@@ -85,7 +85,8 @@ final class Network[F[_]: Async] private (
Sync[F] delay {
val bootstrap = new ServerBootstrap
bootstrap.group(parent, child)
.option(JChannelOption.AUTO_READ.asInstanceOf[JChannelOption[Any]], false) // backpressure
// TODO: does netty override this? `ServerBootstrap.init` via `bind` override schedules a future to set to autoread to false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! Hmm. We may need to explicitly set this, then. If we don't have auto read set to false on the server socket, then we won't be able to backpressure new connections.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah for sure. I discovered perchance when digging through Netty source and saw a scheduled task for autoread. Thought that was weird, so made a note to investigate. We can just print Netty config during runtime. Best case, our setting is always accepted and this is false concern. Worse case, we have to schedule an autoread override at the right time so that it executes after Netty's 🙃 .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be abjectly horrible. :-D Like complain to Norman on Twitter horrible.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha will test it out :)

@@ -163,8 +164,8 @@ object Network {

(instantiateR("server"), instantiateR("client")) mapN { (server, client) =>
try {
val meth = eventLoopClazz.getDeclaredMethod("setIoRatio", classOf[Int])
meth.invoke(server, new Integer(90)) // TODO tweak this a bit more; 100 was worse than 50 and 90 was a dramatic step up from both
val meth = eventLoopClazz.getDeclaredMethod("setIoRatio", classOf[Int]) // TODO: this method is set to be depcrecated in the future releases
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seriously? What's replacing it? The impact this had was substantial.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to Epoll source this doesn't even do anything. For KQueue and NIO ELG this has a meaningful impact. As to what's replacing Epoll's method...🤷‍♂️ ha. So it's not really a universal interface for ELG's, but Netty makes it seem like it is.

One takeaway is that this creates a discrepancy between tuning on non-linux OS'es vs. other OS'es.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh this is really interesting!

I'm not that fussed about the tuning being a bit different on different OSes, though if it starts getting to the point where people might actually take that into account and build their applications and profiling to those kinds of parameters, then it might be an issue. At some point, differences between platforms are very much unavoidable. Like if you're profiling kqueue and using it to tune a server running against epoll or (even worse) io_uring, you're going to have a bad time with or without our meddling.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -174,6 +175,7 @@ object Network {
}
}

//TODO: Why not use the Netty methods
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there Netty methods for doing this kind of detection?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example,

import io.netty.channel.epoll.Epoll
import io.netty.channel.kqueue.KQueue
if (Epoll.isAvailable) {epoll elg} else if (KQueue.isAvailable) {kqueue elg} else {nio elg}

// Don't see any the incubator package in current Netty version, but presumably it would look similar to above

Epoll source code for more details.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So. Much. Nicer. Let's do this.

A general networking library for TCP and UDP based protocols over the Netty framework using typesafe FS2 streams.
It should be general enough to support protocols such as DNS, FTP, MQTT, Redis, HTTP/1.1/ HTTP/2, HTTP/3, Quic, etc.

(M. Mienko - I don't know FS2, so cannot comment on its role in detail. I will just write "stream" or "queue" where-ever makes sense :) )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines +10 to +13
1) (API) How much of Netty should this library expose to users?
- How to expose Netty's pipeline, event-loop scheduler, channel, channel handlers, etc. such that users the
flexibility of Netty's pipeline and typesafe FP abstractions.
- At a minimum, stop exposing Netty pipeline after we produce `Stream[Socket]`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we should have a Pipeline[I, O] which basically just consists of a bunch of handlers which will be added to the pipeline together with a final stage which just takes the messages and shovels them into a Queue using Dispatcher, the same way the current SocketHandler does with ByteBuf. We then generalize Socket to something like Socket[F, I, O], where the default is Socket[F, Byte, Byte] if you don't supply a pipeline.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that all makes sense.

This requirement is too strong since we need to allow pipeline mutations, e.g. HTTP to WS. We're back to exposing all of Netty again since users have access to handlers.

One thing to think about is Netty's custom events. Specifically IdleState events, for detecting lack of inbound or outbound channel activity. So this would imply another type Pipeline[I, O, U] where U is some Netty User triggered event. Or it's a custom ADT and users would also need to supply a mapping Any => Either[Throwable, U]. Basically this would interface with override def userEventTriggered(ctx: ChannelHandlerContext, evt: Any): Unit in Netty handlers. Then it could be another stream on Socket, i.e. events: Stream[F, U].

Comment on lines +99 to +100
- fs2-netty-simple-http-server
- websockers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of having some nicer wrappers around the existing pipeline handlers in these areas. So exactly what you're saying here, I think. :-) We should still allow users to wrap their own arbitrary handlers if they need to, but nice experiences out of the box are important.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah exactly, there's the core and some user-friendly default pipelines to make people's lives easier.

Comment on lines +112 to +116
class NettyPipeline[I, O](handlers: List[NettyHandler])

object FS2TcpServer {
def make[F[_], I, O](pipeline: NettyPipeline[I, O]): Resource[F, Fs2Stream[F, ServerSocket[F, I, O]]] = ???
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty much exactly this!

Comment on lines +120 to +122
private trait HttpSocket[F[_]] {
def read(f: HttpRequest => F[HttpResponse]): F[Unit]
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can actually be a bit more general than this! It would be something like Socket[F, HttpRequest, HttpResponse]. Which really just boils down to a pair of two things: Stream[F, HttpRequest] and Pipe[F, HttpResponse, Unit]. So to write to the socket, you need to supply a Stream[F, HttpResponse], which you then feed to that pipe. To read from the socket, you simply consume from the stream of requests. Note that your Stream[F, HttpResponse] is probably a transformation of Stream[F, HttpRequest]. Thus, if you have a function HttpRequest => F[HttpResponse], you can get the obvious semantic really easily:

val server: Stream[F, Socket[F, HttpRequest, HttpResponse]] = Network[F].server(host, port, httpPipeline) // or something

val handler: HttpRequest => F[HttpResponse] = ???

val handles = server map { (socket: Socket[F, HttpRequest, HttpResponse]) =>
  socket.reads
    .evalMap(handler)
    .through(socket.writes)
    .onError(e => logF("caught exception during request", e))
    .attempt
    .void
}

handles.parJoin(8196)     // limit to 8196 concurrent connections

Does that kind of make sense? Basically we're getting a lot from Stream for free here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it does, I think.
So at a high level?

object ExampleHttpServer extends IOApp {
  private[this] val router = Kleisli[IO, FullHttpRequest, FullHttpResponse] {
    request => IO {
          new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1,
            HttpResponseStatus.NOT_FOUND
          )
        }
  }
  
  override def run(args: List[String]): IO[ExitCode] =
    HttpServer.start[IO](HttpServer.HttpConfigs(
      requestTimeoutPeriod = 500.milliseconds,
      HttpServer.HttpConfigs.Parsing.default
    ))
      .evalMap { httpClientConnections =>
        httpClientConnections
          .map(_.successfullyDecodedReads(router))
          .parJoin(65536)
          .compile
          .drain
      }
      .useForever
      .as(ExitCode.Success)
    
}

and under the hood, the overly specific successfullyDecodedReads method is the handles in your example. The HttpServerConnection in this PR (which I'll now call HttpClientConnection) will handle piping Stream[F, HttpRequest] through Pipe[F, HttpResponse, Unit] via Kleisli[F, FullHttpRequest, FullHttpResponse] transform.

Comment on lines +136 to +153
abstract class WebSocketFrame(underlying: NettyWebSocketFrame)
trait DataFrame extends WebSocketFrame
trait ControlFrame extends WebSocketFrame

final case class BinaryFrame() extends DataFrame
final case class TextFrame() extends DataFrame
// Pipeline Framework may also handle this, but we should expose for advanced usage
final case class ContinuationFrame() extends DataFrame

final case class PingFrame() extends ControlFrame
final case class PongFrame() extends ControlFrame
final case class CloseFrame() extends ControlFrame

trait CloseReason
final case class ConnectionError(cause: Throwable) extends CloseReason
case object TcpConnectionDropped extends CloseReason
final case class FrameworkInitiated(closeFrame: CloseFrame) extends CloseReason
final case class UserInitiated(closeFrame: CloseFrame) extends CloseReason
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😳 This is cool.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha and to clarify, "user" is the higher layer sending a close frame via the FS2-Netty Streams.

final case class FrameworkInitiated(closeFrame: CloseFrame) extends CloseReason
final case class UserInitiated(closeFrame: CloseFrame) extends CloseReason

// Or Observer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is actually where things will get SUPER awesome, because for websockets, I think we'll just expose a Socket[F, WebSocketFrame, WebSocketFrame]. So in other words, a socket where you can send and receive websocket frames, just as how without any pipeline handlers we get a socket where you can send and receive bytes, and with the http handler we get a socket where you can send and receive HttpRequest/HttpResponse, respectively.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants