Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark TTFB poc #434

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions lib/committee.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ export class Committee {
#measurements

/**
* @param {Pick<Measurement, 'cid' | 'minerId'>} retrievalTask
* @param {Pick<Measurement, 'cid' | 'minerId' | 'roundId'>} retrievalTask
*/
constructor ({ cid, minerId }) {
this.retrievalTask = { minerId, cid }
constructor ({ cid, minerId, roundId }) {
this.retrievalTask = { minerId, cid, roundId }

this.#measurements = []

Expand All @@ -48,6 +48,7 @@ export class Committee {
addMeasurement (m) {
assert.strictEqual(m.cid, this.retrievalTask.cid, 'cid must match')
assert.strictEqual(m.minerId, this.retrievalTask.minerId, 'minerId must match')
assert.strictEqual(m.roundId, this.retrievalTask.roundId, 'roundId must match')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was this added?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed the task ID structure from cid::minerId to cid::minerId::roundId; hence, we're checking the round ID here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand, but what is the motivation?

assert.strictEqual(m.fraudAssessment, 'OK', 'only accepted measurements can be added')
this.#measurements.push(m)
}
Expand Down
2 changes: 2 additions & 0 deletions lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ export const runFraudDetection = async ({
// sanity checks to get nicer errors if we forget to set required fields in unit tests
assert(typeof m.inet_group === 'string', 'missing inet_group')
assert(typeof m.finished_at === 'number', 'missing finished_at')
assert(typeof m.start_at === 'number', 'missing start_at')
assert(typeof m.first_byte_at === 'number', 'missing first_byte_at')

const isValidTask = sparkRoundDetails.retrievalTasks.some(
t => t.cid === m.cid && t.minerId === m.minerId
Expand Down
6 changes: 4 additions & 2 deletions lib/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ const debug = createDebug('spark:preprocess')

export class Measurement {
/**
* @param {Partial<import('./round.js').RoundData>} r
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're already passing round.pointerize as the last argument, I suggest we also pass round.index instead of round, for consistency and to prevent redundancy

* @param {Partial<import('./typings.js').RawMeasurement>} m
* @param {<T extends string>(str: T) => T} pointerize
*/
constructor (m, pointerize = (v) => v) {
constructor (r, m, pointerize = (v) => v) {
this.participantAddress = pointerize(parseParticipantAddress(m.participant_address))
this.retrievalResult = pointerize(getRetrievalResult(m))
this.cid = pointerize(m.cid)
Expand All @@ -38,6 +39,7 @@ export class Measurement {
this.stationId = pointerize(m.station_id)
this.carChecksum = pointerize(m.car_checksum)
this.carTooLarge = m.car_too_large
this.roundId = pointerize(r.index.toString())
}
}

Expand Down Expand Up @@ -76,7 +78,7 @@ export const preprocess = async ({
// eslint-disable-next-line camelcase
.map(measurement => {
try {
return new Measurement(measurement, round.pointerize)
return new Measurement(round, measurement, round.pointerize)
} catch (err) {
logger.error('Invalid measurement:', err.message, measurement)
return null
Expand Down
37 changes: 36 additions & 1 deletion lib/public-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import createDebug from 'debug'
import * as providerRetrievalResultStats from './provider-retrieval-result-stats.js'
import { updatePlatformStats } from './platform-stats.js'
import { getTaskId } from './retrieval-stats.js'
import { getTaskId, getValueAtPercentile } from './retrieval-stats.js'

/** @import pg from 'pg' */
/** @import { Committee } from './committee.js' */
Expand All @@ -27,6 +27,7 @@
await updateIndexerQueryStats(pgClient, committees)
await updateDailyDealsStats(pgClient, committees, findDealClients)
await updatePlatformStats(pgClient, allMeasurements)
await updateRetreivalTimes(pgClient, committees)

Check failure on line 30 in lib/public-stats.js

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'updateRetreivalTimes'. Did you mean 'updateRetreivalTimings'?
} finally {
await pgClient.end()
}
Expand Down Expand Up @@ -225,3 +226,37 @@
flatStats.map(stat => stat.retrievable)
])
}

/**
* @param {pg.Client} pgClient
* @param {Iterable<Committee>} committees
*/
const updateRetreivalTimings = async (pgClient, committees) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo.

Suggested change
const updateRetreivalTimings = async (pgClient, committees) => {
const updateRetrievalTimings = async (pgClient, committees) => {

/** @type {Array<{minerId: string; taskId: string; timeToFirstByteP50: number}>} */
const stats = []
for (const c of committees) {
if (!c.evaluation || !c.evaluation.hasRetrievalMajority || c.evaluation.retrievalResult !== 'OK') continue
const { minerId } = c.retrievalTask
const taskId = getTaskId(c.retrievalTask)
const ttfbMeasurments = []
for (const m of c.measurements) {
if (m.fraudAssessment !== 'OK') continue
ttfbMeasurments.push(m.first_byte_at - m.start_at)
}

const timeToFirstByteP50 = getValueAtPercentile(ttfbMeasurments, 0.5)
stats.push({ minerId, taskId, timeToFirstByteP50 })
}

// A conflict should never happen, but in case it does, we'll ignore the new value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If conflicts shouldn't happen, I suggest we remove the conflict handling and let it fail. If everything is right, it will never fail. If it fails, it will inform us of a bug to fix.

await pgClient.query(`
INSERT INTO retrieval_timings
(day, miner_id, task_id, time_to_first_byte_p50) VALUES
(now(), unnest($1::text[]), unnest($2::text[]), unnest($3::int[]))
ON CONFLICT(day, miner_id, task_id) DO NOTHING
Comment on lines +253 to +256
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have mixed feelings about this design.

  • Task id was designed to distinguish tasks within one round. spark-evaluate always looks at one round only.
  • Since it's only updateRetreivalTimings() that needs to handle the case when one task is performed more than once during a day, I prefer to implement a solution that's limited to updateRetreivalTimings() only. For example, we can forward the current round number through updatePublicStats() to updateRetreivalTimings() and then combine old-style taskId with the round number.
  • Because task_id includes a round number, it does not help us detect cases when the same content was tested twice on the same day, it only ensures we can record timings for each task occurrence.

I propose a different DB schema for consideration: Instead of having one row per day+task+round, have only one row per day and store the p50 values in an array.

Something along the following lines:

INSERT INTO retrieval_timings
(day, miner_id, time_to_first_byte_p50) VALUES 
(now(), unnest($2::text[]), unnest($3::int[]))
ON CONFLICT(day, miner_id) 
DO UPDATE SET
  time_to_first_byte_p50 = array_cat(
    retrieval_timings.time_to_first_byte_p50,
    EXCLUDED.time_to_first_byte_p50
  )

`, [
stats.map(stat => stat.minerId),
stats.map(stat => stat.taskId),
stats.map(stat => stat.timeToFirstByteP50)
])
}
4 changes: 2 additions & 2 deletions lib/retrieval-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ const addHistogramToPoint = (point, values, fieldNamePrefix = '') => {
}

/**
* @param {Pick<Measurement, 'cid' | 'minerId'>} m
* @param {Pick<Measurement, 'cid' | 'minerId' | 'roundId'>} m
* @returns {string}
*/
export const getTaskId = (m) => `${m.cid}::${m.minerId}`
export const getTaskId = (m) => `${m.cid}::${m.minerId}::${m.roundId}`

/**
* @param {Measurement[]} measurements
Expand Down
7 changes: 7 additions & 0 deletions migrations/021.do.add-retrieval-times.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE retrieval_times (
day DATE NOT NULL,
miner_id TEXT NOT NULL,
task_id TEXT NOT NULL,
time_to_first_byte_p50 INT NOT NULL,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This column name is rather long, how about using the abbreviation TTFB?

Suggested change
time_to_first_byte_p50 INT NOT NULL,
ttfb_p50 INT NOT NULL,

PRIMARY KEY (day, miner_id, task_id)
);
6 changes: 4 additions & 2 deletions test/helpers/test-data.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export const ROUND_DETAILS = 'bafybeie5rekb2jox77ow64wjjd2bjdsp6d3yeivhzzd234hnb
export const VALID_TASK = {
cid: 'QmUuEoBdjC8D1PfWZCc7JCSK8nj7TV6HbXWDHYHzZHCVGS',
minerId: 'f1test',
clients: ['f1client']
clients: ['f1client'],
roundId: '0'
}
Object.freeze(VALID_TASK)

Expand Down Expand Up @@ -40,7 +41,8 @@ export const VALID_MEASUREMENT = {
carTooLarge: false,
retrievalResult: 'OK',
indexerResult: 'OK',
fraudAssessment: null
fraudAssessment: null,
roundId: '0'
}

// Fraud detection is mutating the measurements parsed from JSON
Expand Down
22 changes: 12 additions & 10 deletions test/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@ describe('preprocess', () => {
await preprocess({ round, cid, roundIndex, fetchMeasurements, recordTelemetry, logger })

assert.deepStrictEqual(round.measurements, [
new Measurement({
participant_address: '0x999999cf1046e68e36E1aA2E0E07105eDDD1f08E',
station_id: VALID_STATION_ID,
spark_version: '1.2.3',
inet_group: 'ig1',
finished_at: '2023-11-01T09:00:00.000Z',
first_byte_at: '2023-11-01T09:00:01.000Z',
start_at: '2023-11-01T09:00:02.000Z',
end_at: '2023-11-01T09:00:03.000Z'
})
new Measurement(
round,
{
participant_address: '0x999999cf1046e68e36E1aA2E0E07105eDDD1f08E',
station_id: VALID_STATION_ID,
spark_version: '1.2.3',
inet_group: 'ig1',
finished_at: '2023-11-01T09:00:00.000Z',
first_byte_at: '2023-11-01T09:00:01.000Z',
start_at: '2023-11-01T09:00:02.000Z',
end_at: '2023-11-01T09:00:03.000Z'
})
])
assert.deepStrictEqual(getCalls, [cid])
assert.deepStrictEqual(round.measurementBatches, [cid])
Expand Down
81 changes: 81 additions & 0 deletions test/public-stats.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ describe('public-stats', () => {
await pgClient.query('DELETE FROM retrieval_stats')
await pgClient.query('DELETE FROM indexer_query_stats')
await pgClient.query('DELETE FROM daily_deals')
await pgClient.query('DELETE FROM retrieval_times')

// Run all tests inside a transaction to ensure `now()` always returns the same value
// See https://dba.stackexchange.com/a/63549/125312
Expand Down Expand Up @@ -534,8 +535,88 @@ describe('public-stats', () => {
})
})

describe('retrieval_times', () => {
  // NOTE: this must be a plain `it` - an exclusive `it.only` would silently
  // skip every other test in the suite.
  it('creates or updates the row for today', async () => {
    // First evaluation: three accepted measurements with TTFB 1000/2000/3000 ms,
    // so the expected p50 is 2000 ms.
    /** @type {Measurement[]} */
    const honestMeasurements = [
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 2000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 3000)
    ]

    // These measurements are not part of the committees and must not affect the p50.
    /** @type {Measurement[]} */
    const dishonestMeasurements = [
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 100),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 200),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 300)
    ]

    await updatePublicStats({
      createPgClient,
      committees: buildEvaluatedCommitteesFromMeasurements(honestMeasurements),
      honestMeasurements,
      allMeasurements: [...honestMeasurements, ...dishonestMeasurements],
      findDealClients: (_minerId, _cid) => ['f0client']
    })

    const { rows: created } = await pgClient.query(
      'SELECT day::TEXT, miner_id, task_id, time_to_first_byte_p50 FROM retrieval_times'
    )
    assert.deepStrictEqual(created, [
      { day: today, miner_id: 'f1first', task_id: 'cidone::f1first::0', time_to_first_byte_p50: 2000 }
    ])

    // Second evaluation of the same task on the same day, this time with a
    // DIFFERENT p50 (1000 ms), so that the conflict handling is really exercised.
    /** @type {Measurement[]} */
    const newHonestMeasurements = [
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000)
    ]

    /** @type {Measurement[]} */
    const newDishonestMeasurements = [
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 10_000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 20_000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 30_000)
    ]

    // Build the committees and the honest set from the NEW measurements;
    // reusing the first batch would re-insert the same value and prove nothing.
    await updatePublicStats({
      createPgClient,
      committees: buildEvaluatedCommitteesFromMeasurements(newHonestMeasurements),
      honestMeasurements: newHonestMeasurements,
      allMeasurements: [...newHonestMeasurements, ...newDishonestMeasurements],
      findDealClients: (_minerId, _cid) => ['f0client']
    })

    const { rows: updated } = await pgClient.query(
      'SELECT day::TEXT, miner_id, task_id, time_to_first_byte_p50 FROM retrieval_times'
    )

    // on conflict, we ignore new values
    assert.deepStrictEqual(updated, [
      { day: today, miner_id: 'f1first', task_id: 'cidone::f1first::0', time_to_first_byte_p50: 2000 }
    ])
  })
})

/**
 * Query the database for the current date (`now()::DATE`) rendered as text.
 *
 * @returns {Promise<string>} the `today` column of the first returned row
 */
const getCurrentDate = async () => {
  const result = await pgClient.query('SELECT now()::DATE::TEXT as today')
  const [firstRow] = result.rows
  return firstRow.today
}

/**
*
* @param {Measurement} measurment
* @param {number} timeToFirstByte Time in milliseconds
* @returns
*/
function givenTimeToFirstByte (measurment, timeToFirstByte) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo

Suggested change
function givenTimeToFirstByte (measurment, timeToFirstByte) {
function givenTimeToFirstByte (measurement, timeToFirstByte) {

measurment.first_byte_at = measurment.start_at + timeToFirstByte
return measurment
}
})
Loading