Skip to content

Commit

Permalink
packages/core: fix getModelValue ancestor filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
joeltg committed Dec 11, 2024
1 parent 6af7a3e commit bc83c39
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 26 deletions.
47 changes: 26 additions & 21 deletions packages/core/src/runtime/AbstractRuntime.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import * as cbor from "@ipld/dag-cbor"
import { blake3 } from "@noble/hashes/blake3"
import { bytesToHex } from "@noble/hashes/utils"
import { logger } from "@libp2p/logger"

import type { Signature, Action, Message, Session, Snapshot, SignerCache } from "@canvas-js/interfaces"
Expand All @@ -14,13 +12,7 @@ import {
validateModelValue,
updateModelValues,
} from "@canvas-js/modeldb"
import {
GossipLogConsumer,
MAX_MESSAGE_ID,
MIN_MESSAGE_ID,
AbstractGossipLog,
BranchMergeRecord,
} from "@canvas-js/gossiplog"
import { GossipLogConsumer, MAX_MESSAGE_ID, MIN_MESSAGE_ID, AbstractGossipLog } from "@canvas-js/gossiplog"
import { assert, mapValues, signalInvalidType } from "@canvas-js/utils"
import { getRecordId, isAction, isSession, isSnapshot } from "../utils.js"

Expand Down Expand Up @@ -432,18 +424,7 @@ export abstract class AbstractRuntime {
return context.reads[recordId].value as T
}

const minKey = `${recordId}:${MIN_MESSAGE_ID}`
const maxKey = `${recordId}:${MAX_MESSAGE_ID}`

const [record = null] = await this.db.query<{ key: string; version: string | null; value: Uint8Array | null }>(
"$writes",
{
select: { key: true, version: true, value: true },
orderBy: { key: "desc" },
where: { key: { gte: minKey, lte: maxKey } },
limit: 1,
},
)
const record = await this.getLatestAncestorWrite(context, recordId)

if (record === null) {
context.reads[recordId] = { version: null, value: null }
Expand All @@ -461,6 +442,30 @@ export abstract class AbstractRuntime {
}
}

private async getLatestAncestorWrite(context: ExecutionContext, recordId: string): Promise<WriteRecord | null> {
// TODO: what we really need is to find a min-ID winner of the most recent set of mutually concurrent
// writes *WITHIN* the transitive ancestor set of the current execution context,
// is actually a new kind of search that we havne't done before.

// For now we just find the max-ID ancestor write which is deterministic but not quite correct.

const minKey = `${recordId}:${MIN_MESSAGE_ID}`
const maxKey = `${recordId}:${context.id}`

for await (const record of this.db.iterate<WriteRecord>("$writes", {
orderBy: { key: "desc" },
where: { key: { gte: minKey, lt: maxKey } },
})) {
const [_, writerMsgId] = record.key.split(":")
const isAncestor = await this.isAncestor(context, writerMsgId)
if (isAncestor) {
return record
}
}

return null
}

protected async setModelValue(context: ExecutionContext, model: string, value: ModelValue): Promise<void> {
assert(this.db.models[model] !== undefined, "model not found")
validateModelValue(this.db.models[model], value)
Expand Down
41 changes: 36 additions & 5 deletions packages/core/test/conflicts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ const init = async (t: ExecutionContext) => {
},
async createPost(db, roomId, content) {
const postId = [this.did, this.id].join("/")
await db
.get("memberships", `${roomId}/${this.did}`)
.then((membership) => assert(membership !== null, "not a member"))

const membership = await db.get("memberships", `${roomId}/${this.did}`)
assert(membership !== null, "not a member")
await db.set("posts", { id: postId, room_id: roomId, content })
},
},
Expand All @@ -59,7 +57,7 @@ test("create a room and post a message", async (t) => {

const createRoom = await app.actions.createRoom()
t.log(`applied createRoom ${createRoom.id}`)
const roomId = createRoom.result
const roomId = createRoom.id
const adminDid = createRoom.message.payload.did
const membershipId = `${roomId}/${adminDid}`
Promise.resolve(app.db.get("rooms", roomId)).then((room) => t.is(room?.admin_did, adminDid))
Expand Down Expand Up @@ -180,3 +178,36 @@ test("create a room and add the same member twice concurrently (write-write conf

t.pass()
})

test("concurrently post and remove a member (read-write conflict)", async (t) => {
const alice = new SIWESigner({ signer: Wallet.createRandom() })
const aliceDid = await alice.getDid()

const { app: app1, signer: signer1 } = await init(t)
const { app: app2, signer: signer2 } = await init(t)
const { app: app3, signer: signer3 } = await init(t)
const did1 = await signer1.getDid()
const did2 = await signer2.getDid()
const did3 = await signer3.getDid()
t.log("signer1 did:", did1)
t.log("signer2 did:", did2)
t.log("signer3 did:", did3)

const createRoom = await app1.actions.createRoom()
t.log(`applied createRoom ${createRoom.id}`)
const roomId = createRoom.result

await app1.actions.addMember(roomId, aliceDid)

await app1.messageLog.serve((snapshot) => app2.messageLog.sync(snapshot))
await app1.messageLog.serve((snapshot) => app3.messageLog.sync(snapshot))

await app2.as(signer1).removeMember(roomId, aliceDid)
await app3.as(alice).createPost(roomId, "HELLO WORLD")

// okay now we have read-write conflict between createPost and removeMember
await app2.messageLog.serve((snapshot) => app3.messageLog.sync(snapshot))
await app3.messageLog.serve((snapshot) => app2.messageLog.sync(snapshot))

t.pass()
})

0 comments on commit bc83c39

Please sign in to comment.