Merge pull request #213 from rrhodes/feature/disable-payload-logs
feat: disablePayloadLogs flag to suppress payload info
alexcasalboni authored Aug 15, 2023
2 parents 4d0d35d + e38c5a3 commit e21c2f4
Showing 7 changed files with 149 additions and 29 deletions.
3 changes: 2 additions & 1 deletion README-INPUT-OUTPUT.md
@@ -21,7 +21,8 @@ The state machine accepts the following input parameters:
* **preProcessorARN** (string): it must be the ARN of a Lambda function; if provided, the function will be invoked before every invocation of `lambdaARN`; more details below in the [Pre/Post-processing functions section](#user-content-prepost-processing-functions)
* **postProcessorARN** (string): it must be the ARN of a Lambda function; if provided, the function will be invoked after every invocation of `lambdaARN`; more details below in the [Pre/Post-processing functions section](#user-content-prepost-processing-functions)
* **discardTopBottom** (number between 0.0 and 0.4, by default is 0.2): By default, the state machine will discard the top/bottom 20% of "outliers" (the fastest and slowest), to filter out the effects of cold starts that would bias the overall averages. You can customize this parameter by providing a value between 0 and 0.4, with 0 meaning no results are discarded and 0.4 meaning that 40% of the top/bottom results are discarded (i.e. only 20% of the results are considered).
* **sleepBetweenRunsMs** (integer) If provided, the time in milliseconds that the tuner function will sleep/wait after invoking your function, but before carrying out the Post-Processing step, should that be provided. This could be used if you have agressive downstream rate limits you need to respect. By default this will be set to 0 and the function won't sleep between invocations. Setting this value will have no effect if running the invocations in parallel.
* **sleepBetweenRunsMs** (integer) If provided, the time in milliseconds that the tuner function will sleep/wait after invoking your function, but before carrying out the Post-Processing step, should that be provided. This could be used if you have aggressive downstream rate limits you need to respect. By default this will be set to 0 and the function won't sleep between invocations. Setting this value will have no effect if running the invocations in parallel.
* **disablePayloadLogs** (boolean) If provided and set to a truthy value, suppresses `payload` from error messages and logs. If `preProcessorARN` is provided, this also suppresses the output payload of the pre-processor.
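Taken together, a minimal execution input using the new flag might look like the sketch below. The ARN, payload, and other values are illustrative, not from the commit:

```javascript
// Illustrative Step Functions execution input for the power tuner.
// disablePayloadLogs keeps the payload out of tuner logs and error messages.
const executionInput = {
    lambdaARN: 'arn:aws:lambda:us-east-1:123456789012:function:my-function',
    powerValues: [128, 256, 512, 1024],
    num: 50,
    payload: { secret: 'do-not-log-me' },
    parallelInvocation: false,
    disablePayloadLogs: true,
};

console.log(JSON.stringify(executionInput, null, 2));
```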

## State machine configuration (at deployment time)

2 changes: 2 additions & 0 deletions README-SAR.md
@@ -53,6 +53,8 @@ The state machine accepts the following input parameters:
* **preProcessorARN** (string): it must be the ARN of a Lambda function; if provided, the function will be invoked before every invocation of `lambdaARN`; more details below in the Pre/Post-processing functions section
* **postProcessorARN** (string): it must be the ARN of a Lambda function; if provided, the function will be invoked after every invocation of `lambdaARN`; more details below in the Pre/Post-processing functions section
* **discardTopBottom** (number between 0.0 and 0.4, by default is 0.2): By default, the state machine will discard the top/bottom 20% of "outliers" (the fastest and slowest), to filter out the effects of cold starts that would bias the overall averages. You can customize this parameter by providing a value between 0 and 0.4, with 0 meaning no results are discarded and 0.4 meaning that 40% of the top/bottom results are discarded (i.e. only 20% of the results are considered).
* **sleepBetweenRunsMs** (integer) If provided, the time in milliseconds that the tuner function will sleep/wait after invoking your function, but before carrying out the Post-Processing step, should that be provided. This could be used if you have aggressive downstream rate limits you need to respect. By default this will be set to 0 and the function won't sleep between invocations. Setting this value will have no effect if running the invocations in parallel.
* **disablePayloadLogs** (boolean) If provided and set to a truthy value, suppresses `payload` from error messages and logs. If `preProcessorARN` is provided, this also suppresses the output payload of the pre-processor.
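As a rough sketch of what `discardTopBottom` described above does — not the project's actual implementation — trimming outliers before averaging looks like this:

```javascript
// Drop the fastest and slowest `ratio` share of results before averaging,
// so cold starts and other outliers don't skew the statistics.
const trimOutliers = (durations, ratio = 0.2) => {
    const sorted = [...durations].sort((a, b) => a - b);
    const discard = Math.floor(sorted.length * ratio);
    return sorted.slice(discard, sorted.length - discard);
};

// With ratio 0.2, the top and bottom 20% are discarded,
// leaving the middle 60% of samples.
const samples = [12, 11, 10, 95, 11, 12, 10, 13, 11, 300];
console.log(trimOutliers(samples));
```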


## State machine configuration (at deployment time)
23 changes: 17 additions & 6 deletions lambda/executor.js
@@ -27,6 +27,7 @@ module.exports.handler = async(event, context) => {
postProcessorARN,
discardTopBottom,
sleepBetweenRunsMs,
disablePayloadLogs,
} = await extractDataFromInput(event);

validateInput(lambdaARN, value, num); // may throw
@@ -55,6 +56,7 @@ module.exports.handler = async(event, context) => {
preARN: preProcessorARN,
postARN: postProcessorARN,
sleepBetweenRunsMs: sleepBetweenRunsMs,
disablePayloadLogs: disablePayloadLogs,
};

// wait if the function/alias state is Pending
@@ -133,17 +135,22 @@ const extractDataFromInput = async(event) => {
postProcessorARN: input.postProcessorARN,
discardTopBottom: discardTopBottom,
sleepBetweenRunsMs: sleepBetweenRunsMs,
disablePayloadLogs: !!input.disablePayloadLogs,
};
};
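The `!!input.disablePayloadLogs` coercion above normalizes any truthy input to a strict boolean. A quick sketch of the behavior:

```javascript
// Double negation coerces arbitrary truthy/falsy input to a real boolean,
// so downstream checks like `if (!disablePayloadLogs)` behave predictably
// whether the caller passed true, omitted the flag, or passed a string.
const toBool = (value) => !!value;

console.log(toBool(undefined)); // false (flag omitted from input)
console.log(toBool(false));     // false
console.log(toBool(true));      // true
console.log(toBool('yes'));     // true (any truthy value enables the flag)
```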

const runInParallel = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN}) => {
const runInParallel = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN, disablePayloadLogs}) => {
const results = [];
// run all invocations in parallel ...
const invocations = utils.range(num).map(async(_, i) => {
const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, lambdaAlias, payloads[i], preARN, postARN);
const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, lambdaAlias, payloads[i], preARN, postARN, disablePayloadLogs);
// invocation errors return 200 and contain FunctionError and Payload
if (invocationResults.FunctionError) {
throw new Error(`Invocation error (running in parallel): ${invocationResults.Payload} with payload ${JSON.stringify(actualPayload)}`);
let errorMessage = `Invocation error (running in parallel): ${invocationResults.Payload}`;
if (!disablePayloadLogs) {
errorMessage += ` with payload ${JSON.stringify(actualPayload)}`;
}
throw new Error(errorMessage);
}
results.push(invocationResults);
});
@@ -152,14 +159,18 @@ const runInParallel = async({num, lambdaARN, lambdaAlias, payloads, preARN, post
return results;
};

const runInSeries = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN, sleepBetweenRunsMs}) => {
const runInSeries = async({num, lambdaARN, lambdaAlias, payloads, preARN, postARN, sleepBetweenRunsMs, disablePayloadLogs}) => {
const results = [];
for (let i = 0; i < num; i++) {
// run invocations in series
const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, lambdaAlias, payloads[i], preARN, postARN);
const {invocationResults, actualPayload} = await utils.invokeLambdaWithProcessors(lambdaARN, lambdaAlias, payloads[i], preARN, postARN, disablePayloadLogs);
// invocation errors return 200 and contain FunctionError and Payload
if (invocationResults.FunctionError) {
throw new Error(`Invocation error (running in series): ${invocationResults.Payload} with payload ${JSON.stringify(actualPayload)}`);
let errorMessage = `Invocation error (running in series): ${invocationResults.Payload}`;
if (!disablePayloadLogs) {
errorMessage += ` with payload ${JSON.stringify(actualPayload)}`;
}
throw new Error(errorMessage);
}
if (sleepBetweenRunsMs > 0) {
await utils.sleep(sleepBetweenRunsMs);
26 changes: 17 additions & 9 deletions lambda/utils.js
@@ -254,39 +254,43 @@ module.exports.deleteLambdaAlias = (lambdaARN, alias) => {
/**
* Invoke a (pre/post-)processor Lambda function and return its output (data.Payload).
*/
module.exports.invokeLambdaProcessor = async(processorARN, payload, preOrPost = 'Pre') => {
const processorData = await utils.invokeLambda(processorARN, null, payload);
module.exports.invokeLambdaProcessor = async(processorARN, payload, preOrPost = 'Pre', disablePayloadLogs = false) => {
const processorData = await utils.invokeLambda(processorARN, null, payload, disablePayloadLogs);
if (processorData.FunctionError) {
throw new Error(`${preOrPost}Processor ${processorARN} failed with error ${processorData.Payload} and payload ${JSON.stringify(payload)}`);
let errorMessage = `${preOrPost}Processor ${processorARN} failed with error ${processorData.Payload}`;
if (!disablePayloadLogs) {
errorMessage += ` and payload ${JSON.stringify(payload)}`;
}
throw new Error(errorMessage);
}
return processorData.Payload;
};

/**
* Wrapper around Lambda function invocation with pre/post-processor functions.
*/
module.exports.invokeLambdaWithProcessors = async(lambdaARN, alias, payload, preARN, postARN) => {
module.exports.invokeLambdaWithProcessors = async(lambdaARN, alias, payload, preARN, postARN, disablePayloadLogs) => {

var actualPayload = payload; // might change based on pre-processor

// first invoke pre-processor, if provided
if (preARN) {
console.log('Invoking pre-processor');
// overwrite payload with pre-processor's output (only if not empty)
const preProcessorOutput = await utils.invokeLambdaProcessor(preARN, payload, 'Pre');
const preProcessorOutput = await utils.invokeLambdaProcessor(preARN, payload, 'Pre', disablePayloadLogs);
if (preProcessorOutput) {
actualPayload = preProcessorOutput;
}
}

// invoke function to be power-tuned
const invocationResults = await utils.invokeLambda(lambdaARN, alias, actualPayload);
const invocationResults = await utils.invokeLambda(lambdaARN, alias, actualPayload, disablePayloadLogs);

// then invoke post-processor, if provided
if (postARN) {
console.log('Invoking post-processor');
// note: invocation may have failed (invocationResults.FunctionError)
await utils.invokeLambdaProcessor(postARN, invocationResults.Payload, 'Post');
await utils.invokeLambdaProcessor(postARN, invocationResults.Payload, 'Post', disablePayloadLogs);
}

return {
@@ -298,8 +302,12 @@ module.exports.invokeLambdaWithProcessors = async(lambdaARN, alias, payload, pre
/**
* Invoke a given Lambda Function:Alias with payload and return its logs.
*/
module.exports.invokeLambda = (lambdaARN, alias, payload) => {
console.log(`Invoking function ${lambdaARN}:${alias || '$LATEST'} with payload ${JSON.stringify(payload)}`);
module.exports.invokeLambda = (lambdaARN, alias, payload, disablePayloadLogs) => {
let consoleLogMessage = `Invoking function ${lambdaARN}:${alias || '$LATEST'}`;
if (!disablePayloadLogs) {
consoleLogMessage += ` with payload ${JSON.stringify(payload)}`;
}
console.log(consoleLogMessage);
const params = {
FunctionName: lambdaARN,
Qualifier: alias,
6 changes: 5 additions & 1 deletion test/setup.spec.js
@@ -2,11 +2,15 @@

const sinon = require('sinon');

let consoleLogStub;

// hide all logging for tests
// comment out the line which
// you would like to see logged
// during test run
sinon.stub(console, 'log');
consoleLogStub = sinon.stub(console, 'log');
sinon.stub(console, 'info');
sinon.stub(console, 'debug');
sinon.stub(console, 'error');

module.exports = { consoleLogStub };
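Exporting the stub lets individual specs inspect what was logged. The same idea can be approximated without sinon — illustrative only; the real suite uses `sinon.stub`:

```javascript
// Minimal hand-rolled console.log stub: captures calls so tests can
// assert on log content, then restores the original implementation.
const makeConsoleLogStub = () => {
    const original = console.log;
    const calls = [];
    console.log = (...args) => { calls.push(args); };
    return {
        calls,
        restore: () => { console.log = original; },
    };
};

// Usage: capture a log line, then restore the real console.log.
const stub = makeConsoleLogStub();
console.log('Invoking function arnOK:$LATEST');
stub.restore();
```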
47 changes: 39 additions & 8 deletions test/unit/test-lambda.js
@@ -45,7 +45,7 @@ const invokeForSuccess = async(handler, event) => {
}
};

// utility to invoke handler (success case)
// utility to invoke handler and assert an exception is caught (success case)
const invokeForFailure = async(handler, event) => {

let result;
Expand Down Expand Up @@ -792,7 +792,7 @@ describe('Lambda Functions', async() => {
expect(waitForAliasActiveCounter).to.be(0);
});

it('should include payload in exception message if invocation fails (series)', async() => {
const invokeForFailureInSeriesAndAssertOnErrorMessage = async({disablePayloadLogs, isPayloadInErrorMessage}) => {
invokeLambdaStub && invokeLambdaStub.restore();
invokeLambdaStub = sandBox.stub(utils, 'invokeLambda')
.callsFake(async(_arn, _alias, payload) => {
@@ -807,17 +807,33 @@
lambdaARN: 'arnOK',
num: 10,
payload: 'SENTINEL',
disablePayloadLogs: disablePayloadLogs,
},
});

expect(error.message).to.contain('SENTINEL');
expect(error.message.includes('SENTINEL')).to.be(isPayloadInErrorMessage);
expect(error.message).to.contain('in series');

expect(getLambdaConfigCounter).to.be(1);
expect(waitForAliasActiveCounter).to.be(0);
});
};

it('should include payload in exception message if invocation fails and disablePayloadLogs is undefined (series)', async() => invokeForFailureInSeriesAndAssertOnErrorMessage({
disablePayloadLogs: undefined,
isPayloadInErrorMessage: true,
}));

it('should include payload in exception message if invocation fails and disablePayloadLogs is false (series)', async() => invokeForFailureInSeriesAndAssertOnErrorMessage({
disablePayloadLogs: false,
isPayloadInErrorMessage: true,
}));

it('should include payload in exception message if invocation fails (parallel)', async() => {
it('should not include payload in exception message if invocation fails and disablePayloadLogs is true (series)', async() => invokeForFailureInSeriesAndAssertOnErrorMessage({
disablePayloadLogs: true,
isPayloadInErrorMessage: false,
}));

const invokeForFailureInParallelAndAssertOnErrorMessage = async({disablePayloadLogs, isPayloadInErrorMessage}) => {
invokeLambdaStub && invokeLambdaStub.restore();
invokeLambdaStub = sandBox.stub(utils, 'invokeLambda')
.callsFake(async(_arn, _alias, payload) => {
@@ -833,16 +849,31 @@
num: 10,
parallelInvocation: true,
payload: 'SENTINEL',
disablePayloadLogs: disablePayloadLogs,
},
});

expect(error.message).to.contain('SENTINEL');
expect(error.message.includes('SENTINEL')).to.be(isPayloadInErrorMessage);
expect(error.message).to.contain('in parallel');

expect(getLambdaConfigCounter).to.be(1);
expect(waitForAliasActiveCounter).to.be(0);
});

};

it('should include payload in exception message if invocation fails and disablePayloadLogs is undefined (parallel)', async() => invokeForFailureInParallelAndAssertOnErrorMessage({
disablePayloadLogs: undefined,
isPayloadInErrorMessage: true,
}));

it('should include payload in exception message if invocation fails and disablePayloadLogs is false (parallel)', async() => invokeForFailureInParallelAndAssertOnErrorMessage({
disablePayloadLogs: false,
isPayloadInErrorMessage: true,
}));

it('should not include payload in exception message if invocation fails and disablePayloadLogs is true (parallel)', async() => invokeForFailureInParallelAndAssertOnErrorMessage({
disablePayloadLogs: true,
isPayloadInErrorMessage: false,
}));

it('should include weighted payload in exception message if invocation fails (series)', async() => {
invokeLambdaStub && invokeLambdaStub.restore();
71 changes: 67 additions & 4 deletions test/unit/test-utils.js
@@ -19,6 +19,7 @@ process.env.baseCosts = '{"x86_64": {"ap-east-1":2.9e-9,"af-south-1":2.8e-9,"me-
process.env.AWS_REGION = 'af-south-1';

const utils = require('../../lambda/utils');
const { consoleLogStub: consoleLogSetupStub } = require('../setup.spec');

const sandBox = sinon.createSandbox();

@@ -458,7 +459,8 @@ describe('Lambda Utils', () => {
expect(data).to.be('{"OK": "OK"}');
});

it('should explode if processor fails', async() => {
const invokeLambdaProcessorReturningUnhandledError = async({ disablePayloadLogs, isPayloadInErrorMessage }) => {
const payload = {keyOne: 'value-one'};
sandBox.stub(utils, 'invokeLambda')
.callsFake(async() => {
invokeLambdaCounter++;
@@ -468,14 +470,28 @@
};
});
try {
const data = await utils.invokeLambdaProcessor('arnOK', {});
const data = await utils.invokeLambdaProcessor('arnOK', payload, 'Pre', disablePayloadLogs);
expect(data).to.be(null);
} catch (ex) {
expect(ex.message.includes('failed with error')).to.be(true);
expect(ex.message).to.contain('failed with error');
expect(ex.message.includes('and payload')).to.be(isPayloadInErrorMessage);
}

expect(invokeLambdaCounter).to.be(1);
});
};

it('should explode if processor fails and share payload in error when disablePayloadLogs is undefined', async() => invokeLambdaProcessorReturningUnhandledError({
disablePayloadLogs: undefined,
isPayloadInErrorMessage: true,
}));
it('should explode if processor fails and share payload in error when disablePayloadLogs is false', async() => invokeLambdaProcessorReturningUnhandledError({
disablePayloadLogs: false,
isPayloadInErrorMessage: true,
}));
it('should explode if processor fails and not share payload in error when disablePayloadLogs is true', async() => invokeLambdaProcessorReturningUnhandledError({
disablePayloadLogs: true,
isPayloadInErrorMessage: false,
}));
});

const isJsonString = (str) => {
@@ -890,4 +906,51 @@

});

describe('invokeLambda', () => {
const alias = 'aliasName';
const arn = 'arn:aws:lambda:eu-west-1:XXX:function:name';
const payload = {testKey: 'test-value'};

let consoleLogStub;

const invokeLambdaAndAssertOnConsoleLog = async({disablePayloadLogs, isPayloadInConsoleLog}) => {
utils.invokeLambda(arn, alias, payload, disablePayloadLogs);

const consoleLogArg = consoleLogStub.firstCall.args[0];

expect(consoleLogArg).to.contain('Invoking function');
expect(consoleLogArg.includes('with payload')).to.be(isPayloadInConsoleLog);
};

before(() => {
if (consoleLogSetupStub) {
consoleLogStub = consoleLogSetupStub;
} else {
consoleLogStub = sinon.stub(console, 'log');
}
});

beforeEach(() => {
consoleLogStub.resetHistory();
});

after(() => {
if (!consoleLogSetupStub) {
consoleLogStub.restore();
}
});

it('should invoke lambda and share payload in console log when disablePayloadLogs is undefined', async() => invokeLambdaAndAssertOnConsoleLog({
disablePayloadLogs: undefined,
isPayloadInConsoleLog: true,
}));
it('should invoke lambda and share payload in console log when disablePayloadLogs is false', async() => invokeLambdaAndAssertOnConsoleLog({
disablePayloadLogs: false,
isPayloadInConsoleLog: true,
}));
it('should invoke lambda and not share payload in console log when disablePayloadLogs is true', async() => invokeLambdaAndAssertOnConsoleLog({
disablePayloadLogs: true,
isPayloadInConsoleLog: false,
}));
});
});
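The three-case pattern used throughout these specs (undefined / false / true) is a small table-driven test. Its shape generalizes as the sketch below, where `shouldLogPayload` is a hypothetical stand-in for the behavior under test:

```javascript
// Table-driven check: run the same assertion logic over each flag value,
// mirroring how the specs above cover undefined, false, and true.
const shouldLogPayload = (disablePayloadLogs) => !disablePayloadLogs;

const cases = [
    { disablePayloadLogs: undefined, expected: true },
    { disablePayloadLogs: false, expected: true },
    { disablePayloadLogs: true, expected: false },
];

cases.forEach(({ disablePayloadLogs, expected }) => {
    const actual = shouldLogPayload(disablePayloadLogs);
    if (actual !== expected) {
        throw new Error(`case ${disablePayloadLogs}: expected ${expected}, got ${actual}`);
    }
});
console.log('all cases passed');
```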
