Skip to content

Commit

Permalink
normalizer
Browse files Browse the repository at this point in the history
  • Loading branch information
wangcheng committed Mar 31, 2024
1 parent 288f41f commit 95345bc
Show file tree
Hide file tree
Showing 10 changed files with 1,542 additions and 599 deletions.
22 changes: 11 additions & 11 deletions packages/connect-miniprogram/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@
"format": "eslint ./src --ext ts,tsx --fix && prettier ./src --write"
},
"dependencies": {
"headers-polyfill": "^4.0.2"
"headers-polyfill": "^4.0.3"
},
"peerDependencies": {
"@bufbuild/protobuf": "^1.4.1",
"@connectrpc/connect": "^1.1.3"
},
"devDependencies": {
"@bufbuild/protobuf": "^1.4.1",
"@connectrpc/connect": "^1.1.3",
"@typescript-eslint/eslint-plugin": "^6.9.0",
"@typescript-eslint/parser": "^6.9.0",
"eslint": "^8.52.0",
"eslint-plugin-simple-import-sort": "^10.0.0",
"@bufbuild/protobuf": "^1.8.0",
"@connectrpc/connect": "^1.4.0",
"@typescript-eslint/eslint-plugin": "^7.4.0",
"@typescript-eslint/parser": "^7.4.0",
"eslint": "^8.57.0",
"eslint-plugin-simple-import-sort": "^12.0.0",
"jest": "^29.7.0",
"miniprogram-api-typings": "^3.12.1",
"prettier": "^3.0.3",
"ts-jest": "^29.1.1",
"typescript": "^5.2.2"
"miniprogram-api-typings": "^3.12.2",
"prettier": "^3.2.5",
"ts-jest": "^29.1.2",
"typescript": "^5.4.3"
}
}
71 changes: 67 additions & 4 deletions packages/connect-miniprogram/src/connect/connect-transport-wx.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/**
* @see https://github.com/connectrpc/connect-es/blob/main/packages/connect-web/src/connect-transport.ts
*/

import type {
AnyMessage,
JsonValue,
Expand All @@ -6,28 +10,34 @@ import type {
PartialMessage,
ServiceType,
} from '@bufbuild/protobuf';
import { MethodKind } from '@bufbuild/protobuf';
import type {
ContextValues,
StreamResponse,
Transport,
UnaryResponse,
} from '@connectrpc/connect';
import { appendHeaders } from '@connectrpc/connect';
import { ConnectError } from '@connectrpc/connect';
import type { EnvelopedMessage } from '@connectrpc/connect/protocol';
import {
createClientMethodSerializers,
createMethodUrl,
} from '@connectrpc/connect/protocol';
import { encodeEnvelope } from '@connectrpc/connect/protocol';
import {
errorFromJson,
requestHeader,
trailerDemux,
validateResponse,
} from '@connectrpc/connect/protocol-connect';
import {
endStreamFlag,
endStreamFromJson,
} from '@connectrpc/connect/protocol-connect';
import { headersToObject } from 'headers-polyfill';

import { warnUnsupportedOptions } from './compatbility';
import { createRequestBody } from './message-body/create';
import { parseResponseBody } from './message-body/parse-connect';
import { normalize, normalizeIterable } from './protocol/normalize';
import type { CreateTransportOptions } from './types';
import {
Expand Down Expand Up @@ -77,6 +87,7 @@ export function createConnectTransport(
useBinaryFormat,
timeoutMs,
header,
false,
);

reqHeader.delete('User-Agent');
Expand Down Expand Up @@ -139,6 +150,57 @@ export function createConnectTransport(
options.binaryOptions,
);

async function* parseResponseBody(
body: AsyncGenerator<EnvelopedMessage>,
trailerTarget: Headers,
header: Headers,
) {
try {
let endStreamReceived = false;
for (;;) {
const result = await body.next();
if (result.done) {
break;
}
const { flags, data } = result.value;
if ((flags & endStreamFlag) === endStreamFlag) {
endStreamReceived = true;
const endStream = endStreamFromJson(data);
if (endStream.error) {
const error = endStream.error;
header.forEach((value, key) => {
error.metadata.append(key, value);
});
throw error;
}
endStream.metadata.forEach((value, key) =>
trailerTarget.set(key, value),
);
continue;
}
yield parse(data);
}
if (!endStreamReceived) {
throw 'missing EndStreamResponse';
}
} catch (e) {
throw ConnectError.from(e);
}
}

async function createRequestBody(
input: AsyncIterable<I>,
): Promise<Uint8Array> {
if (method.kind != MethodKind.ServerStreaming) {
throw 'Weixin does not support streaming request bodies';
}
const r = await input[Symbol.asyncIterator]().next();
if (r.done == true) {
throw 'missing request message';
}
return encodeEnvelope(0, serialize(r.value));
}

timeoutMs =
timeoutMs === undefined
? options.defaultTimeoutMs
Expand All @@ -152,12 +214,13 @@ export function createConnectTransport(
useBinaryFormat,
timeoutMs,
header,
false,
);

reqHeader.delete('User-Agent');

const reqMessage = normalizeIterable(method.I, input);
const body = await createRequestBody(reqMessage, serialize, method);
const body = await createRequestBody(reqMessage);
const response = await requestAsAsyncIterable({
url,
header: headersToObject(reqHeader),
Expand All @@ -168,7 +231,7 @@ export function createConnectTransport(
const resMessage = parseResponseBody(
response.messageStream,
trailerTarget,
parse,
response.header,
);
return {
service,
Expand Down
7 changes: 7 additions & 0 deletions packages/connect-miniprogram/src/connect/envelope.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
/**
* @see https://github.com/connectrpc/connect-es/blob/main/packages/connect/src/protocol/envelope.ts
*/

import { Code, ConnectError } from '@connectrpc/connect';
import type { EnvelopedMessage } from '@connectrpc/connect/protocol';

/**
* basically the same as `createEnvelopeReadableStream` in the upstream code
*/
export async function* createEnvelopeAsyncGenerator(
stream: AsyncGenerator<Uint8Array>,
): AsyncGenerator<EnvelopedMessage> {
Expand Down
119 changes: 107 additions & 12 deletions packages/connect-miniprogram/src/connect/grpc-web-transport-wx.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
/**
* @see https://github.com/connectrpc/connect-es/blob/main/packages/connect-web/src/grpc-web-transport.ts
*/

import type {
AnyMessage,
Message,
MethodInfo,
PartialMessage,
ServiceType,
} from '@bufbuild/protobuf';
import { MethodKind } from '@bufbuild/protobuf';
import type {
ContextValues,
StreamResponse,
Transport,
UnaryResponse,
} from '@connectrpc/connect';
import { ConnectError } from '@connectrpc/connect';
import type { EnvelopedMessage } from '@connectrpc/connect/protocol';
import {
createClientMethodSerializers,
createMethodUrl,
Expand All @@ -20,14 +27,14 @@ import {
requestHeader,
validateResponse,
} from '@connectrpc/connect/protocol-grpc-web';
import {
trailerFlag,
trailerParse,
validateTrailer,
} from '@connectrpc/connect/protocol-grpc-web';
import { headersToObject } from 'headers-polyfill';

import { warnUnsupportedOptions } from './compatbility';
import { createRequestBody } from './message-body/create';
import {
parseStreamResponseBody,
parseUaryResponseBody,
} from './message-body/parse-grpc';
import { normalize, normalizeIterable } from './protocol/normalize';
import type { CreateTransportOptions } from './types';
import { createWxRequestAsAsyncGenerator } from './wx-request';
Expand Down Expand Up @@ -68,7 +75,7 @@ export function createGrpcWebTransport(
: timeoutMs;

const url = createMethodUrl(options.baseUrl, service, method);
const reqHeader = requestHeader(useBinaryFormat, timeoutMs, header);
const reqHeader = requestHeader(useBinaryFormat, timeoutMs, header, false);

reqHeader.delete('User-Agent');

Expand All @@ -83,8 +90,32 @@ export function createGrpcWebTransport(

validateResponse(response.statusCode, response.header);

const { trailer: resTrailer, message: resMessage } =
await parseUaryResponseBody(response.messageStream, parse);
let resTrailer: Headers | undefined;
let resMessage: O | undefined;
for await (const chunk of response.messageStream) {
const { flags, data } = chunk;
if (flags === trailerFlag) {
if (resTrailer !== undefined) {
throw 'extra trailer';
}
// Unary responses require exactly one response message, but in
// case of an error, it is perfectly valid to have a response body
// that only contains error trailers.
resTrailer = trailerParse(data);
continue;
}
if (resMessage !== undefined) {
throw 'extra message';
}
resMessage = parse(data);
}
if (resTrailer === undefined) {
throw 'missing trailer';
}
validateTrailer(resTrailer, response.header);
if (resMessage === undefined) {
throw 'missing message';
}

return {
service,
Expand Down Expand Up @@ -114,6 +145,70 @@ export function createGrpcWebTransport(
options.binaryOptions,
);

async function* parseResponseBody(
input: AsyncGenerator<EnvelopedMessage>,
foundStatus: boolean,
trailerTarget: Headers,
header: Headers,
) {
try {
if (foundStatus) {
// A grpc-status: 0 response header was present. This is a "trailers-only"
// response (a response without a body and no trailers).
//
// The spec seems to disallow a trailers-only response for status 0 - we are
// lenient and only verify that the body is empty.
//
// > [...] Trailers-Only is permitted for calls that produce an immediate error.
// See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
if (!(await input.next()).done) {
throw 'extra data for trailers-only';
}
return;
}
let trailerReceived = false;
for await (const chunk of input) {
const { flags, data } = chunk;

if ((flags & trailerFlag) === trailerFlag) {
if (trailerReceived) {
throw 'extra trailer';
}
trailerReceived = true;
const trailer = trailerParse(data);

validateTrailer(trailer, header);
trailer.forEach((value, key) => trailerTarget.set(key, value));

continue;
}
if (trailerReceived) {
throw 'extra message';
}
yield parse(data);
continue;
}
if (!trailerReceived) {
throw 'missing trailer';
}
} catch (e) {
throw ConnectError.from(e);
}
}

async function createRequestBody(
input: AsyncIterable<I>,
): Promise<Uint8Array> {
if (method.kind != MethodKind.ServerStreaming) {
throw 'Weixin does not support streaming request bodies';
}
const r = await input[Symbol.asyncIterator]().next();
if (r.done == true) {
throw 'missing request message';
}
return encodeEnvelope(0, serialize(r.value));
}

timeoutMs =
timeoutMs === undefined
? options.defaultTimeoutMs
Expand All @@ -122,12 +217,12 @@ export function createGrpcWebTransport(
: timeoutMs;

const url = createMethodUrl(options.baseUrl, service, method);
const reqHeader = requestHeader(useBinaryFormat, timeoutMs, header);
const reqHeader = requestHeader(useBinaryFormat, timeoutMs, header, false);

reqHeader.delete('User-Agent');

const reqMessage = normalizeIterable(method.I, input);
const body = await createRequestBody(reqMessage, serialize, method);
const body = await createRequestBody(reqMessage);
const response = await requestAsAsyncIterable({
url,
header: headersToObject(reqHeader),
Expand All @@ -142,11 +237,11 @@ export function createGrpcWebTransport(

const trailerTarget = new Headers();

const resMessage = await parseStreamResponseBody(
const resMessage = await parseResponseBody(
response.messageStream,
foundStatus,
trailerTarget,
parse,
response.header,
);

return {
Expand Down
17 changes: 0 additions & 17 deletions packages/connect-miniprogram/src/connect/message-body/create.ts

This file was deleted.

Loading

0 comments on commit 95345bc

Please sign in to comment.