Skip to content

Commit

Permalink
Node: add flush method (#1010)
Browse files Browse the repository at this point in the history
  • Loading branch information
silesky authored Jan 10, 2024
1 parent e57960e commit 5f37f4f
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 22 deletions.
5 changes: 5 additions & 0 deletions .changeset/great-ads-share.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@segment/analytics-node': minor
---

Add analytics.flush({ timeout: ..., close: ... }) method
134 changes: 134 additions & 0 deletions packages/node/src/__tests__/graceful-shutdown-integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,138 @@ describe('Ability for users to exit without losing events', () => {
expect(calls[0].data.batch.length).toBe(2)
})
})

describe('.flush()', () => {
beforeEach(() => {
ajs = new Analytics({
writeKey: 'abc123',
httpClient: testClient,
maxEventsInBatch: 15,
})
})

it('should be able to flush multiple times', async () => {
let drainedCalls = 0
ajs.on('drained', () => {
drainedCalls++
})
let trackCalls = 0
ajs.on('track', () => {
trackCalls++
})
// make track call
_helpers.makeTrackCall()

// flush first time
await ajs.flush()
expect(_helpers.getFetchCalls().length).toBe(1)
expect(trackCalls).toBe(1)
expect(drainedCalls).toBe(1)

// make another 2 track calls
_helpers.makeTrackCall()
_helpers.makeTrackCall()

// flush second time
await ajs.flush()
expect(drainedCalls).toBe(2)
expect(_helpers.getFetchCalls().length).toBe(2)
expect(trackCalls).toBe(3)
})

test('should handle events normally if new events enter the pipeline _after_ flush is called', async () => {
let drainedCalls = 0
ajs.on('drained', () => {
drainedCalls++
})
let trackCallCount = 0
ajs.on('track', () => {
trackCallCount += 1
})

// make regular call
_helpers.makeTrackCall()
const flushed = ajs.flush()

// add another event to the queue to simulate late-arriving track call. flush should not wait for this event.
await sleep(100)
_helpers.makeTrackCall()

await flushed
expect(trackCallCount).toBe(1)
expect(_helpers.getFetchCalls().length).toBe(1)
expect(drainedCalls).toBe(1)

// should be one event left in the queue (the late-arriving track call). This will be included in the next flush.
// add a second event to the queue.
_helpers.makeTrackCall()

await ajs.flush()
expect(drainedCalls).toBe(2)
expect(_helpers.getFetchCalls().length).toBe(2)
expect(trackCallCount).toBe(3)
})

test('overlapping flush calls should be ignored with a wwarning', async () => {
ajs = new Analytics({
writeKey: 'abc123',
httpClient: testClient,
maxEventsInBatch: 2,
})
const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {})
let drainedCalls = 0
ajs.on('drained', () => {
drainedCalls++
})
let trackCallCount = 0
ajs.on('track', () => {
trackCallCount += 1
})

_helpers.makeTrackCall()
// overlapping flush calls
const flushes = Promise.all([ajs.flush(), ajs.flush()])
_helpers.makeTrackCall()
_helpers.makeTrackCall()
await flushes
expect(warnSpy).toHaveBeenCalledTimes(1)
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining('Overlapping flush calls detected')
)
expect(trackCallCount).toBe(3)
expect(drainedCalls).toBe(1)

// just to be ensure the pipeline is operating as usual, make another track call and flush
_helpers.makeTrackCall()
await ajs.flush()
expect(trackCallCount).toBe(4)
expect(drainedCalls).toBe(2)
})

test('should call console.warn only once', async () => {
const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {})
let drainedCalls = 0
ajs.on('drained', () => {
drainedCalls++
})

_helpers.makeTrackCall()

// overlapping flush calls
await Promise.all([ajs.flush(), ajs.flush()])
expect(warnSpy).toHaveBeenCalledTimes(1)
expect(drainedCalls).toBe(1)

_helpers.makeTrackCall()
// non-overlapping flush calls
await ajs.flush()
expect(drainedCalls).toBe(2)

// there are no additional events to flush
await ajs.flush()
expect(drainedCalls).toBe(2)

expect(warnSpy).toHaveBeenCalledTimes(1)
})
})
})
42 changes: 35 additions & 7 deletions packages/node/src/app/analytics-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
TrackParams,
Plugin,
SegmentEvent,
FlushParams,
CloseAndFlushParams,
} from './types'
import { Context } from './context'
import { NodeEventQueue } from './event-queue'
Expand All @@ -27,6 +29,8 @@ export class Analytics extends NodeEmitter implements CoreAnalytics {
typeof createConfiguredNodePlugin
>['publisher']

private _isFlushing = false

private readonly _queue: NodeEventQueue

ready: Promise<void>
Expand Down Expand Up @@ -78,18 +82,42 @@ export class Analytics extends NodeEmitter implements CoreAnalytics {
*/
public closeAndFlush({
timeout = this._closeAndFlushDefaultTimeout,
}: {
/** Set a maximum time permitted to wait before resolving. */
timeout?: number
} = {}): Promise<void> {
this._publisher.flushAfterClose(this._pendingEvents)
this._isClosed = true
}: CloseAndFlushParams = {}): Promise<void> {
return this.flush({ timeout, close: true })
}

/**
* Call this method to flush all existing events..
* This method also waits for any event method-specific callbacks to be triggered,
* and any of their subsequent promises to be resolved/rejected.
*/
public async flush({
timeout,
close = false,
}: FlushParams = {}): Promise<void> {
if (this._isFlushing) {
// if we're already flushing, then we don't need to do anything
console.warn(
'Overlapping flush calls detected. Please wait for the previous flush to finish before calling .flush again'
)
return
} else {
this._isFlushing = true
}
if (close) {
this._isClosed = true
}
this._publisher.flush(this._pendingEvents)
const promise = new Promise<void>((resolve) => {
if (!this._pendingEvents) {
resolve()
} else {
this.once('drained', () => resolve())
this.once('drained', () => {
resolve()
})
}
}).finally(() => {
this._isFlushing = false
})
return timeout ? pTimeout(promise, timeout).catch(() => undefined) : promise
}
Expand Down
18 changes: 18 additions & 0 deletions packages/node/src/app/types/params.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,21 @@ export type TrackParams = {
timestamp?: Timestamp
integrations?: Integrations
} & IdentityOptions

export type FlushParams = {
/**
* Max time in milliseconds to wait until the resulting promise resolves.
*/
timeout?: number
/**
* If true, will prevent new events from entering the pipeline. Default: false
*/
close?: boolean
}

export type CloseAndFlushParams = {
/**
* Max time in milliseconds to wait until the resulting promise resolves.
*/
timeout?: FlushParams['timeout']
}
12 changes: 5 additions & 7 deletions packages/node/src/plugins/segmentio/__tests__/publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ describe('flushAfterClose', () => {
flushAt: 20,
})

publisher.flushAfterClose(3)
publisher.flush(3)

void segmentPlugin.track(_createTrackCtx())
void segmentPlugin.track(_createTrackCtx())
Expand All @@ -202,7 +202,7 @@ describe('flushAfterClose', () => {
flushAt: 1,
})

publisher.flushAfterClose(3)
publisher.flush(3)

void segmentPlugin.track(_createTrackCtx())
void segmentPlugin.track(_createTrackCtx())
Expand All @@ -215,7 +215,7 @@ describe('flushAfterClose', () => {
flushAt: 3,
})

publisher.flushAfterClose(5)
publisher.flush(5)
range(3).forEach(() => segmentPlugin.track(_createTrackCtx()))
expect(makeReqSpy).toHaveBeenCalledTimes(1)
range(2).forEach(() => segmentPlugin.track(_createTrackCtx()))
Expand All @@ -228,9 +228,7 @@ describe('flushAfterClose', () => {
})

range(3).forEach(() => segmentPlugin.track(_createTrackCtx())) // should not flush
expect(makeReqSpy).toHaveBeenCalledTimes(0)
publisher.flushAfterClose(5)
expect(makeReqSpy).toHaveBeenCalledTimes(0)
publisher.flush(5)
range(2).forEach(() => segmentPlugin.track(_createTrackCtx()))
expect(makeReqSpy).toHaveBeenCalledTimes(1)
})
Expand All @@ -242,7 +240,7 @@ describe('flushAfterClose', () => {

range(3).forEach(() => segmentPlugin.track(_createTrackCtx())) // should not flush
expect(makeReqSpy).toHaveBeenCalledTimes(0)
publisher.flushAfterClose(10)
publisher.flush(10)
expect(makeReqSpy).toHaveBeenCalledTimes(0)
range(4).forEach(() => segmentPlugin.track(_createTrackCtx())) // batch is full, send.
expect(makeReqSpy).toHaveBeenCalledTimes(1)
Expand Down
17 changes: 9 additions & 8 deletions packages/node/src/plugins/segmentio/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ export class Publisher {
private _maxRetries: number
private _auth: string
private _url: string
private _closeAndFlushPendingItemsCount?: number
private _flushPendingItemsCount?: number
private _httpRequestTimeout: number
private _emitter: NodeEmitter
private _disable: boolean
private _httpClient: HTTPClient

constructor(
{
host,
Expand Down Expand Up @@ -96,13 +97,13 @@ export class Publisher {
this._batch = undefined
}

flushAfterClose(pendingItemsCount: number) {
flush(pendingItemsCount: number): void {
if (!pendingItemsCount) {
// if number of pending items is 0, there will never be anything else entering the batch, since the app is closed.
// if number of pending items is 0, there is nothing to flush
return
}

this._closeAndFlushPendingItemsCount = pendingItemsCount
this._flushPendingItemsCount = pendingItemsCount

// if batch is empty, there's nothing to flush, and when things come in, enqueue will handle them.
if (!this._batch) return
Expand Down Expand Up @@ -145,7 +146,7 @@ export class Publisher {
const addStatus = batch.tryAdd(pendingItem)
if (addStatus.success) {
const isExpectingNoMoreItems =
batch.length === this._closeAndFlushPendingItemsCount
batch.length === this._flushPendingItemsCount
const isFull = batch.length === this._flushAt
if (isFull || isExpectingNoMoreItems) {
this.send(batch).catch(noop)
Expand All @@ -166,7 +167,7 @@ export class Publisher {

if (fbAddStatus.success) {
const isExpectingNoMoreItems =
fallbackBatch.length === this._closeAndFlushPendingItemsCount
fallbackBatch.length === this._flushPendingItemsCount
if (isExpectingNoMoreItems) {
this.send(fallbackBatch).catch(noop)
this.clearBatch()
Expand All @@ -182,8 +183,8 @@ export class Publisher {
}

private async send(batch: ContextBatch) {
if (this._closeAndFlushPendingItemsCount) {
this._closeAndFlushPendingItemsCount -= batch.length
if (this._flushPendingItemsCount) {
this._flushPendingItemsCount -= batch.length
}
const events = batch.getEvents()
const maxAttempts = this._maxRetries + 1
Expand Down

0 comments on commit 5f37f4f

Please sign in to comment.