diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index eca23788..e96c17dc 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -167,6 +167,34 @@ export class SegmentClient { return { ...this.config }; } + /** + * Method for clearing flush queue. + */ + clearFlushQueue() { + const plugins = this.getPlugins(); + plugins.forEach(async (plugin) => { + if (plugin.type == PluginType.destination) { + await plugin?.clearFlushQueue(); + this.flushPolicyExecuter.reset(); + } + }); + } + + /** + * Method to get count of events in flush queue. + */ + async getFlushQueueCount() { + const plugins = this.getPlugins(); + let totalEventsCount = 0; + for (let i = 0; i <= plugins.length; i++) { + if (plugins[i]?.type == PluginType.destination) { + const eventsCount = await plugins[i]?.getQueueCount(); + totalEventsCount += eventsCount; + } + } + return totalEventsCount; + } + constructor({ config, logger, diff --git a/packages/core/src/client.tsx b/packages/core/src/client.tsx index 4acae002..0c3f7fd1 100644 --- a/packages/core/src/client.tsx +++ b/packages/core/src/client.tsx @@ -71,6 +71,8 @@ export const useAnalytics = (): ClientMethods => { group: async (...args) => client?.group(...args), alias: async (...args) => client?.alias(...args), reset: async (...args) => client?.reset(...args), + clearFlushQueue: async () => client?.clearFlushQueue(), + getFlushQueueCount: async () => client?.getFlushQueueCount(), }; }, [client]); }; diff --git a/packages/core/src/plugin.ts b/packages/core/src/plugin.ts index d0328d19..ab8ce33d 100644 --- a/packages/core/src/plugin.ts +++ b/packages/core/src/plugin.ts @@ -36,6 +36,15 @@ export class Plugin { shutdown() { // do nothing by default, user can override. } + + async clearFlushQueue() { + // Overridden in Segment Destination + } + + async getQueueCount() { + // Overridden in Segment Destination + return 0; + } } export class EventPlugin extends Plugin { diff --git a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index 6c303525..dc19d3cc 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -114,4 +114,22 @@ export class QueueFlushingPlugin extends UtilityPlugin { return { events: filteredEvents }; }); } + + /** + * Clear all events from the queue + */ + async clearQueue() { + await this.queueStore?.dispatch(() => { + return { events: [] }; + }); + } + + /** + * Returns the count of items in the queue + */ + async getQueueCount() { + const state = await this.queueStore?.getState(); + const eventsCount = state?.events.length || 0; + return eventsCount; + } } diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index b3717b8e..ab6aa3fd 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -131,4 +131,13 @@ export class SegmentDestination extends DestinationPlugin { // Wait until the queue is done restoring before flushing return this.queuePlugin.flush(); } + async clearFlushQueue() { + //Wait until clearing current Flush queue + return this.queuePlugin.clearQueue(); + } + async getQueueCount() { + // Wait until getting the count of queue + const eventsCount = await this.queuePlugin.getQueueCount(); + return eventsCount; + } } diff --git a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts index 81eabac7..e5dfaf6c 100644 --- a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts +++ b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts @@ -77,4 +77,75 @@ describe('QueueFlushingPlugin', () => { // @ts-ignore expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); }); + + it('should clear all events from the queue', async () => { + const onFlush = jest.fn().mockResolvedValue(undefined); + const queuePlugin = setupQueuePlugin(onFlush, 10); + + const event1: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test1', + properties: { + test: 'test1', + }, + }; + + const event2: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test2', + properties: { + test: 'test2', + }, + }; + + await queuePlugin.execute(event1); + await queuePlugin.execute(event2); + + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + expect(queuePlugin.queueStore?.getState().events).toHaveLength(2); + + await queuePlugin.clearQueue(); + + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); + }); + + it('should return the count of items in the queue', async () => { + const onFlush = jest.fn().mockResolvedValue(undefined); + const queuePlugin = setupQueuePlugin(onFlush, 10); + + const event1: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test1', + properties: { + test: 'test1', + }, + }; + + const event2: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test2', + properties: { + test: 'test2', + }, + }; + + await queuePlugin.execute(event1); + await queuePlugin.execute(event2); + + let eventsCount = await queuePlugin.getQueueCount(); + expect(eventsCount).toBe(2); + + await queuePlugin.dequeue(event1); + + eventsCount = await queuePlugin.getQueueCount(); + expect(eventsCount).toBe(1); + + await queuePlugin.clearQueue(); + + eventsCount = await queuePlugin.getQueueCount(); + expect(eventsCount).toBe(0); + }); }); diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index 141ca3c4..95997714 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -159,6 +159,8 @@ export type ClientMethods = { group: (groupId: string, groupTraits?: GroupTraits) => Promise; alias: (newUserId: string) => Promise; reset: (resetAnonymousId?: boolean) => Promise; + clearFlushQueue: () => Promise; + getFlushQueueCount: () => Promise; }; type ContextApp = {