Skip to content

Commit

Permalink
packages/core: add bare record model and key to $writes and snapshot …
Browse files Browse the repository at this point in the history
…effects
  • Loading branch information
joeltg committed Dec 9, 2024
1 parent 219a058 commit 0bd29e0
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 18 deletions.
94 changes: 83 additions & 11 deletions packages/core/src/runtime/AbstractRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
AbstractGossipLog,
BranchMergeRecord,
} from "@canvas-js/gossiplog"
import { assert, mapValues } from "@canvas-js/utils"
import { assert, mapValues, signalInvalidType } from "@canvas-js/utils"
import { isAction, isSession, isSnapshot } from "../utils.js"
import { unzip } from "zlib"

Expand All @@ -41,8 +41,10 @@ export type ExecutionContext = {

export type WriteRecord = {
key: string
record_model: string
record_key: string
record_version: string | null
value: Uint8Array | null
version: string | null
reverted: boolean
}

Expand Down Expand Up @@ -70,10 +72,12 @@ export abstract class AbstractRuntime {
protected static effectsModel: ModelSchema = {
$writes: {
key: "primary", // `${hash(model, key)}/${msgid}`
version: "string?", // same as the last component of primary key
record_model: "string",
record_key: "string",
record_version: "string?", // same as the last component of primary key
value: "bytes?",
reverted: "boolean",
$indexes: ["version"],
$indexes: ["record_version"],
},
$reads: {
key: "primary", // `${hash(model, key)}/${msgid}`
Expand Down Expand Up @@ -175,8 +179,16 @@ export abstract class AbstractRuntime {
const messages = await messageLog.getMessages()
assert(messages.length === 0, "snapshot must be first entry on log")

for (const { key, value } of effects) {
await this.db.set("$writes", { key, value, version: null, reverted: false })
for (const { model, key, value } of effects) {
const recordId = AbstractRuntime.getRecordId(model, key)
await this.db.set("$writes", {
key: `${recordId}/${MIN_MESSAGE_ID}`,
record_model: model,
record_key: key,
record_version: null,
value,
reverted: false,
})
}

for (const [model, rows] of Object.entries(models)) {
Expand Down Expand Up @@ -274,10 +286,13 @@ export abstract class AbstractRuntime {
// Step 1a: identify write-write conflicts
const writeConflicts = new Set<string>()
for (const [recordId, effect] of Object.entries(writes)) {
const [model, key, value] = this.parseEffect(effect)
const writeRecord: WriteRecord = {
key: `${recordId}/${id}`,
value: effect.operation === "set" ? cbor.encode(effect.value) : null,
version: id,
record_model: model,
record_key: key,
record_version: id,
value: value && cbor.encode(value),
reverted: false,
}

Expand All @@ -304,11 +319,12 @@ export abstract class AbstractRuntime {
}
}

const revertWrites: Record<string, Effect> = {}

// Step 1b: revert inferior write-write conflicts
for (const messageId of inferiorWrites) {
this.log("reverting inferior write conflict %s", messageId)
// await this.revert(messageId)
throw new Error("not implemented")
await this.revert(messageId, effects, revertWrites)
}

// n.b. there's an open question here of whether we can safely
Expand Down Expand Up @@ -403,7 +419,9 @@ export abstract class AbstractRuntime {
if (record === undefined || record.value === null) {
return null
} else {
return cbor.decode<T | null>(record.value)
const value = cbor.decode<T>(record.value)
assert(value !== null, "expected value !== null")
return value
}
}

Expand Down Expand Up @@ -527,13 +545,55 @@ export abstract class AbstractRuntime {
return lastVersion
}

private async revert(
messageId: string,
effects: Effect[],
revertWrites: Record<string, Effect>,
callback?: (messageId: string) => void,
): Promise<void> {
// we are guaranteed a "linear version history" invariant

const writes = await this.db.query<{
key: string
record_model: string
record_key: string
}>("$writes", {
select: { key: true, record_model: true, record_key: true },
where: { version: messageId },
})

for (const { key, record_model, record_key } of writes) {
const [recordId, _] = key.split("/")
const minKey = `${recordId}/${MIN_MESSAGE_ID}`

const [prev] = await this.db.query<WriteRecord>("$writes", {
orderBy: { key: "desc" },
where: { key: { gte: minKey, lt: key }, reverted: false },
limit: 1,
})

if (prev === undefined || prev.value === null) {
revertWrites[recordId] = { operation: "delete", model: record_model, key: record_key }
} else {
const value = cbor.decode<ModelValue>(prev.value)
assert(value !== null, "expected value !== null")

revertWrites[recordId] =
value == null
? { operation: "delete", model: record_model, key: record_key }
: { operation: "set", model: record_model, value }
}
}
}

/**
* This is a utility method for finding if msgId is an ancestor of the
* current execution context. This loops over context.message.parents,
* since the action has not been committed yet so context.id doesn't
* exist in the database yet.
*/
private async isAncestor(context: ExecutionContext, msgId: string): Promise<boolean> {
// TODO: handle this in a more elegant way
if (msgId === MIN_MESSAGE_ID) {
return true
}
Expand All @@ -548,4 +608,16 @@ export abstract class AbstractRuntime {

return false
}

private parseEffect(effect: Effect): [string, string, ModelValue | null] {
if (effect.operation === "set") {
assert(this.db.models[effect.model] !== undefined)
const { primaryKey } = this.db.models[effect.model]
return [effect.model, effect.value[primaryKey], effect.value]
} else if (effect.operation === "delete") {
return [effect.model, effect.key, null]
} else {
signalInvalidType(effect)
}
}
}
1 change: 1 addition & 0 deletions packages/core/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Session struct {
}
type SnapshotEffect struct {
model String
key String
value nullable Bytes
}
Expand Down
20 changes: 14 additions & 6 deletions packages/core/src/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,24 @@ export async function createSnapshot(app: Canvas): Promise<Snapshot> {
// flatten $writes table
const writesMap = new Map<string, WriteRecord>()
for await (const row of app.db.iterate<WriteRecord>("$writes", { where: { reverted: false } })) {
const { key, value, version } = row
const recordId = key.slice(0, key.indexOf("/"))
const { key, record_model, record_key, record_version, value } = row
const [recordId, _] = key.split("/")
const existingEffect = writesMap.get(recordId)
if (existingEffect === undefined || lessThan(existingEffect.version, version)) {
const effectKey = `${recordId}/${MIN_MESSAGE_ID}`
writesMap.set(recordId, { key: effectKey, value, version: null, reverted: false })
if (existingEffect === undefined || lessThan(existingEffect.record_version, record_version)) {
writesMap.set(recordId, {
key: `${recordId}/${MIN_MESSAGE_ID}`,
record_model,
record_key,
value,
record_version: null,
reverted: false,
})
}
}

const effects = Array.from(writesMap.values()).map(({ key, value }: SnapshotEffect) => ({ key, value }))
const effects = Array.from(writesMap.values()).map(
({ record_model: model, record_key: key, value }: WriteRecord): SnapshotEffect => ({ model, key, value }),
)

return { type: "snapshot", models, effects }
}
Expand Down
3 changes: 2 additions & 1 deletion packages/interfaces/src/Snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type Snapshot = {
}

export type SnapshotEffect = {
key: string // `${hash(model, key)}/${version}
model: string
key: string
value: Uint8Array | null // cbor.encode(value)
}

0 comments on commit 0bd29e0

Please sign in to comment.