Skip to content

Commit

Permalink
Add AsyncSequenceResponseBodyStreamer (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler authored Feb 16, 2023
1 parent d6056f4 commit 9042192
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2021 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if compiler(>=5.5.2) && canImport(_Concurrency)
import HummingbirdCore

/// Response body streamer which uses an AsyncSequence as its input.
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
final class AsyncSequenceResponseBodyStreamer<ByteBufferSequence: AsyncSequence>: HBResponseBodyStreamer where ByteBufferSequence.Element == ByteBuffer {
var iterator: ByteBufferSequence.AsyncIterator

init(_ asyncSequence: ByteBufferSequence) {
self.iterator = asyncSequence.makeAsyncIterator()
}

func read(on eventLoop: EventLoop) -> EventLoopFuture<HBStreamerOutput> {
let promise = eventLoop.makePromise(of: HBStreamerOutput.self)
promise.completeWithTask {
if let buffer = try await self.iterator.next() {
return .byteBuffer(buffer)
} else {
return .end
}
}
return promise.futureResult
}
}

/// Extend AsyncThrowingStream to conform to `HBResponseGenerator` so it can be returned
/// from a route
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension AsyncThrowingStream: HBResponseGenerator where Element == ByteBuffer {
/// Return self as the response
public func response(from request: HBRequest) -> HBResponse {
return .init(status: .ok, body: .stream(AsyncSequenceResponseBodyStreamer(self)))
}
}

/// Extend AsyncStream to conform to `HBResponseGenerator` so it can be returned from
/// a route
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension AsyncStream: HBResponseGenerator where Element == ByteBuffer {
/// Return self as the response
public func response(from request: HBRequest) -> HBResponse {
return .init(status: .ok, body: .stream(AsyncSequenceResponseBodyStreamer(self)))
}
}

/// Wrapper object for AsyncSequence that conforms to `HBResponseGenerator`
///
/// This can be returned from a route to generate a response that includes the
/// sequence of ByteBuffers as its payload.
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public struct AsyncSequenceResponseGenerator<ByteBufferSequence: AsyncSequence>: HBResponseGenerator where ByteBufferSequence.Element == ByteBuffer {
let asyncSequence: ByteBufferSequence

/// Return self as the response
public func response(from request: HBRequest) -> HBResponse {
return .init(status: .ok, body: .stream(AsyncSequenceResponseBodyStreamer(self.asyncSequence)))
}
}

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension AsyncSequence where Element == ByteBuffer {
public typealias ResponseGenerator = AsyncSequenceResponseGenerator<Self>
/// Return type that conforms to `HBResponseGenerator` that will serialize contents of sequence
///
/// Preferably I would like to conform `AsyncSequence` to `HBResponseGenerator` but it is not
/// possible to add conformances to protocols in extensions. So the solution is to return
/// another object which wraps the `AsyncSequence`
public var responseGenerator: ResponseGenerator { .init(asyncSequence: self) }
}

#if compiler(>=5.6)
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
// can guarantee Sendable because the read function is only ever called on the same EventLoop
extension AsyncSequenceResponseBodyStreamer: @unchecked Sendable {}
#endif // compiler(>=5.6)
#endif // compiler(>=5.5.2) && canImport(_Concurrency)
55 changes: 55 additions & 0 deletions Tests/HummingbirdTests/AsyncAwaitTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#if compiler(>=5.5.2) && canImport(_Concurrency)

import Hummingbird
import HummingbirdCore
import HummingbirdXCT
import NIOHTTP1
import XCTest
Expand Down Expand Up @@ -130,6 +131,60 @@ final class AsyncAwaitTests: XCTestCase {
XCTAssertEqual(String(buffer: body), "530001")
}
}

/// Test streaming of response via AsyncSequence
func testResponseAsyncSequence() throws {
#if os(macOS)
// disable macOS tests in CI. GH Actions are currently running this when they shouldn't
guard HBEnvironment().get("CI") != "true" else { throw XCTSkip() }
#endif
let app = HBApplication(testing: .asyncTest)
app.router.get("buffer", options: .streamBody) { request -> HBRequestBodyStreamerSequence.ResponseGenerator in
guard let stream = request.body.stream else { throw HBHTTPError(.badRequest) }
return stream.sequence.responseGenerator
}

try app.XCTStart()
defer { app.XCTStop() }

let buffer = self.randomBuffer(size: 530_001)
try app.XCTExecute(uri: "/buffer", method: .GET, body: buffer) { response in
XCTAssertEqual(response.status, .ok)
XCTAssertEqual(response.body, buffer)
}
}

/// Test streaming of response via AsyncSequence
func testResponseAsyncStream() throws {
#if os(macOS)
// disable macOS tests in CI. GH Actions are currently running this when they shouldn't
guard HBEnvironment().get("CI") != "true" else { throw XCTSkip() }
#endif
let app = HBApplication(testing: .asyncTest)
app.router.get("alphabet") { _ in
AsyncStream<ByteBuffer> { cont in
let alphabet = "abcdefghijklmnopqrstuvwxyz"
var index = alphabet.startIndex
while index != alphabet.endIndex {
let nextIndex = alphabet.index(after: index)
let buffer = ByteBufferAllocator().buffer(substring: alphabet[index..<nextIndex])
cont.yield(buffer)
index = nextIndex
}
cont.finish()
}
}

try app.XCTStart()
defer { app.XCTStop() }

let buffer = self.randomBuffer(size: 530_001)
try app.XCTExecute(uri: "/alphabet", method: .GET, body: buffer) { response in
let body = try XCTUnwrap(response.body)
XCTAssertEqual(response.status, .ok)
XCTAssertEqual(String(buffer: body), "abcdefghijklmnopqrstuvwxyz")
}
}
}

#endif // compiler(>=5.5) && canImport(_Concurrency)

0 comments on commit 9042192

Please sign in to comment.