Skip to content

Commit

Permalink
2.x.x - Wrap InboundStream iterator in request body (#369)
Browse files Browse the repository at this point in the history
* Stick NIOAsyncChannel in request body streamer

* Clean up, remove task group in HTTPChannelHandler

* Add more checking in  HBStreamedRequestBody.AsyncIterator.next

* Fix benchmarks

* Fix finished request flush

* don't use function to flush body parts

* Add support for creating stream from byteBuffer, only allow one iterate of HBStreamedRequestBody

* 2.x.x - Use AnyAsyncSequence and creating request body streams (#373)

* Use AnyAsyncSequence to hold request body async sequence

* Make contents of HBRequestBody internal

* Remove HBRequestBody.collate(maxSize) as we can use `collect` now
  • Loading branch information
adam-fowler authored Feb 5, 2024
1 parent 6a678c9 commit 5c1f2ea
Show file tree
Hide file tree
Showing 14 changed files with 492 additions and 156 deletions.
28 changes: 15 additions & 13 deletions Benchmarks/Benchmarks/Router/RouterBenchmarks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ extension Benchmark {
context: Context.Type = BasicBenchmarkContext.self,
configuration: Benchmark.Configuration = Benchmark.defaultConfiguration,
request: HTTPRequest,
writeBody: @escaping @Sendable (HBStreamedRequestBody) async throws -> Void = { _ in },
writeBody: @escaping @Sendable (HBStreamedRequestBody.InboundStream.TestSource) async throws -> Void = { _ in },
setupRouter: @escaping @Sendable (HBRouter<Context>) async throws -> Void
) {
let router = HBRouter(context: Context.self)
Expand All @@ -60,15 +60,17 @@ extension Benchmark {
allocator: ByteBufferAllocator(),
logger: Logger(label: "Benchmark")
)
let requestBodyStream = HBStreamedRequestBody()
let requestBody = HBRequestBody.stream(requestBodyStream)
let (inbound, source) = NIOAsyncChannelInboundStream<HTTPRequestPart>.makeTestingStream()
let streamer = HBStreamedRequestBody(iterator: inbound.makeAsyncIterator())
let requestBody = HBRequestBody.stream(streamer)
let hbRequest = HBRequest(head: request, body: requestBody)
group.addTask {
let response = try await responder.respond(to: hbRequest, context: context)
_ = try await response.body.write(BenchmarkBodyWriter())
}
try await writeBody(requestBodyStream)
requestBodyStream.finish()
try await writeBody(source)
source.yield(.end(nil))
source.finish()
}
}
}
Expand Down Expand Up @@ -100,10 +102,10 @@ func routerBenchmarks() {
configuration: .init(warmupIterations: 10),
request: .init(method: .put, scheme: "http", authority: "localhost", path: "/")
) { bodyStream in
await bodyStream.send(buffer)
await bodyStream.send(buffer)
await bodyStream.send(buffer)
await bodyStream.send(buffer)
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
} setupRouter: { router in
router.put { request, _ in
let body = try await request.body.collate(maxSize: .max)
Expand All @@ -116,10 +118,10 @@ func routerBenchmarks() {
configuration: .init(warmupIterations: 10),
request: .init(method: .post, scheme: "http", authority: "localhost", path: "/")
) { bodyStream in
await bodyStream.send(buffer)
await bodyStream.send(buffer)
await bodyStream.send(buffer)
await bodyStream.send(buffer)
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
bodyStream.yield(.body(buffer))
} setupRouter: { router in
router.post { request, _ in
HBResponse(status: .ok, headers: [:], body: .init { writer in
Expand Down
2 changes: 1 addition & 1 deletion Sources/Hummingbird/Codable/JSON/JSONCoding.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ extension JSONDecoder: HBRequestDecoder {
/// - type: Type to decode
/// - request: Request to decode from
public func decode<T: Decodable>(_ type: T.Type, from request: HBRequest, context: some HBBaseRequestContext) async throws -> T {
let buffer = try await request.body.collate(maxSize: context.maxUploadSize)
let buffer = try await request.body.collect(upTo: context.maxUploadSize)
return try self.decode(T.self, from: buffer)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ extension URLEncodedFormDecoder: HBRequestDecoder {
/// - type: Type to decode
/// - request: Request to decode from
public func decode<T: Decodable>(_ type: T.Type, from request: HBRequest, context: some HBBaseRequestContext) async throws -> T {
let buffer = try await request.body.collate(maxSize: context.maxUploadSize)
let buffer = try await request.body.collect(upTo: context.maxUploadSize)
let string = String(buffer: buffer)
return try self.decode(T.self, from: string)
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Hummingbird/Exports.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@_exported import struct HummingbirdCore.HBHTTPError
@_exported import protocol HummingbirdCore.HBHTTPResponseError
@_exported import struct HummingbirdCore.HBRequest
@_exported import enum HummingbirdCore.HBRequestBody
@_exported import struct HummingbirdCore.HBRequestBody
@_exported import struct HummingbirdCore.HBResponse
@_exported import struct HummingbirdCore.HBResponseBody
@_exported import protocol HummingbirdCore.HBResponseBodyWriter
Expand Down
23 changes: 2 additions & 21 deletions Sources/Hummingbird/Files/FileIO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,8 @@ public struct HBFileIO: Sendable {
try? handle.close()
}
context.logger.debug("[FileIO] PUT", metadata: ["file": .string(path)])
switch contents {
case .byteBuffer(let buffer):
try await self.writeFile(buffer: buffer, handle: handle, on: eventLoop)
case .stream(let streamer):
try await self.writeFile(asyncSequence: streamer, handle: handle, on: eventLoop)
for try await buffer in contents {
try await self.fileIO.write(fileHandle: handle, buffer: buffer, eventLoop: eventLoop).get()
}
}

Expand Down Expand Up @@ -153,20 +150,4 @@ public struct HBFileIO: Sendable {
try handle.close()
}
}

/// write byte buffer to file
func writeFile(buffer: ByteBuffer, handle: NIOFileHandle, on eventLoop: EventLoop) async throws {
return try await self.fileIO.write(fileHandle: handle, buffer: buffer, eventLoop: eventLoop).get()
}

/// write output of streamer to file
func writeFile<BufferSequence: AsyncSequence>(
asyncSequence: BufferSequence,
handle: NIOFileHandle,
on eventLoop: EventLoop
) async throws where BufferSequence.Element == ByteBuffer {
for try await buffer in asyncSequence {
try await self.fileIO.write(fileHandle: handle, buffer: buffer, eventLoop: eventLoop).get()
}
}
}
4 changes: 2 additions & 2 deletions Sources/Hummingbird/Server/Request.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ extension HBRequest {
/// - Parameter context: request context
/// - Returns: Collated body
public mutating func collateBody(context: some HBBaseRequestContext) async throws -> ByteBuffer {
let byteBuffer = try await self.body.collate(maxSize: context.maxUploadSize)
self.body = .byteBuffer(byteBuffer)
let byteBuffer = try await self.body.collect(upTo: context.maxUploadSize)
self.body = .init(buffer: byteBuffer)
return byteBuffer
}

Expand Down
Loading

0 comments on commit 5c1f2ea

Please sign in to comment.