Skip to content

Commit

Permalink
feat: wrap lock in callback
Browse files Browse the repository at this point in the history
  • Loading branch information
darkskygit committed Mar 5, 2024
1 parent 4d8fe47 commit df14857
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 70 deletions.
137 changes: 68 additions & 69 deletions packages/backend/server/src/core/workspaces/resolvers/workspace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,82 +339,81 @@ export class WorkspaceResolver {
}

const lockFlag = `invite:${workspaceId}`;
if (!(await this.mutex.lock(lockFlag))) {
throw new ForbiddenException('Failed to acquire lock');
}

// member limit check
const [memberCount, quota] = await Promise.all([
this.prisma.workspaceUserPermission.count({
where: { workspaceId },
}),
this.quota.getWorkspaceUsage(workspaceId),
]);
if (memberCount >= quota.memberLimit) {
await this.mutex.unlock(lockFlag);
throw new PayloadTooLargeException('Workspace member limit reached.');
}

let target = await this.users.findUserByEmail(email);
if (target) {
const originRecord = await this.prisma.workspaceUserPermission.findFirst({
where: {
workspaceId,
userId: target.id,
},
});
// only invite if the user is not already in the workspace
if (originRecord) {
return this.mutex.lockWith(lockFlag, async () => {
// member limit check
const [memberCount, quota] = await Promise.all([
this.prisma.workspaceUserPermission.count({
where: { workspaceId },
}),
this.quota.getWorkspaceUsage(workspaceId),
]);
if (memberCount >= quota.memberLimit) {
await this.mutex.unlock(lockFlag);
return originRecord.id;
throw new PayloadTooLargeException('Workspace member limit reached.');
}
} else {
target = await this.auth.createAnonymousUser(email);
}

const inviteId = await this.permissions.grant(
workspaceId,
target.id,
permission
);
if (sendInviteMail) {
const inviteInfo = await this.getInviteInfo(inviteId);

try {
await this.mailer.sendInviteEmail(email, inviteId, {
workspace: {
id: inviteInfo.workspace.id,
name: inviteInfo.workspace.name,
avatar: inviteInfo.workspace.avatar,
},
user: {
avatar: inviteInfo.user?.avatarUrl || '',
name: inviteInfo.user?.name || '',
},
});
} catch (e) {
const ret = await this.permissions.revokeWorkspace(
workspaceId,
target.id
);

if (!ret) {
this.logger.fatal(
`failed to send ${workspaceId} invite email to ${email} and failed to revoke permission: ${inviteId}, ${e}`
let target = await this.users.findUserByEmail(email);
if (target) {
const originRecord =
await this.prisma.workspaceUserPermission.findFirst({
where: {
workspaceId,
userId: target.id,
},
});
// only invite if the user is not already in the workspace
if (originRecord) {
await this.mutex.unlock(lockFlag);
return originRecord.id;
}
} else {
target = await this.auth.createAnonymousUser(email);
}

const inviteId = await this.permissions.grant(
workspaceId,
target.id,
permission
);
if (sendInviteMail) {
const inviteInfo = await this.getInviteInfo(inviteId);

try {
await this.mailer.sendInviteEmail(email, inviteId, {
workspace: {
id: inviteInfo.workspace.id,
name: inviteInfo.workspace.name,
avatar: inviteInfo.workspace.avatar,
},
user: {
avatar: inviteInfo.user?.avatarUrl || '',
name: inviteInfo.user?.name || '',
},
});
} catch (e) {
const ret = await this.permissions.revokeWorkspace(
workspaceId,
target.id
);
} else {
this.logger.warn(
`failed to send ${workspaceId} invite email to ${email}, but successfully revoked permission: ${e}`

if (!ret) {
this.logger.fatal(
`failed to send ${workspaceId} invite email to ${email} and failed to revoke permission: ${inviteId}, ${e}`
);
} else {
this.logger.warn(
`failed to send ${workspaceId} invite email to ${email}, but successfully revoked permission: ${e}`
);
}
await this.mutex.unlock(lockFlag);
return new InternalServerErrorException(
'Failed to send invite email. Please try again.'
);
}
await this.mutex.unlock(lockFlag);
return new InternalServerErrorException(
'Failed to send invite email. Please try again.'
);
}
}
await this.mutex.unlock(lockFlag);
return inviteId;
await this.mutex.unlock(lockFlag);
return inviteId;
});
}

@Throttle({
Expand Down
23 changes: 22 additions & 1 deletion packages/backend/server/src/fundamentals/mutex/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { randomUUID } from 'node:crypto';

import { Global, Injectable, Logger, Module } from '@nestjs/common';
import {
ForbiddenException,
Global,
Injectable,
Logger,
Module,
} from '@nestjs/common';
import { ClsService } from 'nestjs-cls';

import { sleep } from '../utils/utils';
Expand All @@ -26,6 +32,21 @@ export class MutexService {
return id;
}

async lockWith<R>(key: string, cb: () => Promise<R>): Promise<R> {
const locked = await this.lock(key);
if (locked) {
let result: R;
try {
result = await cb();
} finally {
await this.unlock(key);
}
return result;
} else {
throw new ForbiddenException('Failed to acquire lock');
}
}

async lock(key: string): Promise<boolean> {
const id = this.getId();
const fetchLock = async (retry: number): Promise<boolean> => {
Expand Down

0 comments on commit df14857

Please sign in to comment.