From f400f9f9afa8d2fd1249756bd6cb58195ee5993a Mon Sep 17 00:00:00 2001 From: Harsh Vardhan Date: Wed, 7 Aug 2024 16:39:32 +0530 Subject: [PATCH 1/2] feat: added client method to clear flush queue --- packages/core/src/analytics.ts | 10 ++++++ packages/core/src/client.tsx | 1 + packages/core/src/plugin.ts | 4 +++ .../core/src/plugins/QueueFlushingPlugin.ts | 9 +++++ .../core/src/plugins/SegmentDestination.ts | 4 +++ .../__tests__/QueueFlushingPlugin.test.ts | 34 +++++++++++++++++++ packages/core/src/types.ts | 1 + 7 files changed, 63 insertions(+) diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index eca23788..b0c29e39 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -167,6 +167,16 @@ export class SegmentClient { return { ...this.config }; } + clearFlushQueue() { + const plugins = this.getPlugins(); + plugins.forEach(async(plugin)=>{ + if (plugin.type == PluginType.destination) { + await plugin?.clearFlushQueue() + this.flushPolicyExecuter.reset(); + } + }) + } + constructor({ config, logger, diff --git a/packages/core/src/client.tsx b/packages/core/src/client.tsx index 4acae002..b923c345 100644 --- a/packages/core/src/client.tsx +++ b/packages/core/src/client.tsx @@ -71,6 +71,7 @@ export const useAnalytics = (): ClientMethods => { group: async (...args) => client?.group(...args), alias: async (...args) => client?.alias(...args), reset: async (...args) => client?.reset(...args), + clearFlushQueue: async () => client?.clearFlushQueue() }; }, [client]); }; diff --git a/packages/core/src/plugin.ts b/packages/core/src/plugin.ts index d0328d19..b69b341c 100644 --- a/packages/core/src/plugin.ts +++ b/packages/core/src/plugin.ts @@ -36,6 +36,10 @@ export class Plugin { shutdown() { // do nothing by default, user can override. } + + async clearFlushQueue() { + // Overridden in Segment Destination + } } export class EventPlugin extends Plugin { diff --git a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index 6c303525..8b46502c 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -114,4 +114,13 @@ export class QueueFlushingPlugin extends UtilityPlugin { return { events: filteredEvents }; }); } + + /** + * Clear all events from the queue + */ + async clearQueue() { + await this.queueStore?.dispatch(() => { + return { events: [] } + }) + } } diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index b3717b8e..f90c7063 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -131,4 +131,8 @@ export class SegmentDestination extends DestinationPlugin { // Wait until the queue is done restoring before flushing return this.queuePlugin.flush(); } + async clearFlushQueue() { + //Wait until clearing current Flush queue + return this.queuePlugin.clearQueue(); + } } diff --git a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts index 81eabac7..55aa9625 100644 --- a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts +++ b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts @@ -77,4 +77,38 @@ describe('QueueFlushingPlugin', () => { // @ts-ignore expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); }); + + it('should clear all events from the queue', async () => { + const onFlush = jest.fn().mockResolvedValue(undefined); + const queuePlugin = setupQueuePlugin(onFlush, 10); + + const event1: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test1', + properties: { + test: 'test1', + }, + }; + + const event2: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test2', + properties: { + test: 'test2', + }, + }; + + await queuePlugin.execute(event1); + await queuePlugin.execute(event2); + + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + expect(queuePlugin.queueStore?.getState().events).toHaveLength(2); + + await queuePlugin.clearQueue(); + + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); +}); }); diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index 141ca3c4..34ac135f 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -159,6 +159,7 @@ export type ClientMethods = { group: (groupId: string, groupTraits?: GroupTraits) => Promise; alias: (newUserId: string) => Promise; reset: (resetAnonymousId?: boolean) => Promise; + clearFlushQueue: () => Promise ; }; type ContextApp = { From b7120186e5ff58dde1dfe7c8336e7db3eff4f24a Mon Sep 17 00:00:00 2001 From: Harsh Vardhan Date: Fri, 9 Aug 2024 11:51:06 +0530 Subject: [PATCH 2/2] feat: added client method to get count of flush queue --- packages/core/src/analytics.ts | 24 ++++++++++-- packages/core/src/client.tsx | 3 +- packages/core/src/plugin.ts | 5 +++ .../core/src/plugins/QueueFlushingPlugin.ts | 13 ++++++- .../core/src/plugins/SegmentDestination.ts | 5 +++ .../__tests__/QueueFlushingPlugin.test.ts | 39 ++++++++++++++++++- packages/core/src/types.ts | 3 +- 7 files changed, 84 insertions(+), 8 deletions(-) diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index b0c29e39..e96c17dc 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -167,14 +167,32 @@ export class SegmentClient { return { ...this.config }; } + /** + * Method for clearing flush queue. + */ clearFlushQueue() { const plugins = this.getPlugins(); - plugins.forEach(async(plugin)=>{ + plugins.forEach(async (plugin) => { if (plugin.type == PluginType.destination) { - await plugin?.clearFlushQueue() + await plugin?.clearFlushQueue(); this.flushPolicyExecuter.reset(); } - }) + }); + } + + /** + * Method to get count of events in flush queue. + */ + async getFlushQueueCount() { + const plugins = this.getPlugins(); + let totalEventsCount = 0; + for (let i = 0; i <= plugins.length; i++) { + if (plugins[i]?.type == PluginType.destination) { + const eventsCount = await plugins[i]?.getQueueCount(); + totalEventsCount += eventsCount; + } + } + return totalEventsCount; } constructor({ diff --git a/packages/core/src/client.tsx b/packages/core/src/client.tsx index b923c345..0c3f7fd1 100644 --- a/packages/core/src/client.tsx +++ b/packages/core/src/client.tsx @@ -71,7 +71,8 @@ export const useAnalytics = (): ClientMethods => { group: async (...args) => client?.group(...args), alias: async (...args) => client?.alias(...args), reset: async (...args) => client?.reset(...args), - clearFlushQueue: async () => client?.clearFlushQueue() + clearFlushQueue: async () => client?.clearFlushQueue(), + getFlushQueueCount: async () => client?.getFlushQueueCount(), }; }, [client]); }; diff --git a/packages/core/src/plugin.ts b/packages/core/src/plugin.ts index b69b341c..ab8ce33d 100644 --- a/packages/core/src/plugin.ts +++ b/packages/core/src/plugin.ts @@ -40,6 +40,11 @@ export class Plugin { async clearFlushQueue() { // Overridden in Segment Destination } + + async getQueueCount() { + // Overridden in Segment Destination + return 0; + } } export class EventPlugin extends Plugin { diff --git a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index 8b46502c..dc19d3cc 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -120,7 +120,16 @@ export class QueueFlushingPlugin extends UtilityPlugin { */ async clearQueue() { await this.queueStore?.dispatch(() => { - return { events: [] } - }) + return { events: [] }; + }); + } + + /** + * Returns the count of items in the queue + */ + async getQueueCount() { + const state = await this.queueStore?.getState(); + const eventsCount = state?.events.length || 0; + return eventsCount; } } diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index f90c7063..ab6aa3fd 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -135,4 +135,9 @@ export class SegmentDestination extends DestinationPlugin { //Wait until clearing current Flush queue return this.queuePlugin.clearQueue(); } + async getQueueCount() { + // Wait until getting the count of queue + const eventsCount = await this.queuePlugin.getQueueCount(); + return eventsCount; + } } diff --git a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts index 55aa9625..e5dfaf6c 100644 --- a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts +++ b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts @@ -110,5 +110,42 @@ describe('QueueFlushingPlugin', () => { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); -}); + }); + + it('should return the count of items in the queue', async () => { + const onFlush = jest.fn().mockResolvedValue(undefined); + const queuePlugin = setupQueuePlugin(onFlush, 10); + + const event1: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test1', + properties: { + test: 'test1', + }, + }; + + const event2: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test2', + properties: { + test: 'test2', + }, + }; + + await queuePlugin.execute(event1); + await queuePlugin.execute(event2); + + let eventsCount = await queuePlugin.getQueueCount(); + expect(eventsCount).toBe(2); + + await queuePlugin.dequeue(event1); + + eventsCount = await queuePlugin.getQueueCount(); + expect(eventsCount).toBe(1); + + await queuePlugin.clearQueue(); + + eventsCount = await queuePlugin.getQueueCount(); + expect(eventsCount).toBe(0); + }); }); diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index 34ac135f..95997714 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -159,7 +159,8 @@ export type ClientMethods = { group: (groupId: string, groupTraits?: GroupTraits) => Promise; alias: (newUserId: string) => Promise; reset: (resetAnonymousId?: boolean) => Promise; - clearFlushQueue: () => Promise ; + clearFlushQueue: () => Promise; + getFlushQueueCount: () => Promise; }; type ContextApp = {