-
Notifications
You must be signed in to change notification settings - Fork 787
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce MultipartReceiver for custom, fail-fast multipart decoding #7411
base: series/0.23
Are you sure you want to change the base?
Conversation
Moved MultipartReceiver/PartReceiver code from my prototype/sandbox area and integrated into MultipartDecoder
Also, add a bunch of doc comments, and fiddle with the PartialApply pattern.
Pull.raiseError[F](MalformedMessageBodyFailure("Malformed Malformed match")) | ||
Pull.raiseError[F](MalformedMessageBodyFailure("Malformed match")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Malformed Malformed Malformed ... seemed like a typo to me
Not sure of the right syntax for linking methods in scaladoc, but just the object+name didn't cut it. Switching to `backticks` syntax to appease the build for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the long feedback delay. This PR is a huge one! Due to its size, I believe many of us might feel intimidated. Therefore, please bear with us as it may take some time to review this by the team thoroughly. Likely, this is going to be an iterative process.
I've flipped through the PR, so I'll start with possible silly questions.
- Current
MultipartDecoder
s are represented as functions operating withEntityDecoder
,Resource
andMultipart
. Also, they're private implementations. In turn,PartReceiver
andMultipartReceiver
are defined as public. Do we need to keep them public? Introducing new entities complicates the comprehension of relatively simple concepts like Multipart. - If
PartReceiver
is meant to be used only within theMultipartReceiver
context, could we somehow combine them into one entity?
core/shared/src/main/scala/org/http4s/multipart/MultipartDecoder.scala
Outdated
Show resolved
Hide resolved
*/ | ||
private[this] def decodePartEventsSupervised[F[_], A]( | ||
supervisor: Supervisor[F], | ||
partDecoder: Part[F] => DecodeResult[Resource[F, *], A], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DecodeResult[Resource[F, *], A]
shouldn't this be inverted inside out in a sense of effect?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll revisit this next week when I have some more time, and it's been a little while since my head was fully around all this, but I think it was important for Resource
to be the outer type. That said, it might be possible to instead use Part[F] => Resource[F, EntityDecoder[F, A]]
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, if I change to partDecoder: Part[F] => Resource[DecodeResult[F, *], A]
then things get complicated...
With DecodeResult[Resource[F, *], A]
I can just unwrap things with .value
and have a Resource[F, Either[DecodeFailure, A]]
which keeps me in F
for the supervisor and the side-effects.
With Resource[DecodeResult[F, *], A]
the side-effects need to be lifted into EitherT
and the useForever
call needs to be somehow converted from a Fiber[DecodeResult, Nothing]
back down to a Fiber[F, Nothing]
.
supervisor.supervise[Nothing] {
partDecoder(Part(ps.value, channel.stream.unchunks)).value.attempt.evalTap { r =>
resultPromise.complete(r.flatten) *> channel.close
}.useForever.value.map {
??? // what to do with an Either[DecodeError, Nothing]
}
}
With Resource[F, EntityDecoder[F, A]]
it's a little better but the Part's body stream has to be constructed/referenced twice, which seems pitfall-y given that the body is coming from a Channel in this context:
supervisor.supervise[Nothing] {
partDecoder(Part(ps.value, channel.stream.unchunks)).evalMap { decoder =>
decoder.decode(Part(ps.value, channel.stream.unchunks), strict = false).value
}.attempt.evalTap { r =>
resultPromise.complete(r.flatten) *> channel.close
}.useForever
}
This problem would be alleviated if I used Headers
instead of Part
as the argument to partDecoder
, but the typical implementations want to inspect the name/filename values, which currently come from the Part
class, so they'd have to copy Part's implementation logic in order to compensate.
I'm thinking the best option is to just leave this as-is.
core/shared/src/main/scala/org/http4s/multipart/MultipartParser.scala
Outdated
Show resolved
Hide resolved
core/shared/src/main/scala/org/http4s/multipart/MultipartReceiver.scala
Outdated
Show resolved
Hide resolved
core/shared/src/main/scala/org/http4s/multipart/MultipartReceiver.scala
Outdated
Show resolved
Hide resolved
core/shared/src/main/scala/org/http4s/multipart/PartReceiver.scala
Outdated
Show resolved
Hide resolved
core/shared/src/main/scala/org/http4s/multipart/MultipartReceiver.scala
Outdated
Show resolved
Hide resolved
I think it should be possible to replace I think |
private def readToBuffer[F[_]: Files: MonadThrow]( | ||
input: Stream[F, Byte], | ||
maxSizeBeforeFile: Int, | ||
chunkSize: Int, | ||
)(implicit c: Compiler[F, F]): Resource[F, Stream[F, Byte]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I apologize if I am missing something, but I cannot figure this out: what it the point of creating a Resource
of Stream
here? The Stream
type is already a scoped thing, it should not require additional scope managing. Moreover, I cannot figure out where the Resource
created would be managing any resource per se: looks like it is either Resource.pure
or Resource.suspend
. Neither of them introduce any new scope for further management. Moreover, the latter Resource.suspend
does not seem necessary at all in this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, ok, I see now – it is used for Files[F].tempFile
. Sorry, missed it out.
Anyway, it does not seem necessary to enclose Stream
into Resource
, something like this can be used instead:
Stream.resource(Files[F].tempFile(...)).flatMap { path =>
Files[F].readAll(path)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Responding from my phone; apologies for any errors...
readToBuffer
is my attempt to represent the existing functionality of parseToPartsSupervisedFile
in terms of the new PartReceiver
trait. This method is a private implementation detail of toMixedBuffer
, defined a few lines up from here.
The intention here is that allocation of the resource is what causes the part body to be collected to a buffer (which is either an in-memory chunk a temp file), and the returned stream acts as a facade for that buffer while it is still valid. Reading that steam multiple times should not have to re-create the temp file, so I don't think I can Construct it the way you suggest.
maxSizeBeforeFile: Int, | ||
chunkSize: Int, | ||
)(implicit c: Compiler[F, F]): Resource[F, Stream[F, Byte]] = { | ||
final case class Acc(bytes: Stream[Pure, Byte], bytesSize: Int) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the way it is used in the code, it seems that the entire Acc
class can be replaced with just one Chunk
, which already is aware of its size therefore no additional field is required to pass it around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very nice work, and I'm sorry it took me so long to get around to it.
val keepPulling = F.pure(true) | ||
val stopPulling = F.pure(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like these values could be cached, but it's not a big deal.
implicit def multipartReceiverApplicative[F[_]]: Applicative[MultipartReceiver[F, *]] = | ||
new MultipartReceiverApplicative[F] | ||
|
||
private class MultipartReceiverApplicative[F[_]] extends Applicative[MultipartReceiver[F, *]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An instance of an unsealed type makes me a little nervous. I'm getting too sleepy to think about whether the abstract methods could affect the legality of the instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand what you're saying here. Is the issue with defining the private class
that extends Applicative
, as opposed to just returning an anonymous class from def multipartReceiverApplicative
? Or is this more related to "cats laws" legality?
Assuming it's about cats laws, I initially inquired about this in the typelevel discord's #cats channel - here. I got the impression that the "order matters" nature of decide
when combining MultipartReceiver
s was acceptable.
private def partName(headers: Headers) = | ||
headers.get[`Content-Disposition`].flatMap(_.parameters.get(ci"name")) | ||
private def partFilename(headers: Headers) = | ||
headers.get[`Content-Disposition`].flatMap(_.parameters.get(ci"filename")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I can just construct a Part(headers, Stream.empty)
and delegate to its name
and filename
methods?
|
||
/** Creates a PartReceiver that ignores the part body. */ | ||
def ignore[F[_]]: PartReceiver[F, Unit] = | ||
_ => Resource.pure(Right(())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is Resource.unit
, or maybe Applicative[Resource[F, *]].unit
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless that *
is doing some heavy lifting, I think I'd still have to .map(Right(_))
it, since the abstract method being fulfilled here is expected to return Resource[F, Either[DecodeFailure, A]]
.
)(implicit | ||
files: Files[F], | ||
mt: MonadThrow[F], | ||
c: Compiler[F, F], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been warned by the fs2 authors to not take Compiler
as a constraint. Maybe it's just better to slap a Concurrent
on F
?
(filename, _) => Part.fileData(name, filename, bytes[F], headers: _*), | ||
) | ||
} | ||
object PartValue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems strange to me that there's no OfBytes
. It has to go to a file to be a binary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm.. PartValue
was intended as the representation of a decoded part from MultipartReceiver.auto
, which picks between decoding the part to a String or dumping it to a File depending on the part headers. An OfBytes
case would imply the auto
would sometimes decide not to decode a regular part to text, and I don't know offhand what kind of edge cases might motivate that.
If there was a relevant "business logic" motiviation, then the user wouldn't use MultipartReceiver.auto
; instead they'd set up a custom MultipartReceiver that decides to use PartReceiver.toMixedBuffer
for arbitrary "bytes" receiving.
You bringing this up made me realize that the docs I wrote on trait PartValue
are wrongfully referencing PartReceiver.toMixedBuffer
. Maybe correcting that will be enough?
Sorry for the long delay on this last round of changes.. I think I've addressed everybody's comments, either by making changes or replying with a question or argument. |
Fixes #7408
Introduces two new types;
PartReceiver
andMultipartReceiver
. A PartReceiver is like anEntityDecoder
but with its result represented as aResource
. A MultipartReceiver decides what PartReceiver to use for a part based on the part's headers. A MultipartReceiver doesn't do any explicit buffering; the part bodies are decoded as they are received. A MultipartReceiver can get wrapped up as anEntityDecoder
, using the same supervisor-based semantics asmixedMultipartResource
.To solve the issues I brought up in #7408:
.ignoreUnexpectedParts
method to just skip past unexpected parts without consuming them or raising an error)PartReceiver.toTempFile
, which gets you aPath
to the temp file (the underlying supervisor resource will delete the temp file when released).withSizeLimit(n)
method which will cause it to raise a DecodeFailure and stop pulling after the limit is hit. This allows business logic to protect against arbitrarily-large (even infinite-sized) individual parts.Path
from one part, and aString
from another part.