-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spark TTFB poc #434
base: main
Are you sure you want to change the base?
Spark TTFB poc #434
Changes from all commits
9a627ab
5e6835e
b8a2ec7
2642dcf
77e0f73
c16c44c
cbf5993
73980b0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,10 +11,11 @@ const debug = createDebug('spark:preprocess') | |
|
||
export class Measurement { | ||
/** | ||
* @param {Partial<import('./round.js').RoundData>} r | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we're already passing |
||
* @param {Partial<import('./typings.js').RawMeasurement>} m | ||
* @param {<T extends string>(str: T) => T} pointerize | ||
*/ | ||
constructor (m, pointerize = (v) => v) { | ||
constructor (r, m, pointerize = (v) => v) { | ||
this.participantAddress = pointerize(parseParticipantAddress(m.participant_address)) | ||
this.retrievalResult = pointerize(getRetrievalResult(m)) | ||
this.cid = pointerize(m.cid) | ||
|
@@ -38,6 +39,7 @@ export class Measurement { | |
this.stationId = pointerize(m.station_id) | ||
this.carChecksum = pointerize(m.car_checksum) | ||
this.carTooLarge = m.car_too_large | ||
this.roundId = pointerize(r.index.toString()) | ||
} | ||
} | ||
|
||
|
@@ -76,7 +78,7 @@ export const preprocess = async ({ | |
// eslint-disable-next-line camelcase | ||
.map(measurement => { | ||
try { | ||
return new Measurement(measurement, round.pointerize) | ||
return new Measurement(round, measurement, round.pointerize) | ||
} catch (err) { | ||
logger.error('Invalid measurement:', err.message, measurement) | ||
return null | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -2,7 +2,7 @@ | |||||
import createDebug from 'debug' | ||||||
import * as providerRetrievalResultStats from './provider-retrieval-result-stats.js' | ||||||
import { updatePlatformStats } from './platform-stats.js' | ||||||
import { getTaskId } from './retrieval-stats.js' | ||||||
import { getTaskId, getValueAtPercentile } from './retrieval-stats.js' | ||||||
|
||||||
/** @import pg from 'pg' */ | ||||||
/** @import { Committee } from './committee.js' */ | ||||||
|
@@ -27,6 +27,7 @@ | |||||
await updateIndexerQueryStats(pgClient, committees) | ||||||
await updateDailyDealsStats(pgClient, committees, findDealClients) | ||||||
await updatePlatformStats(pgClient, allMeasurements) | ||||||
await updateRetreivalTimes(pgClient, committees) | ||||||
} finally { | ||||||
await pgClient.end() | ||||||
} | ||||||
|
@@ -225,3 +226,37 @@ | |||||
flatStats.map(stat => stat.retrievable) | ||||||
]) | ||||||
} | ||||||
|
||||||
/** | ||||||
* @param {pg.Client} pgClient | ||||||
* @param {Iterable<Committee>} committees | ||||||
*/ | ||||||
const updateRetreivalTimings = async (pgClient, committees) => { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typo.
Suggested change
|
||||||
/** @type {Array<{minerId: string; taskId: string; timeToFirstByteP50: number}>} */ | ||||||
const stats = [] | ||||||
for (const c of committees) { | ||||||
if (!c.evaluation || !c.evaluation.hasRetrievalMajority || c.evaluation.retrievalResult !== 'OK') continue | ||||||
const { minerId } = c.retrievalTask | ||||||
const taskId = getTaskId(c.retrievalTask) | ||||||
const ttfbMeasurments = [] | ||||||
for (const m of c.measurements) { | ||||||
if (m.fraudAssessment !== 'OK') continue | ||||||
ttfbMeasurments.push(m.first_byte_at - m.start_at) | ||||||
} | ||||||
|
||||||
const timeToFirstByteP50 = getValueAtPercentile(ttfbMeasurments, 0.5) | ||||||
stats.push({ minerId, taskId, timeToFirstByteP50 }) | ||||||
} | ||||||
|
||||||
// conflic should never happen, but in case it does we'll ignore the new value | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If conflicts shouldn't happen, I suggest we remove the conflict handling and let it fail. If everything is right, it will never fail. If it fails, it will inform us of a bug to fix. |
||||||
await pgClient.query(` | ||||||
INSERT INTO retrieval_timings | ||||||
(day, miner_id, task_id, time_to_first_byte_p50) VALUES | ||||||
(now(), unnest($1::text[]), unnest($2::text[]), unnest($3::int[])) | ||||||
ON CONFLICT(day, miner_id, task_id) DO NOTHING | ||||||
Comment on lines
+253
to
+256
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have mixed feelings about this design.
I propose a different DB schema for consideration: Instead of having one row per day+task+round, have only one row per day and store the p50 values in an array. Something along the following lines: INSERT INTO retrieval_timings
(day, miner_id, time_to_first_byte_p50) VALUES
(now(), unnest($2::text[]), unnest($3::int[]))
ON CONFLICT(day, miner_id)
DO UPDATE SET
time_to_first_byte_p50 = array_cat(
retrieval_timings.time_to_first_byte_p50,
EXCLUDED.time_to_first_byte_p50
) |
||||||
`, [ | ||||||
stats.map(stat => stat.minerId), | ||||||
stats.map(stat => stat.taskId), | ||||||
stats.map(stat => stat.timeToFirstByteP50) | ||||||
]) | ||||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,7 @@ | ||||||
CREATE TABLE retrieval_times ( | ||||||
day DATE NOT NULL, | ||||||
miner_id TEXT NOT NULL, | ||||||
task_id TEXT NOT NULL, | ||||||
time_to_first_byte_p50 INT NOT NULL, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This column name is rather long, how about using the abbreviation TTFB?
Suggested change
|
||||||
PRIMARY KEY (day, miner_id, task_id) | ||||||
); |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -28,6 +28,7 @@ describe('public-stats', () => { | |||||
await pgClient.query('DELETE FROM retrieval_stats') | ||||||
await pgClient.query('DELETE FROM indexer_query_stats') | ||||||
await pgClient.query('DELETE FROM daily_deals') | ||||||
await pgClient.query('DELETE FROM retrieval_times') | ||||||
|
||||||
// Run all tests inside a transaction to ensure `now()` always returns the same value | ||||||
// See https://dba.stackexchange.com/a/63549/125312 | ||||||
|
@@ -534,8 +535,88 @@ describe('public-stats', () => { | |||||
}) | ||||||
}) | ||||||
|
||||||
describe('retrieval_times', () => { | ||||||
it.only('creates or updates the row for today', async () => { | ||||||
/** @type {Measurement[]} */ | ||||||
const honestMeasurements = [ | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000), | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 2000), | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 3000) | ||||||
] | ||||||
|
||||||
/** @type {Measurement[]} */ | ||||||
const dishonestMeasurements = [ | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 100), | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 200), | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 300) | ||||||
] | ||||||
|
||||||
let allMeasurements = [...honestMeasurements, ...dishonestMeasurements] | ||||||
let committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements) | ||||||
|
||||||
await updatePublicStats({ | ||||||
createPgClient, | ||||||
committees, | ||||||
honestMeasurements, | ||||||
allMeasurements, | ||||||
findDealClients: (_minerId, _cid) => ['f0client'] | ||||||
}) | ||||||
const { rows: created } = await pgClient.query( | ||||||
'SELECT day::TEXT, miner_id, task_id, time_to_first_byte_p50 FROM retrieval_times' | ||||||
) | ||||||
assert.deepStrictEqual(created, [ | ||||||
{ day: today, miner_id: 'f1first', task_id: 'cidone::f1first::0', time_to_first_byte_p50: 2000 } | ||||||
]) | ||||||
|
||||||
/** @type {Measurement[]} */ | ||||||
const newHonestMeasurements = [ | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000), | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000), | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000) | ||||||
] | ||||||
|
||||||
/** @type {Measurement[]} */ | ||||||
const newDishonestMeasurements = [ | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 10_000), | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 20_000), | ||||||
givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 30_000) | ||||||
] | ||||||
|
||||||
allMeasurements = [...newHonestMeasurements, ...newDishonestMeasurements] | ||||||
committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements) | ||||||
|
||||||
await updatePublicStats({ | ||||||
createPgClient, | ||||||
committees, | ||||||
honestMeasurements, | ||||||
allMeasurements, | ||||||
findDealClients: (_minerId, _cid) => ['f0client'] | ||||||
}) | ||||||
|
||||||
const { rows: updated } = await pgClient.query( | ||||||
'SELECT day::TEXT, miner_id, task_id, time_to_first_byte_p50 FROM retrieval_times' | ||||||
) | ||||||
|
||||||
// on conflict, we ignore new values | ||||||
assert.deepStrictEqual(updated, [ | ||||||
{ day: today, miner_id: 'f1first', task_id: 'cidone::f1first::0', time_to_first_byte_p50: 2000 } | ||||||
]) | ||||||
}) | ||||||
}) | ||||||
|
||||||
const getCurrentDate = async () => { | ||||||
const { rows: [{ today }] } = await pgClient.query('SELECT now()::DATE::TEXT as today') | ||||||
return today | ||||||
} | ||||||
|
||||||
/** | ||||||
* | ||||||
* @param {Measurement} measurment | ||||||
* @param {number} timeToFirstByte Time in milliseconds | ||||||
* @returns | ||||||
*/ | ||||||
function givenTimeToFirstByte (measurment, timeToFirstByte) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typo
Suggested change
|
||||||
measurment.first_byte_at = measurment.start_at + timeToFirstByte | ||||||
return measurment | ||||||
} | ||||||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why was this added?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have changed task id structure from
cid::minerId
tocid::minerId::roundId
hence we're checking round id here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand, but what is the motivation?