Add Ox integration: SSE & WebSockets? #2158

Open
adamw opened this issue Apr 26, 2024 · 2 comments

adamw commented Apr 26, 2024

The default backend is already fully usable for direct style Scala, along with streaming (via InputStream bodies), and web sockets (using the WebSocket's blocking .send() and .receive() methods).

However, what we are missing is integration with SSE, streaming web sockets (sometimes it's more convenient to work this way) and high-level streaming (mapping over byte chunks or lines, for example).

That's why it might be reasonable to have an Ox integration module. In fact, this is a joint Ox+sttp-client issue, as it requires changes in both. Going from the end:

  1. to support high-level streaming operations (on the byte-chunk or line level), we could add some I/O capabilities to Ox's Source, such as two-way conversions between an InputStream and a Source[byte chunk]; between an InputStream and a Source[String] (lines); and finally writing such sources to files/reading from files. What remains to be determined here is what's a good representation of a byte chunk. A simple Array[Byte]? ByteBuffer? Pekko's ByteString?
  2. for web sockets, we could provide a fairly simple conversion between a WebSocket and a (Source[WebSocketFrame], Sink[WebSocketFrame])
  3. finally, for SSE, I do have some code, but it might need polishing. And it would be great to include it in the integration module:
def parseSse(is: InputStream)(using Ox): Source[ServerSentEvent] =
  val chunks = StageCapacity.newChannel[Array[Byte]]
  fork {
    try
      repeatWhile {
        val a = new Array[Byte](1024)
        val r = is.read(a)
        if r == -1 then
          chunks.done()
          false
        else
          chunks.send(a.take(r))
          true
      }
    catch case t: Throwable => chunks.errorSafe(t)
  }

  chunks
    .mapStatefulConcat(() => Array.empty[Byte]) { case (buffer, nextChunk) =>
      @tailrec
      def splitChunksAtNewLine(buf: Array[Byte], chunk: Array[Byte], acc: Vector[Array[Byte]])
          : (Array[Byte], Vector[Array[Byte]]) =
        val newlineIdx = chunk.indexOf('\n')
        if newlineIdx == -1 then (buf ++ chunk, acc)
        else
          val (chunk1, chunk2) = chunk.splitAt(newlineIdx + 1)
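          // use the `buf` parameter here: the outer `buffer` would be prepended again to every later line in the same chunk
          splitChunksAtNewLine(Array.empty[Byte], chunk2, acc :+ (buf ++ chunk1))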

      val (newBuffer, toEmit) = splitChunksAtNewLine(buffer, nextChunk, Vector.empty)

      (newBuffer, toEmit)
    }
    .mapAsView(new String(_))
    .mapStatefulConcat(() => Vector.empty[String]) { case (acc, el) =>
      if el.isBlank then (Vector.empty, Some(acc)) else (acc :+ el.dropRight(1), Nil)
    }
    .map(lines => ServerSentEvent.parse(lines.asInstanceOf[Vector[String]].toList))

Example usage:

@main def sseClient(): Unit =
  supervised {
    basicRequest
      .post(uri"http://localhost:51823/sse/echo3")
      .body("1234567890")
      .response(asInputStreamAlways { is =>
        parseSse(is).foreach(el => println(s"Got: $el"))
        ()
      })
      .send()
      .body
  }
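
For reference, the line-splitting and event-grouping logic can also be sketched with nothing but the standard library. This is only an illustration under stated assumptions, not the proposed Ox integration: `SseEvent`, `lines`, `parseEvent` and `events` are hypothetical names, `SseEvent` is a local stand-in for sttp's `ServerSentEvent`, and a plain `Iterator` takes the place of an Ox `Source`. Unlike `parseSse` above, it decodes as UTF-8 explicitly and skips runs of blank lines instead of emitting empty events.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for sttp's ServerSentEvent, local so the sketch has no dependencies.
final case class SseEvent(
    data: Option[String] = None,
    eventType: Option[String] = None,
    id: Option[String] = None,
    retry: Option[Int] = None
)

// Lazily reads UTF-8 lines (terminators stripped) from a blocking InputStream.
def lines(is: InputStream): Iterator[String] =
  val reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)

// Folds the lines of a single event into an SseEvent.
def parseEvent(eventLines: Seq[String]): SseEvent =
  eventLines.foldLeft(SseEvent()) { (event, line) =>
    if line.startsWith(":") then event // comment line, ignored
    else
      val (field, rest) = line.span(_ != ':')
      // `rest` is "" or ":value"; drop the colon and at most one leading space
      val value = rest.drop(1).stripPrefix(" ")
      field match
        case "data"  => event.copy(data = Some(event.data.fold(value)(_ + "\n" + value)))
        case "event" => event.copy(eventType = Some(value))
        case "id"    => event.copy(id = Some(value))
        case "retry" => event.copy(retry = value.toIntOption.orElse(event.retry))
        case _       => event // unknown field, ignored
  }

// Groups lines into events: a blank line ends an event, and an unfinished
// event at the end of the stream is dropped.
def events(is: InputStream): Iterator[SseEvent] =
  val in = lines(is)
  def nextEvent(): Option[SseEvent] =
    var collected = Vector.empty[String]
    var complete = false
    while !complete && in.hasNext do
      val line = in.next()
      if line.nonEmpty then collected :+= line
      else if collected.nonEmpty then complete = true
    Option.when(complete)(parseEvent(collected))
  Iterator.unfold(())(_ => nextEvent().map(e => (e, ())))
```

With the default backend this could be plugged in the same way as `parseSse` in the example above, e.g. `asInputStreamAlways(is => events(is).foreach(println))`, without needing an Ox scope, at the cost of having no concurrency or backpressure between reading and processing.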

kciesielski self-assigned this May 6, 2024

lbialy commented May 7, 2024

Wouldn't that be an asSSEAlways?


adamw commented May 7, 2024

For all other streaming approaches we simply offer an SSE-parsing stream stage, e.g.: https://sttp.softwaremill.com/en/stable/backends/akka.html#server-sent-events

asSSEAlways is hard to do generically, as you don't know what the underlying stream type is. So it could be done, but it would need to be backend-specific.
