Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 3.0 #250

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

### 3.0.0 (not yet released)

- **[BREAKING]** Replace `iterall` use with native `Symbol.asyncIterator`. <br/>
[@n1ru4l](https://github.com/n1ru4l) in [#232](https://github.com/apollographql/graphql-subscriptions/pull/232)
- Add an optional generic type map to `PubSub`. <br/>
[@cursorsdottsx](https://github.com/cursorsdottsx) in [#245](https://github.com/apollographql/graphql-subscriptions/pull/245)
- Support `readonly` arrays of event names. <br/>
[@rh389](https://github.com/rh389) in [#234](https://github.com/apollographql/graphql-subscriptions/pull/234)
- Support returning a Promise of an `AsyncIterator` as the `withFilter` resolver function. <br/>
[@maclockard](https://github.com/maclockard) in [#220](https://github.com/apollographql/graphql-subscriptions/pull/220)

### 2.0.1 (not yet released)

- `withFilter` TypeScript improvements. <br/>
Expand Down
30 changes: 25 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ You can use it with any GraphQL client and server (not only Apollo).

If you are developing a project that uses this module with TypeScript:

* ensure that your `tsconfig.json` `lib` definition includes `"esnext.asynciterable"`
* ensure that your `tsconfig.json` `lib` definition includes `"es2018.asynciterable"`
* `npm install @types/graphql` or `yarn add @types/graphql`

### Getting started with your first subscription
Expand All @@ -25,11 +25,11 @@ To begin with GraphQL subscriptions, start by defining a GraphQL `Subscription`

```graphql
type Subscription {
somethingChanged: Result
somethingChanged: Result
}

type Result {
id: String
id: String
}
```

Expand All @@ -52,7 +52,27 @@ import { PubSub } from 'graphql-subscriptions';
export const pubsub = new PubSub();
```

Now, implement your Subscriptions type resolver, using the `pubsub.asyncIterator` to map the event you need:
If you're using TypeScript you can use the optional generic parameter for added type-safety:

```ts
import { PubSub } from "apollo-server-express";

const pubsub = new PubSub<{
EVENT_ONE: { data: number; };
EVENT_TWO: { data: string; };
}>();

pubsub.publish("EVENT_ONE", { data: 42 });
pubsub.publish("EVENTONE", { data: 42 }); // ! ERROR
pubsub.publish("EVENT_ONE", { data: "42" }); // ! ERROR
pubsub.publish("EVENT_TWO", { data: "hello" });

pubsub.subscribe("EVENT_ONE", () => {});
pubsub.subscribe("EVENTONE", () => {}); // ! ERROR
pubsub.subscribe("EVENT_TWO", () => {});
```

Next implement your Subscriptions type resolver using the `pubsub.asyncIterator` to map the event you need:

```js
const SOMETHING_CHANGED_TOPIC = 'something_changed';
Expand All @@ -68,7 +88,7 @@ export const resolvers = {

> Subscriptions resolvers are not a function, but an object with `subscribe` method, that returns `AsyncIterable`.

Now, the GraphQL engine knows that `somethingChanged` is a subscription, and every time we use `pubsub.publish` over this topic - it will publish it using the transport we use:
The GraphQL engine now knows that `somethingChanged` is a subscription, and every time we use `pubsub.publish` it will publish content using our chosen transport layer:

```js
pubsub.publish(SOMETHING_CHANGED_TOPIC, { somethingChanged: { id: "123" }});
Expand Down
4 changes: 1 addition & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
"type": "git",
"url": "https://github.com/apollostack/graphql-subscriptions.git"
},
"dependencies": {
"iterall": "^1.3.0"
},
"dependencies": {},
"peerDependencies": {
"graphql": "^15.7.2 || ^16.0.0"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { $$asyncIterator } from 'iterall';
import { PubSubEngine } from './pubsub-engine';

/**
Expand All @@ -17,7 +16,7 @@ import { PubSubEngine } from './pubsub-engine';
* A queue of PubSubEngine events waiting for next() calls to be made, which returns the queued events
* for handling. This queue expands as PubSubEngine events arrive without next() calls occurring in-between.
*
* @property eventsArray @type {string[]}
* @property eventsArray @type {readonly string[]}
* An array of PubSubEngine event names that this PubSubAsyncIterator should watch.
*
* @property allSubscribed @type {Promise<number[]>}
Expand All @@ -33,16 +32,16 @@ import { PubSubEngine } from './pubsub-engine';
* @property pubsub @type {PubSubEngine}
* The PubSubEngine whose events will be observed.
*/
export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
export class PubSubAsyncIterableIterator<T> implements AsyncIterableIterator<T> {

private pullQueue: ((value: IteratorResult<T>) => void)[];
private pushQueue: T[];
private eventsArray: string[];
private eventsArray: readonly string[];
private allSubscribed: Promise<number[]>;
private running: boolean;
private pubsub: PubSubEngine;

constructor(pubsub: PubSubEngine, eventNames: string | string[]) {
constructor(pubsub: PubSubEngine, eventNames: string | readonly string[]) {
this.pubsub = pubsub;
this.pullQueue = [];
this.pushQueue = [];
Expand All @@ -66,7 +65,7 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
return Promise.reject(error);
}

public [$$asyncIterator]() {
public [Symbol.asyncIterator]() {
return this;
}

Expand Down Expand Up @@ -119,5 +118,4 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
this.pubsub.unsubscribe(subscriptionId);
}
}

}
6 changes: 3 additions & 3 deletions src/pubsub-engine.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {PubSubAsyncIterator} from './pubsub-async-iterator';
import {PubSubAsyncIterableIterator} from './pubsub-async-iterable-iterator';

export abstract class PubSubEngine {
public abstract publish(triggerName: string, payload: any): Promise<void>;
public abstract subscribe(triggerName: string, onMessage: Function, options: Object): Promise<number>;
public abstract unsubscribe(subId: number);
public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> {
return new PubSubAsyncIterator<T>(this, triggers);
public asyncIterableIterator<T>(triggers: string | readonly string[]): PubSubAsyncIterableIterator<T> {
return new PubSubAsyncIterableIterator<T>(this, triggers);
}
}
14 changes: 11 additions & 3 deletions src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ export interface PubSubOptions {
eventEmitter?: EventEmitter;
}

export class PubSub extends PubSubEngine {
export class PubSub<
Events extends { [event: string]: unknown } = Record<string, never>
> extends PubSubEngine {
protected ee: EventEmitter;
private subscriptions: { [key: string]: [string, (...args: any[]) => void] };
private subIdCounter: number;
Expand All @@ -17,12 +19,18 @@ export class PubSub extends PubSubEngine {
this.subIdCounter = 0;
}

public publish(triggerName: string, payload: any): Promise<void> {
public publish<K extends keyof Events>(
triggerName: K & string,
payload: Events[K] extends never ? any : Events[K]
): Promise<void> {
this.ee.emit(triggerName, payload);
return Promise.resolve();
}

public subscribe(triggerName: string, onMessage: (...args: any[]) => void): Promise<number> {
public subscribe<K extends keyof Events>(
triggerName: K & string,
onMessage: (...args: any[]) => void
): Promise<number> {
this.ee.addListener(triggerName, onMessage);
this.subIdCounter = this.subIdCounter + 1;
this.subscriptions[this.subIdCounter] = [triggerName, onMessage];
Expand Down
56 changes: 30 additions & 26 deletions src/test/asyncIteratorSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import * as chaiAsPromised from 'chai-as-promised';
import { spy } from 'sinon';
import * as sinonChai from 'sinon-chai';

import { createAsyncIterator, isAsyncIterable } from 'iterall';
import { PubSub } from '../pubsub';
import { withFilter, FilterFn } from '../with-filter';
import { ExecutionResult } from 'graphql';

const isAsyncIterableIterator = (input: unknown): input is AsyncIterableIterator<unknown> => {
return input != null && typeof input[Symbol.asyncIterator] === 'function';
};

chai.use(chaiAsPromised);
chai.use(sinonChai);
const expect = chai.expect;
Expand Down Expand Up @@ -64,14 +67,13 @@ describe('GraphQL-JS asyncIterator', () => {
}
`);
const pubsub = new PubSub();
const origIterator = pubsub.asyncIterator(FIRST_EVENT);
const origIterator = pubsub.asyncIterableIterator(FIRST_EVENT);
const schema = buildSchema(origIterator);


const results = await subscribe({schema, document: query}) as AsyncIterator<ExecutionResult>;
const results = await subscribe({ schema, document: query }) as AsyncIterableIterator<ExecutionResult>;
const payload1 = results.next();

expect(isAsyncIterable(results)).to.be.true;
expect(isAsyncIterableIterator(results)).to.be.true;

const r = payload1.then(res => {
expect(res.value.data.testSubscription).to.equal('FIRST_EVENT');
Expand All @@ -90,13 +92,13 @@ describe('GraphQL-JS asyncIterator', () => {
}
`);
const pubsub = new PubSub();
const origIterator = pubsub.asyncIterator(FIRST_EVENT);
const origIterator = pubsub.asyncIterableIterator(FIRST_EVENT);
const schema = buildSchema(origIterator, () => Promise.resolve(true));

const results = await subscribe({schema, document: query}) as AsyncIterator<ExecutionResult>;
const results = await subscribe({ schema, document: query }) as AsyncIterableIterator<ExecutionResult>;
const payload1 = results.next();

expect(isAsyncIterable(results)).to.be.true;
expect(isAsyncIterableIterator(results)).to.be.true;

const r = payload1.then(res => {
expect(res.value.data.testSubscription).to.equal('FIRST_EVENT');
Expand All @@ -115,7 +117,7 @@ describe('GraphQL-JS asyncIterator', () => {
`);

const pubsub = new PubSub();
const origIterator = pubsub.asyncIterator(FIRST_EVENT);
const origIterator = pubsub.asyncIterableIterator(FIRST_EVENT);

let counter = 0;

Expand All @@ -133,8 +135,8 @@ describe('GraphQL-JS asyncIterator', () => {

const schema = buildSchema(origIterator, filterFn);

subscribe({schema, document: query}).then((results: AsyncGenerator<ExecutionResult, void, void> | ExecutionResult) => {
expect(isAsyncIterable(results)).to.be.true;
Promise.resolve(subscribe({ schema, document: query })).then((results: AsyncIterableIterator<ExecutionResult> | ExecutionResult) => {
expect(isAsyncIterableIterator(results)).to.be.true;

(results as AsyncGenerator<ExecutionResult, void, void>).next();
(results as AsyncGenerator<ExecutionResult, void, void>).return();
Expand All @@ -155,7 +157,7 @@ describe('GraphQL-JS asyncIterator', () => {
`);

const pubsub = new PubSub();
const origIterator = pubsub.asyncIterator(FIRST_EVENT);
const origIterator = pubsub.asyncIterableIterator(FIRST_EVENT);
const returnSpy = spy(origIterator, 'return');
const schema = buildSchema(origIterator);

Expand All @@ -172,21 +174,22 @@ describe('GraphQL-JS asyncIterator', () => {
});
});

describe('withFilter', () => {

it('works properly with finite asyncIterators', async () => {
const isEven = (x: number) => x % 2 === 0;
function isEven(x: number) {
if (x === undefined) {
throw Error('Undefined value passed to filterFn');
}
return x % 2 === 0;
}

const testFiniteAsyncIterator: AsyncIterator<number> = createAsyncIterator([1, 2, 3, 4, 5, 6, 7, 8]);
// Work around https://github.com/leebyron/iterall/issues/48
testFiniteAsyncIterator.throw = function (error) {
return Promise.reject(error);
};
testFiniteAsyncIterator.return = function () {
return Promise.resolve({ value: undefined, done: true });
};
const testFiniteAsyncIterator: AsyncIterableIterator<number> = (async function * () {
for (const value of [1, 2, 3, 4, 5, 6, 7, 8]) {
yield value;
}
})();

const filteredAsyncIterator = withFilter(() => testFiniteAsyncIterator, isEven)();
describe('withFilter', () => {
it('works properly with finite asyncIterators', async () => {
const filteredAsyncIterator = await withFilter(() => testFiniteAsyncIterator, isEven)();

for (let i = 1; i <= 4; i++) {
const result = await filteredAsyncIterator.next();
Expand Down Expand Up @@ -225,7 +228,8 @@ describe('withFilter', () => {
},
};

const filteredAsyncIterator = withFilter(() => asyncIterator, () => stopped)();
const filteredAsyncIterator =
await withFilter(() => asyncIterator, () => stopped)();

global.gc();
const heapUsed = process.memoryUsage().heapUsed;
Expand Down
19 changes: 11 additions & 8 deletions src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import * as chaiAsPromised from 'chai-as-promised';
import * as sinonChai from 'sinon-chai';

import { PubSub } from '../pubsub';
import { isAsyncIterable } from 'iterall';

const isAsyncIterableIterator = (input: unknown): input is AsyncIterableIterator<unknown> => {
return input != null && typeof input[Symbol.asyncIterator] === 'function';
};

chai.use(chaiAsPromised);
chai.use(sinonChai);
Expand Down Expand Up @@ -37,15 +40,15 @@ describe('AsyncIterator', () => {
it('should expose valid asyncIterator for a specific event', () => {
const eventName = 'test';
const ps = new PubSub();
const iterator = ps.asyncIterator(eventName);
const iterator = ps.asyncIterableIterator(eventName);
expect(iterator).to.not.be.undefined;
expect(isAsyncIterable(iterator)).to.be.true;
expect(isAsyncIterableIterator(iterator)).to.be.true;
});

it('should trigger event on asyncIterator when published', done => {
const eventName = 'test';
const ps = new PubSub();
const iterator = ps.asyncIterator(eventName);
const iterator = ps.asyncIterableIterator(eventName);

iterator.next().then(result => {
expect(result).to.not.be.undefined;
Expand All @@ -60,7 +63,7 @@ describe('AsyncIterator', () => {
it('should not trigger event on asyncIterator when publishing other event', () => {
const eventName = 'test2';
const ps = new PubSub();
const iterator = ps.asyncIterator('test');
const iterator = ps.asyncIterableIterator('test');
const spy = sinon.spy();

iterator.next().then(spy);
Expand All @@ -71,7 +74,7 @@ describe('AsyncIterator', () => {
it('register to multiple events', done => {
const eventName = 'test2';
const ps = new PubSub();
const iterator = ps.asyncIterator(['test', 'test2']);
const iterator = ps.asyncIterableIterator(['test', 'test2'] as const);
const spy = sinon.spy();

iterator.next().then(() => {
Expand All @@ -85,7 +88,7 @@ describe('AsyncIterator', () => {
it('should not trigger event on asyncIterator already returned', done => {
const eventName = 'test';
const ps = new PubSub();
const iterator = ps.asyncIterator(eventName);
const iterator = ps.asyncIterableIterator(eventName);

iterator.next().then(result => {
expect(result).to.deep.equal({
Expand Down Expand Up @@ -117,7 +120,7 @@ describe('AsyncIterator', () => {
}
}
const ps = new TestPubSub();
ps.asyncIterator(testEventName);
ps.asyncIterableIterator(testEventName);

expect(ps.listenerCount(testEventName)).to.equal(0);
});
Expand Down