Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: switch to eligible_deals + set piece info #30

Merged
merged 3 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions scripts/build-spark-update-sql.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ await pipeline(
// FIXME: on conflict update expires_at to MAX(old.expires_at, new.expires_at)
const END_OF_INSERT_STATEMENT = '\nON CONFLICT DO NOTHING;\n'

// yield 'TRUNCATE TABLE retrievable_deals;\n'
yield 'DELETE FROM retrievable_deals WHERE expires_at < now();\n'
// yield 'TRUNCATE TABLE eligible_deals;\n'
yield 'DELETE FROM eligible_deals WHERE expires_at < now();\n'

let counter = 0
for await (const deal of source) {
signal.throwIfAborted()

assert(deal.pieceCID)
assert(deal.pieceSize)
assert(deal.payloadCID)
assert(deal.provider)
assert(deal.client)
Expand All @@ -48,16 +50,16 @@ await pipeline(

if (counter % 5000 === 1) {
if (counter > 1) yield END_OF_INSERT_STATEMENT
yield 'INSERT INTO retrievable_deals (cid, miner_id, client_id, expires_at) VALUES\n'
yield 'INSERT INTO eligible_deals (miner_id, client_id, piece_cid, piece_size, payload_cid, expires_at) VALUES\n'
} else {
yield ',\n'
}

const escape = (val) => typeof val === 'number' ? val : pg.escapeLiteral(val)
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
const q = `(${[
deal.payloadCID, deal.provider, deal.client, new Date(deal.expires).toISOString()
].map(pg.escapeLiteral).join(', ')})`
deal.provider, deal.client, deal.pieceCID, deal.pieceSize, deal.payloadCID, new Date(deal.expires).toISOString()
].map(escape).join(', ')})`
yield q
// console.log(q)
}
yield END_OF_INSERT_STATEMENT
},
Expand Down
3 changes: 2 additions & 1 deletion scripts/parse-retrievable-deals.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ try {
}} deal
*/
function * processDeal (deal) {
const { VerifiedDeal, StartEpoch, EndEpoch, Client, Label, Provider, PieceCID } = deal.Proposal
const { VerifiedDeal, StartEpoch, EndEpoch, Client, Label, Provider, PieceCID, PieceSize } = deal.Proposal
assert.strictEqual(typeof VerifiedDeal, 'boolean', `VerifiedDeal is not a boolean: ${JSON.stringify(deal.Proposal)}`)
if (!VerifiedDeal) return

Expand Down Expand Up @@ -121,6 +121,7 @@ function * processDeal (deal) {
provider: Provider,
client: Client,
pieceCID: PieceCID['/'],
pieceSize: PieceSize,
payloadCID: Label,
started,
expires
Expand Down
6 changes: 3 additions & 3 deletions scripts/update-allocator-clients.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ if (!DATACAPS_TOKEN) {
const pgClient = new pg.Client(DATABASE_URL)
await pgClient.connect()

// The "WITH" query is a more performing variant of `SELECT DISTINCT(client) FROM retrievable_deals`
// The "WITH" query is a more performing variant of `SELECT DISTINCT(client) FROM eligible_deals`
// See https://wiki.postgresql.org/wiki/Loose_indexscan
const { rows: clientsMissingAllocator } = await pgClient.query(`
WITH all_clients AS (
WITH RECURSIVE t AS (
(SELECT client_id FROM retrievable_deals ORDER BY client_id LIMIT 1)
(SELECT client_id FROM eligible_deals ORDER BY client_id LIMIT 1)
UNION ALL
SELECT (SELECT client_id FROM retrievable_deals WHERE client_id > t.client_id ORDER BY client_id LIMIT 1)
SELECT (SELECT client_id FROM eligible_deals WHERE client_id > t.client_id ORDER BY client_id LIMIT 1)
FROM t
WHERE t.client_id IS NOT NULL
)
Expand Down