Skip to content

Commit

Permalink
Switch to Sentry trace continuation API
Browse files Browse the repository at this point in the history
  • Loading branch information
dcramer committed Apr 24, 2024
1 parent cc0300b commit f980daa
Showing 1 changed file with 59 additions and 57 deletions.
116 changes: 59 additions & 57 deletions apps/server/src/jobs/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ export async function pushJob(jobName: JobName, args?: any) {
// pull out traceparent to forward to faktory job
const activeContext = {};
propagation.inject(context.active(), activeContext);

try {
await client.job(jobName, args, { traceContext: activeContext }).push();
} catch (e) {
Expand All @@ -73,6 +72,11 @@ export async function pushJob(jobName: JobName, args?: any) {
);
}

type TraceContext = {
"sentry-trace"?: string;
baggage?: any;
};

export async function registerJob(jobName: JobName, jobFn: JobFunction) {
faktory.register(jobName, instrumentedJob(jobName, jobFn));
}
Expand All @@ -82,64 +86,62 @@ function instrumentedJob<T>(jobName: string, jobFn: JobFunction) {
return async (...args: unknown[]) => {
const jobId = cuid2.createId();

const activeContext = propagation.extract(
context.active(),
const activeContext: TraceContext =
args.length > 1
? (
args[1] as {
traceContext: {
traceId: string;
baggage: any;
};
}
).traceContext
: {},
);

return context.with(activeContext, async () => {
return Sentry.withScope(async function (scope) {
scope.setContext("job", {
name: jobName,
id: jobId,
? (args[1] as { traceContext: TraceContext }).traceContext
: {};

return Sentry.continueTrace(
{
sentryTrace: activeContext["sentry-trace"],
baggage: activeContext.baggage,
},
async () => {
return Sentry.withScope(async function (scope) {
scope.setContext("job", {
name: jobName,
id: jobId,
});
scope.setTransactionName(jobName);

// this is sentry's wrapper
return await Sentry.startSpan(
{
op: "process",
name: `faktory.${jobName.toLowerCase()}`,
},
async (span) => {
span.setAttribute("messaging.operation", "process");
span.setAttribute("messaging.system", "faktory");

console.log(`Running job [${jobName} - ${jobId}]`);
const start = new Date().getTime();
let success = false;
try {
await jobFn(...args);
success = true;
span.setStatus({
code: 1, // OK
});
} catch (e) {
logError(e);
span.setStatus({
code: 2, // ERROR
});
}

const duration = new Date().getTime() - start;

console.log(
`Job ${
success ? "succeeded" : "failed"
} [${jobName} - ${jobId}] in ${(duration / 1000).toFixed(3)}s`,
);
},
);
});
scope.setTransactionName(jobName);

return await Sentry.startSpan(
{
op: "process",
name: `faktory.${jobName.toLowerCase()}`,
},
async (span) => {
span.setAttribute("messaging.operation", "process");
span.setAttribute("messaging.system", "faktory");

console.log(`Running job [${jobName} - ${jobId}]`);
const start = new Date().getTime();
let success = false;
try {
await jobFn(...args);
success = true;
span.setStatus({
code: 1, // OK
});
} catch (e) {
logError(e);
span.setStatus({
code: 2, // ERROR
});
}

const duration = new Date().getTime() - start;

console.log(
`Job ${
success ? "succeeded" : "failed"
} [${jobName} - ${jobId}] in ${(duration / 1000).toFixed(3)}s`,
);
},
);
});
});
},
);
};
}

Expand Down

0 comments on commit f980daa

Please sign in to comment.