diff --git a/packages/destination-actions/src/destinations/klaviyo/__tests__/__snapshots__/snapshot.test.ts.snap b/packages/destination-actions/src/destinations/klaviyo/__tests__/__snapshots__/snapshot.test.ts.snap index fb456e1b0c..f45c5a5558 100644 --- a/packages/destination-actions/src/destinations/klaviyo/__tests__/__snapshots__/snapshot.test.ts.snap +++ b/packages/destination-actions/src/destinations/klaviyo/__tests__/__snapshots__/snapshot.test.ts.snap @@ -130,7 +130,7 @@ Object { "data": Object { "attributes": Object { "anonymous_id": "mTdOx(Nl)", - "email": "ujoeri@ifosi.kp", + "email": "atvilifo@rik.lr", "external_id": "mTdOx(Nl)", "phone_number": "+5694788449", }, diff --git a/packages/destination-actions/src/destinations/klaviyo/__tests__/multistatus.test.ts b/packages/destination-actions/src/destinations/klaviyo/__tests__/multistatus.test.ts new file mode 100644 index 0000000000..30ab0ad53c --- /dev/null +++ b/packages/destination-actions/src/destinations/klaviyo/__tests__/multistatus.test.ts @@ -0,0 +1,260 @@ +import { SegmentEvent, createTestEvent, createTestIntegration } from '@segment/actions-core' +import nock from 'nock' +import { API_URL } from '../config' +import Klaviyo from '../index' + +beforeEach(() => nock.cleanAll()) + +const testDestination = createTestIntegration(Klaviyo) + +const settings = { + api_key: 'my-api-key' +} + +const timestamp = '2024-07-22T20:08:49.7931Z' + +describe('MultiStatus', () => { + describe('trackEvent', () => { + const mapping = { + profile: { + '@path': '$.properties' + }, + metric_name: { + '@path': '$.event' + }, + properties: { + '@path': '$.properties' + }, + time: { + '@path': '$.timestamp' + }, + unique_id: { + '@path': '$.messageId' + } + } + + it("should successfully handle those payload where phone_number is invalid and couldn't be converted to E164 format", async () => { + nock(API_URL).post('/event-bulk-create-jobs/').reply(202, {}) + + const events: SegmentEvent[] = [ + // Event with invalid phone_number + createTestEvent({ + type: 'track', + timestamp, + properties: { + country_code: 'IN', + phone_number: '701271', + email: 'valid@gmail.com' + } + }), + // Valid Event + createTestEvent({ + type: 'track', + timestamp, + properties: { + email: 'valid@gmail.com' + } + }) + ] + + const response = await testDestination.executeBatch('trackEvent', { + events, + settings, + mapping + }) + + // The First event fails as pre-request validation fails for having invalid phone_number and could not be converted to E164 format + expect(response[0]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'Phone number could not be converted to E.164 format.', + errorreporter: 'INTEGRATIONS' + }) + + // The Second event doesn't fail as there is no error reported by Klaviyo API + expect(response[1]).toMatchObject({ + status: 200, + body: '{}' + }) + }) + + it('should successfully handle a batch of events with complete success response from Klaviyo API', async () => { + nock(API_URL).post('/event-bulk-create-jobs/').reply(202, {}) + + const events: SegmentEvent[] = [ + // Valid Event + createTestEvent({ + type: 'track', + timestamp, + properties: { + email: 'valid@gmail.com' + } + }), + // Event without any user identifier + createTestEvent({ + type: 'track', + timestamp + }), + //Event with invalid Email + createTestEvent({ + type: 'track', + timestamp, + properties: { + email: 'invalid_email@gmail..com', + list_id: '123' + } + }) + ] + + const response = await testDestination.executeBatch('trackEvent', { + events, + settings, + mapping + }) + + // The first event doesn't fail as there is no error reported by Klaviyo API + expect(response[0]).toMatchObject({ + status: 200, + body: '{}' + }) + + // The second event fails as pre-request validation fails for not having any user identifier + expect(response[1]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'One of External ID, Anonymous ID, Phone Number or Email is required.', + errorreporter: 'INTEGRATIONS' + }) + // The third event fails as pre-request validation fails for having invalid email + expect(response[2]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'Email must be a valid email address string but it was not.', + errorreporter: 'INTEGRATIONS' + }) + }) + + it('should successfully handle a batch of events with failure response from Klaviyo API', async () => { + // Mocking a 400 response from Klaviyo API + const mockResponse = { + errors: [ + { + id: '752f7ece-af20-44e0-aa3a-b13290d98e72', + status: 400, + code: 'invalid', + title: 'Invalid input.', + detail: 'Invalid input', + source: { + pointer: '/data/attributes/events-bulk-create/data/0/attributes/email' + }, + links: {}, + meta: {} + } + ] + } + nock(API_URL).post('/event-bulk-create-jobs/').reply(400, mockResponse) + + const events: SegmentEvent[] = [ + // Invalid Event + createTestEvent({ + type: 'track', + timestamp, + properties: { + email: 'gk@gmail.com' + } + }), + // Valid Event + createTestEvent({ + type: 'track', + timestamp, + properties: { + external_id: 'Xi1234' + } + }) + ] + + const response = await testDestination.executeBatch('trackEvent', { + events, + settings, + mapping + }) + + // The first doesn't fail as there is no error reported by Klaviyo API + expect(response[0]).toMatchObject({ + status: 400, + errortype: 'BAD_REQUEST', + errormessage: 'Invalid input', + sent: { + profile: { + email: 'gk@gmail.com' + }, + metric_name: 'Test Event', + properties: { + email: 'gk@gmail.com' + }, + time: timestamp + }, + body: '{"id":"752f7ece-af20-44e0-aa3a-b13290d98e72","status":400,"code":"invalid","title":"Invalid input.","detail":"Invalid input","source":{"pointer":"/data/attributes/events-bulk-create/data/0/attributes/email"},"links":{},"meta":{}}' + }) + + // The second event fails as Klaviyo API reports an error + expect(response[1]).toMatchObject({ + status: 429, + sent: { + profile: { + external_id: 'Xi1234' + }, + metric_name: 'Test Event', + properties: { + external_id: 'Xi1234' + }, + time: timestamp + }, + body: 'Retry' + }) + }) + + it('should successfully handle a batch when all payload is invalid', async () => { + const events: SegmentEvent[] = [ + // Event with invalid phone_number + createTestEvent({ + type: 'track', + timestamp, + properties: { + country_code: 'IN', + phone_number: '701271', + email: 'valid@gmail.com' + } + }), + // Event without any user identifier + createTestEvent({ + type: 'track', + timestamp, + properties: {} + }) + ] + + const response = await testDestination.executeBatch('trackEvent', { + events, + settings, + mapping + }) + + // The First event fails as pre-request validation fails for having invalid phone_number and could not be converted to E164 format + expect(response[0]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'Phone number could not be converted to E.164 format.', + errorreporter: 'INTEGRATIONS' + }) + + // The second event fails as pre-request validation fails for not having any user identifier + expect(response[1]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'One of External ID, Anonymous ID, Phone Number or Email is required.', + errorreporter: 'INTEGRATIONS' + }) + }) + }) +}) diff --git a/packages/destination-actions/src/destinations/klaviyo/functions.ts b/packages/destination-actions/src/destinations/klaviyo/functions.ts index 2d9f2365b9..89c71810d2 100644 --- a/packages/destination-actions/src/destinations/klaviyo/functions.ts +++ b/packages/destination-actions/src/destinations/klaviyo/functions.ts @@ -3,8 +3,12 @@ import { RequestClient, DynamicFieldResponse, IntegrationError, - PayloadValidationError + PayloadValidationError, + MultiStatusResponse, + HTTPError, + ErrorCodes } from '@segment/actions-core' +import { JSONLikeObject } from '@segment/actions-core' import { API_URL, REVISION_DATE } from './config' import { Settings } from './generated-types' import { @@ -20,10 +24,15 @@ import { UnsubscribeProfile, UnsubscribeEventData, GroupedProfiles, - AdditionalAttributes + AdditionalAttributes, + KlaviyoAPIErrorResponse } from './types' import { Payload } from './upsertProfile/generated-types' +import { Payload as TrackEventPayload } from './trackEvent/generated-types' +import dayjs from '../../lib/dayjs' import { PhoneNumberUtil, PhoneNumberFormat } from 'google-libphonenumber' +import { eventBulkCreateRegex } from './properties' +import { ActionDestinationErrorResponseType } from '@segment/actions-core/destination-kittypes' const phoneUtil = PhoneNumberUtil.getInstance() @@ -451,3 +460,205 @@ export function processPhoneNumber(initialPhoneNumber?: string, country_code?: s return phone_number } + +export async function sendBatchedTrackEvent(request: RequestClient, payloads: TrackEventPayload[]) { + const multiStatusResponse = new MultiStatusResponse() + const { filteredPayloads, validPayloadIndicesBitmap } = validateAndPreparePayloads(payloads, multiStatusResponse) + // if there are no payloads with valid phone number/email/external_id, return multiStatusResponse + if (!filteredPayloads.length) { + return multiStatusResponse + } + const payloadToSend = { + data: { + type: 'event-bulk-create-job', + attributes: { + 'events-bulk-create': { + data: filteredPayloads + } + } + } + } + + try { + const response = await request(`${API_URL}/event-bulk-create-jobs/`, { + method: 'POST', + json: payloadToSend, + headers: { + revision: '2024-10-15' + } + }) + updateMultiStatusWithSuccessData(filteredPayloads, validPayloadIndicesBitmap, multiStatusResponse, response) + } catch (err) { + if (err instanceof HTTPError) { + const errorResponse = await err?.response?.json() + handleKlaviyoAPIErrorResponse( + payloads as object as JSONLikeObject[], + errorResponse, + multiStatusResponse, + validPayloadIndicesBitmap, + eventBulkCreateRegex + ) + } else { + // Bubble up the error and let Actions Framework handle it + throw err + } + } + return multiStatusResponse +} + +function validateAndPreparePayloads(payloads: TrackEventPayload[], multiStatusResponse: MultiStatusResponse) { + const filteredPayloads: JSONLikeObject[] = [] + const validPayloadIndicesBitmap: number[] = [] + + payloads.forEach((payload, originalBatchIndex) => { + const { email, phone_number, external_id, anonymous_id, country_code } = payload.profile + if (!email && !phone_number && !external_id && !anonymous_id) { + multiStatusResponse.setErrorResponseAtIndex(originalBatchIndex, { + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'One of External ID, Anonymous ID, Phone Number or Email is required.' + }) + return + } + + if (phone_number) { + // Validate and convert the phone number if present + const validPhoneNumber = validateAndConvertPhoneNumber(phone_number, country_code as string) + // If the phone number is not valid, skip this payload + if (!validPhoneNumber) { + multiStatusResponse.setErrorResponseAtIndex(originalBatchIndex, { + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: 'Phone number could not be converted to E.164 format.' + }) + return // Skip this payload + } + + // Update the payload's phone number with the validated format + payload.profile.phone_number = validPhoneNumber + delete payload?.profile?.country_code + } + + const profileToAdd = constructBulkCreateEventPayload(payload) + filteredPayloads.push(profileToAdd as JSONLikeObject) + validPayloadIndicesBitmap.push(originalBatchIndex) + }) + + return { filteredPayloads, validPayloadIndicesBitmap } +} + +function constructBulkCreateEventPayload(payload: TrackEventPayload) { + return { + type: 'event-bulk-create', + attributes: { + profile: { + data: { + type: 'profile', + attributes: payload.profile + } + }, + events: { + data: [ + { + type: 'event', + attributes: { + metric: { + data: { + type: 'metric', + attributes: { + name: payload.metric_name + } + } + }, + properties: { ...payload.properties }, + time: payload?.time ? dayjs(payload.time).toISOString() : undefined, + value: payload.value, + unique_id: payload.unique_id + } + } + ] + } + } + } +} + +/** + * Handles the error response from the Klaviyo API and updates the status of each payload accordingly. + * This function processes the `errors` array from the Klaviyo API response, identifies which payloads + * failed based on their error pointers, and sets an appropriate error response for each payload. If some + * events in a batch are invalid, it marks the rest as "retryable" if they are part of the same batch. + * + * @param {JSONLikeObject[]} payloads - An array of payloads that were sent to the Klaviyo API. + * @param {KlaviyoAPIErrorResponse} response - The error response from the Klaviyo API. + * @param {MultiStatusResponse} multiStatusResponse - An object used to store and track the status of each payload. + * @param {number[]} validPayloadIndicesBitmap - A bitmap of indices representing valid payloads in the `payloads` array. + * @param {RegExp} regex - A regular expression used to parse the error pointers to find the index of the corresponding payload. + */ + +function handleKlaviyoAPIErrorResponse( + payloads: JSONLikeObject[], + response: KlaviyoAPIErrorResponse, + multiStatusResponse: MultiStatusResponse, + validPayloadIndicesBitmap: number[], + regex: RegExp +) { + if (response?.errors && Array.isArray(response.errors)) { + const invalidIndexSet = new Set() + response.errors.forEach((error: KlaviyoAPIError) => { + const indexInOriginalPayload = getIndexFromErrorPointer(error.source.pointer, validPayloadIndicesBitmap, regex) + if (indexInOriginalPayload !== -1 && !multiStatusResponse.isErrorResponseAtIndex(indexInOriginalPayload)) { + multiStatusResponse.setErrorResponseAtIndex(indexInOriginalPayload, { + status: error.status, + // errortype will be inferred from the error.response.status + errormessage: error.detail, + sent: payloads[indexInOriginalPayload], + body: JSON.stringify(error) + } as ActionDestinationErrorResponseType) + invalidIndexSet.add(indexInOriginalPayload) + } + }) + + for (const index of validPayloadIndicesBitmap) { + if (!invalidIndexSet.has(index)) { + multiStatusResponse.setErrorResponseAtIndex(index, { + errormessage: + "This event wasn't delivered because of few bad events in the same batch to Klaviyo. This will be retried", + errortype: 'RETRYABLE_BATCH_FAILURE' as keyof typeof ErrorCodes, + status: 429, + sent: payloads[index], + body: 'Retry' + }) + } + } + } +} + +function getIndexFromErrorPointer(pointer: string, validPayloadIndicesBitmap: number[], regex: RegExp) { + const match = regex.exec(pointer) + if (match && match[1]) { + const index = parseInt(match[1], 10) + return validPayloadIndicesBitmap[index] !== undefined ? validPayloadIndicesBitmap[index] : -1 + } + return -1 +} +/** + * Updates the multi-status response with success data for each payload. + * @param {JSONLikeObject[]} filteredPayloads The list of filtered payloads to process. + * @param {number[]} validPayloadIndicesBitmap A bitmap of valid payload indices. + * @param {MultiStatusResponse} multiStatusResponse The multi-status response object to update. + * @param {any} response The response from the import job request containing the data. + */ +export function updateMultiStatusWithSuccessData( + filteredPayloads: JSONLikeObject[], + validPayloadIndicesBitmap: number[], + multiStatusResponse: MultiStatusResponse, + response: any +) { + filteredPayloads.forEach((payload, index) => { + multiStatusResponse.setSuccessResponseAtIndex(validPayloadIndicesBitmap[index], { + status: 200, + sent: payload, + body: JSON.stringify(response?.data) + }) + }) +} diff --git a/packages/destination-actions/src/destinations/klaviyo/properties.ts b/packages/destination-actions/src/destinations/klaviyo/properties.ts index f2f440a40b..6228b7e9f7 100644 --- a/packages/destination-actions/src/destinations/klaviyo/properties.ts +++ b/packages/destination-actions/src/destinations/klaviyo/properties.ts @@ -162,3 +162,4 @@ export const country_code: InputField = { ] } } +export const eventBulkCreateRegex = /\/data\/attributes\/events-bulk-create\/data\/(\d+)/ diff --git a/packages/destination-actions/src/destinations/klaviyo/trackEvent/__tests__/__snapshots__/snapshot.test.ts.snap b/packages/destination-actions/src/destinations/klaviyo/trackEvent/__tests__/__snapshots__/snapshot.test.ts.snap index 634a3104bb..b30a1b785c 100644 --- a/packages/destination-actions/src/destinations/klaviyo/trackEvent/__tests__/__snapshots__/snapshot.test.ts.snap +++ b/packages/destination-actions/src/destinations/klaviyo/trackEvent/__tests__/__snapshots__/snapshot.test.ts.snap @@ -16,7 +16,7 @@ Object { "data": Object { "attributes": Object { "anonymous_id": "]DD4LgSzT#hw(U]@J$a", - "email": "so@uzwumiz.wf", + "email": "wumizkiw@lecu.hu", "external_id": "]DD4LgSzT#hw(U]@J$a", "phone_number": "+2458829936", }, diff --git a/packages/destination-actions/src/destinations/klaviyo/trackEvent/generated-types.ts b/packages/destination-actions/src/destinations/klaviyo/trackEvent/generated-types.ts index 2422660431..06c04139c3 100644 --- a/packages/destination-actions/src/destinations/klaviyo/trackEvent/generated-types.ts +++ b/packages/destination-actions/src/destinations/klaviyo/trackEvent/generated-types.ts @@ -5,6 +5,9 @@ export interface Payload { * Properties of the profile that triggered this event. */ profile: { + /** + * The user's email to send to Klavio. + */ email?: string phone_number?: string /** @@ -50,4 +53,12 @@ export interface Payload { * */ unique_id?: string + /** + * When enabled, the action will use the klaviyo batch API. + */ + enable_batching?: boolean + /** + * Maximum number of events to include in each batch. Actual batch sizes may be lower. + */ + batch_size?: number } diff --git a/packages/destination-actions/src/destinations/klaviyo/trackEvent/index.ts b/packages/destination-actions/src/destinations/klaviyo/trackEvent/index.ts index 0c59233859..592ca17ef8 100644 --- a/packages/destination-actions/src/destinations/klaviyo/trackEvent/index.ts +++ b/packages/destination-actions/src/destinations/klaviyo/trackEvent/index.ts @@ -1,12 +1,11 @@ import type { ActionDefinition } from '@segment/actions-core' import type { Settings } from '../generated-types' import type { Payload } from './generated-types' - import { PayloadValidationError } from '@segment/actions-core' import { API_URL } from '../config' -import { processPhoneNumber } from '../functions' -import { country_code } from '../properties' -import dayjs from 'dayjs' +import { batch_size, enable_batching, country_code } from '../properties' +import { processPhoneNumber, sendBatchedTrackEvent } from '../functions' +import dayjs from '../../../lib/dayjs' const action: ActionDefinition = { title: 'Track Event', @@ -20,7 +19,9 @@ const action: ActionDefinition = { properties: { email: { label: 'Email', - type: 'string' + type: 'string', + description: `The user's email to send to Klavio.`, + format: 'email' }, phone_number: { label: 'Phone Number', @@ -91,7 +92,9 @@ const action: ActionDefinition = { default: { '@path': '$.messageId' } - } + }, + enable_batching: { ...enable_batching }, + batch_size: { ...batch_size, default: 1000 } }, perform: (request, { payload }) => { const { email, phone_number: initialPhoneNumber, external_id, anonymous_id, country_code } = payload.profile @@ -128,11 +131,13 @@ const action: ActionDefinition = { } } } - return request(`${API_URL}/events/`, { method: 'POST', json: eventData }) + }, + performBatch: (request, { payload }) => { + return sendBatchedTrackEvent(request, payload) } } diff --git a/packages/destination-actions/src/destinations/klaviyo/types.ts b/packages/destination-actions/src/destinations/klaviyo/types.ts index fc645ad0c5..d28bcf24f6 100644 --- a/packages/destination-actions/src/destinations/klaviyo/types.ts +++ b/packages/destination-actions/src/destinations/klaviyo/types.ts @@ -219,3 +219,17 @@ export interface AdditionalAttributes { title?: string image?: string } +export interface KlaviyoAPIError { + id: string + status: number + code: string + title: string + detail: string + source: { + pointer: string + parameter?: string + } +} +export interface KlaviyoAPIErrorResponse { + errors: KlaviyoAPIError[] +}