diff --git a/.changeset/great-ads-share.md b/.changeset/great-ads-share.md new file mode 100644 index 000000000..c0d369b42 --- /dev/null +++ b/.changeset/great-ads-share.md @@ -0,0 +1,5 @@ +--- +'@segment/analytics-node': minor +--- + +Add analytics.flush({ timeout: ..., close: ... }) method diff --git a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts index 2790f925c..ef5728cbb 100644 --- a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts +++ b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts @@ -241,4 +241,138 @@ describe('Ability for users to exit without losing events', () => { expect(calls[0].data.batch.length).toBe(2) }) }) + + describe('.flush()', () => { + beforeEach(() => { + ajs = new Analytics({ + writeKey: 'abc123', + httpClient: testClient, + maxEventsInBatch: 15, + }) + }) + + it('should be able to flush multiple times', async () => { + let drainedCalls = 0 + ajs.on('drained', () => { + drainedCalls++ + }) + let trackCalls = 0 + ajs.on('track', () => { + trackCalls++ + }) + // make track call + _helpers.makeTrackCall() + + // flush first time + await ajs.flush() + expect(_helpers.getFetchCalls().length).toBe(1) + expect(trackCalls).toBe(1) + expect(drainedCalls).toBe(1) + + // make another 2 track calls + _helpers.makeTrackCall() + _helpers.makeTrackCall() + + // flush second time + await ajs.flush() + expect(drainedCalls).toBe(2) + expect(_helpers.getFetchCalls().length).toBe(2) + expect(trackCalls).toBe(3) + }) + + test('should handle events normally if new events enter the pipeline _after_ flush is called', async () => { + let drainedCalls = 0 + ajs.on('drained', () => { + drainedCalls++ + }) + let trackCallCount = 0 + ajs.on('track', () => { + trackCallCount += 1 + }) + + // make regular call + _helpers.makeTrackCall() + const flushed = ajs.flush() + + // add another event to the queue to simulate late-arriving track call. flush should not wait for this event. + await sleep(100) + _helpers.makeTrackCall() + + await flushed + expect(trackCallCount).toBe(1) + expect(_helpers.getFetchCalls().length).toBe(1) + expect(drainedCalls).toBe(1) + + // should be one event left in the queue (the late-arriving track call). This will be included in the next flush. + // add a second event to the queue. + _helpers.makeTrackCall() + + await ajs.flush() + expect(drainedCalls).toBe(2) + expect(_helpers.getFetchCalls().length).toBe(2) + expect(trackCallCount).toBe(3) + }) + + test('overlapping flush calls should be ignored with a wwarning', async () => { + ajs = new Analytics({ + writeKey: 'abc123', + httpClient: testClient, + maxEventsInBatch: 2, + }) + const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {}) + let drainedCalls = 0 + ajs.on('drained', () => { + drainedCalls++ + }) + let trackCallCount = 0 + ajs.on('track', () => { + trackCallCount += 1 + }) + + _helpers.makeTrackCall() + // overlapping flush calls + const flushes = Promise.all([ajs.flush(), ajs.flush()]) + _helpers.makeTrackCall() + _helpers.makeTrackCall() + await flushes + expect(warnSpy).toHaveBeenCalledTimes(1) + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('Overlapping flush calls detected') + ) + expect(trackCallCount).toBe(3) + expect(drainedCalls).toBe(1) + + // just to be ensure the pipeline is operating as usual, make another track call and flush + _helpers.makeTrackCall() + await ajs.flush() + expect(trackCallCount).toBe(4) + expect(drainedCalls).toBe(2) + }) + + test('should call console.warn only once', async () => { + const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {}) + let drainedCalls = 0 + ajs.on('drained', () => { + drainedCalls++ + }) + + _helpers.makeTrackCall() + + // overlapping flush calls + await Promise.all([ajs.flush(), ajs.flush()]) + expect(warnSpy).toHaveBeenCalledTimes(1) + expect(drainedCalls).toBe(1) + + _helpers.makeTrackCall() + // non-overlapping flush calls + await ajs.flush() + expect(drainedCalls).toBe(2) + + // there are no additional events to flush + await ajs.flush() + expect(drainedCalls).toBe(2) + + expect(warnSpy).toHaveBeenCalledTimes(1) + }) + }) }) diff --git a/packages/node/src/app/analytics-node.ts b/packages/node/src/app/analytics-node.ts index b5c0b4b0f..229df134c 100644 --- a/packages/node/src/app/analytics-node.ts +++ b/packages/node/src/app/analytics-node.ts @@ -13,6 +13,8 @@ import { TrackParams, Plugin, SegmentEvent, + FlushParams, + CloseAndFlushParams, } from './types' import { Context } from './context' import { NodeEventQueue } from './event-queue' @@ -27,6 +29,8 @@ export class Analytics extends NodeEmitter implements CoreAnalytics { typeof createConfiguredNodePlugin >['publisher'] + private _isFlushing = false + private readonly _queue: NodeEventQueue ready: Promise @@ -78,18 +82,42 @@ export class Analytics extends NodeEmitter implements CoreAnalytics { */ public closeAndFlush({ timeout = this._closeAndFlushDefaultTimeout, - }: { - /** Set a maximum time permitted to wait before resolving. */ - timeout?: number - } = {}): Promise { - this._publisher.flushAfterClose(this._pendingEvents) - this._isClosed = true + }: CloseAndFlushParams = {}): Promise { + return this.flush({ timeout, close: true }) + } + + /** + * Call this method to flush all existing events.. + * This method also waits for any event method-specific callbacks to be triggered, + * and any of their subsequent promises to be resolved/rejected. + */ + public async flush({ + timeout, + close = false, + }: FlushParams = {}): Promise { + if (this._isFlushing) { + // if we're already flushing, then we don't need to do anything + console.warn( + 'Overlapping flush calls detected. Please wait for the previous flush to finish before calling .flush again' + ) + return + } else { + this._isFlushing = true + } + if (close) { + this._isClosed = true + } + this._publisher.flush(this._pendingEvents) const promise = new Promise((resolve) => { if (!this._pendingEvents) { resolve() } else { - this.once('drained', () => resolve()) + this.once('drained', () => { + resolve() + }) } + }).finally(() => { + this._isFlushing = false }) return timeout ? pTimeout(promise, timeout).catch(() => undefined) : promise } diff --git a/packages/node/src/app/types/params.ts b/packages/node/src/app/types/params.ts index 722464c49..c8974bd8e 100644 --- a/packages/node/src/app/types/params.ts +++ b/packages/node/src/app/types/params.ts @@ -76,3 +76,21 @@ export type TrackParams = { timestamp?: Timestamp integrations?: Integrations } & IdentityOptions + +export type FlushParams = { + /** + * Max time in milliseconds to wait until the resulting promise resolves. + */ + timeout?: number + /** + * If true, will prevent new events from entering the pipeline. Default: false + */ + close?: boolean +} + +export type CloseAndFlushParams = { + /** + * Max time in milliseconds to wait until the resulting promise resolves. + */ + timeout?: FlushParams['timeout'] +} diff --git a/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts b/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts index 24ac50cb5..fb8364bb2 100644 --- a/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts +++ b/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts @@ -188,7 +188,7 @@ describe('flushAfterClose', () => { flushAt: 20, }) - publisher.flushAfterClose(3) + publisher.flush(3) void segmentPlugin.track(_createTrackCtx()) void segmentPlugin.track(_createTrackCtx()) @@ -202,7 +202,7 @@ describe('flushAfterClose', () => { flushAt: 1, }) - publisher.flushAfterClose(3) + publisher.flush(3) void segmentPlugin.track(_createTrackCtx()) void segmentPlugin.track(_createTrackCtx()) @@ -215,7 +215,7 @@ describe('flushAfterClose', () => { flushAt: 3, }) - publisher.flushAfterClose(5) + publisher.flush(5) range(3).forEach(() => segmentPlugin.track(_createTrackCtx())) expect(makeReqSpy).toHaveBeenCalledTimes(1) range(2).forEach(() => segmentPlugin.track(_createTrackCtx())) @@ -228,9 +228,7 @@ describe('flushAfterClose', () => { }) range(3).forEach(() => segmentPlugin.track(_createTrackCtx())) // should not flush - expect(makeReqSpy).toHaveBeenCalledTimes(0) - publisher.flushAfterClose(5) - expect(makeReqSpy).toHaveBeenCalledTimes(0) + publisher.flush(5) range(2).forEach(() => segmentPlugin.track(_createTrackCtx())) expect(makeReqSpy).toHaveBeenCalledTimes(1) }) @@ -242,7 +240,7 @@ describe('flushAfterClose', () => { range(3).forEach(() => segmentPlugin.track(_createTrackCtx())) // should not flush expect(makeReqSpy).toHaveBeenCalledTimes(0) - publisher.flushAfterClose(10) + publisher.flush(10) expect(makeReqSpy).toHaveBeenCalledTimes(0) range(4).forEach(() => segmentPlugin.track(_createTrackCtx())) // batch is full, send. expect(makeReqSpy).toHaveBeenCalledTimes(1) diff --git a/packages/node/src/plugins/segmentio/publisher.ts b/packages/node/src/plugins/segmentio/publisher.ts index 221870b4d..6297f2c0f 100644 --- a/packages/node/src/plugins/segmentio/publisher.ts +++ b/packages/node/src/plugins/segmentio/publisher.ts @@ -42,11 +42,12 @@ export class Publisher { private _maxRetries: number private _auth: string private _url: string - private _closeAndFlushPendingItemsCount?: number + private _flushPendingItemsCount?: number private _httpRequestTimeout: number private _emitter: NodeEmitter private _disable: boolean private _httpClient: HTTPClient + constructor( { host, @@ -96,13 +97,13 @@ export class Publisher { this._batch = undefined } - flushAfterClose(pendingItemsCount: number) { + flush(pendingItemsCount: number): void { if (!pendingItemsCount) { - // if number of pending items is 0, there will never be anything else entering the batch, since the app is closed. + // if number of pending items is 0, there is nothing to flush return } - this._closeAndFlushPendingItemsCount = pendingItemsCount + this._flushPendingItemsCount = pendingItemsCount // if batch is empty, there's nothing to flush, and when things come in, enqueue will handle them. if (!this._batch) return @@ -145,7 +146,7 @@ export class Publisher { const addStatus = batch.tryAdd(pendingItem) if (addStatus.success) { const isExpectingNoMoreItems = - batch.length === this._closeAndFlushPendingItemsCount + batch.length === this._flushPendingItemsCount const isFull = batch.length === this._flushAt if (isFull || isExpectingNoMoreItems) { this.send(batch).catch(noop) @@ -166,7 +167,7 @@ export class Publisher { if (fbAddStatus.success) { const isExpectingNoMoreItems = - fallbackBatch.length === this._closeAndFlushPendingItemsCount + fallbackBatch.length === this._flushPendingItemsCount if (isExpectingNoMoreItems) { this.send(fallbackBatch).catch(noop) this.clearBatch() @@ -182,8 +183,8 @@ export class Publisher { } private async send(batch: ContextBatch) { - if (this._closeAndFlushPendingItemsCount) { - this._closeAndFlushPendingItemsCount -= batch.length + if (this._flushPendingItemsCount) { + this._flushPendingItemsCount -= batch.length } const events = batch.getEvents() const maxAttempts = this._maxRetries + 1