diff --git a/lib/public-stats.js b/lib/public-stats.js
index d1a49d6..9eaef3b 100644
--- a/lib/public-stats.js
+++ b/lib/public-stats.js
@@ -2,7 +2,7 @@ import * as Sentry from '@sentry/node'
 import createDebug from 'debug'
 import * as providerRetrievalResultStats from './provider-retrieval-result-stats.js'
 import { updatePlatformStats } from './platform-stats.js'
-import { getTaskId } from './retrieval-stats.js'
+import { getTaskId, getValueAtPercentile } from './retrieval-stats.js'
 
 /** @import pg from 'pg' */
 /** @import { Committee } from './committee.js' */
@@ -26,6 +26,7 @@ export const updatePublicStats = async ({ createPgClient, committees, allMeasure
     await updateIndexerQueryStats(pgClient, committees)
     await updateDailyDealsStats(pgClient, committees, findDealClients)
     await updatePlatformStats(pgClient, allMeasurements)
+    await updateRetrievalTimings(pgClient, committees)
   } finally {
     await pgClient.end()
   }
@@ -227,3 +228,48 @@ const updateDailyDealsStats = async (pgClient, committees, findDealClients) => {
     flatStats.map(stat => stat.retrievable)
   ])
 }
+
+/**
+ * @param {pg.Client} pgClient
+ * @param {Iterable<Committee>} committees
+ */
+const updateRetrievalTimings = async (pgClient, committees) => {
+  /** @type {Map<string, number[]>} */
+  const retrievalTimings = new Map()
+  for (const c of committees) {
+    if (!c.evaluation || !c.evaluation.hasRetrievalMajority || c.evaluation.retrievalResult !== 'OK') continue
+    const { minerId } = c.retrievalTask
+    const ttfbMeasurements = []
+    for (const m of c.measurements) {
+      // FIXME: assert first_byte_at and start_at during preprocessing
+      // See https://github.com/filecoin-station/spark-evaluate/issues/447
+      if (m.fraudAssessment !== 'OK' || !m.first_byte_at || !m.start_at || m.start_at > m.first_byte_at) continue
+      const ttfbMeasurement = m.first_byte_at - m.start_at
+      if (isNaN(ttfbMeasurement)) continue
+      ttfbMeasurements.push(ttfbMeasurement)
+    }
+
+    if (!retrievalTimings.has(minerId)) {
+      retrievalTimings.set(minerId, [])
+    }
+
+    const ttfb = Math.ceil(getValueAtPercentile(ttfbMeasurements, 0.5))
+    retrievalTimings.get(minerId).push(ttfb)
+  }
+
+  // eslint-disable-next-line camelcase
+  const rows = Array.from(retrievalTimings.entries()).flatMap(([miner_id, ttfb_p50]) => ({ miner_id, ttfb_p50 }))
+
+  await pgClient.query(`
+    INSERT INTO retrieval_timings (day, miner_id, ttfb_p50)
+    SELECT now(), miner_id, ttfb_p50 FROM jsonb_to_recordset($1::jsonb) AS t (miner_id text, ttfb_p50 int[])
+    ON CONFLICT (day, miner_id)
+    DO UPDATE SET
+      ttfb_p50 = array_cat(
+        retrieval_timings.ttfb_p50,
+        EXCLUDED.ttfb_p50
+      )
+  `, [
+    JSON.stringify(rows)
+  ])
+}
diff --git a/migrations/022.do.add-retrieval-timings.sql b/migrations/022.do.add-retrieval-timings.sql
new file mode 100644
index 0000000..eea85fe
--- /dev/null
+++ b/migrations/022.do.add-retrieval-timings.sql
@@ -0,0 +1,6 @@
+CREATE TABLE retrieval_timings (
+  day DATE NOT NULL,
+  miner_id TEXT NOT NULL,
+  ttfb_p50 INT[] NOT NULL,
+  PRIMARY KEY (day, miner_id)
+);
diff --git a/test/public-stats.test.js b/test/public-stats.test.js
index 81ff4dd..a3a4589 100644
--- a/test/public-stats.test.js
+++ b/test/public-stats.test.js
@@ -28,6 +28,7 @@ describe('public-stats', () => {
     await pgClient.query('DELETE FROM retrieval_stats')
     await pgClient.query('DELETE FROM indexer_query_stats')
     await pgClient.query('DELETE FROM daily_deals')
+    await pgClient.query('DELETE FROM retrieval_timings')
 
     // Run all tests inside a transaction to ensure `now()` always returns the same value
     // See https://dba.stackexchange.com/a/63549/125312
@@ -560,8 +561,82 @@ describe('public-stats', () => {
     })
   })
 
+  describe('retrieval_timings', () => {
+    it('creates or updates rows for today', async () => {
+      /** @type {Measurement[]} */
+      let acceptedMeasurements = [
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000),
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 3000),
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'OK' }, 2000),
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'OK' }, 1000),
+        // measurements with invalid values
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'OK' }, -1000),
+        { ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'OK', first_byte_at: /** @type {any} */('invalid') }
+      ]
+
+      /** @type {Measurement[]} */
+      const rejectedMeasurements = [
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 100),
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 200),
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 300),
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'UNKNOWN_ERROR' }, 300),
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'UNKNOWN_ERROR' }, 200),
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'UNKNOWN_ERROR' }, 100)
+      ]
+
+      let allMeasurements = [...acceptedMeasurements, ...rejectedMeasurements]
+      let committees = buildEvaluatedCommitteesFromMeasurements(acceptedMeasurements)
+
+      await updatePublicStats({
+        createPgClient,
+        committees,
+        allMeasurements,
+        findDealClients: (_minerId, _cid) => ['f0client']
+      })
+      const { rows: created } = await pgClient.query(
+        'SELECT day::TEXT, miner_id, ttfb_p50 FROM retrieval_timings ORDER BY miner_id'
+      )
+      assert.deepStrictEqual(created, [
+        { day: today, miner_id: 'f1first', ttfb_p50: [2000] },
+        { day: today, miner_id: 'f1second', ttfb_p50: [1500] }
+      ])
+
+      acceptedMeasurements = [
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 3000),
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 5000),
+        givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000)
+      ]
+      allMeasurements = [...acceptedMeasurements, ...rejectedMeasurements]
+      committees = buildEvaluatedCommitteesFromMeasurements(acceptedMeasurements)
+      await updatePublicStats({
+        createPgClient,
+        committees,
+        allMeasurements,
+        findDealClients: (_minerId, _cid) => ['f0client']
+      })
+      const { rows: updated } = await pgClient.query(
+        'SELECT day::TEXT, miner_id, ttfb_p50 FROM retrieval_timings ORDER BY miner_id'
+      )
+      assert.deepStrictEqual(updated, [
+        { day: today, miner_id: 'f1first', ttfb_p50: [2000, 3000] },
+        { day: today, miner_id: 'f1second', ttfb_p50: [1500] }
+      ])
+    })
+  })
+
   const getCurrentDate = async () => {
     const { rows: [{ today }] } = await pgClient.query('SELECT now()::DATE::TEXT as today')
     return today
   }
+
+  /**
+   *
+   * @param {Measurement} measurement
+   * @param {number} timeToFirstByte Time in milliseconds
+   * @returns {Measurement}
+   */
+  function givenTimeToFirstByte (measurement, timeToFirstByte) {
+    measurement.first_byte_at = measurement.start_at + timeToFirstByte
+    return measurement
+  }
 })
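A minimal sketch (not part of this patch) of how the `ttfb_p50 INT[]` arrays written by the upsert above could be read back and reduced to a single daily median per miner. The helper name `getDailyTtfbP50` and the use of PostgreSQL's `unnest` with `percentile_cont` are illustrative assumptions, not APIs introduced by this change.

/** @param {pg.Client} pgClient */
const getDailyTtfbP50 = async (pgClient) => {
  // Each stored array element is one p50 per evaluation round; unnest the
  // array and take the median of those per-round medians to get one value
  // per miner and day. (Hypothetical downstream query, for illustration.)
  const { rows } = await pgClient.query(`
    SELECT day, miner_id,
           percentile_cont(0.5) WITHIN GROUP (ORDER BY value) AS ttfb_p50_ms
    FROM retrieval_timings, unnest(ttfb_p50) AS value
    GROUP BY day, miner_id
    ORDER BY day, miner_id
  `)
  return rows
}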