Skip to content

Commit

Permalink
Add support for progress in aggregated apis
Browse files Browse the repository at this point in the history
  • Loading branch information
134dd3v committed Mar 27, 2023
1 parent c6a81d2 commit 2290fa6
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 13 deletions.
17 changes: 17 additions & 0 deletions redis-utils/list-keys-containing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { getRedisClient } from './get-client'

export async function listKeysContaining(str: string) {
let cursor: string | number = 0
let keys = []
const result = []
const client = getRedisClient()
while (cursor !== '0') {
;[cursor, keys] = await client.scan(cursor)
for (const key of keys) {
if (key.includes(str)) {
result.push(key)
}
}
}
return result
}
15 changes: 14 additions & 1 deletion routes/aggregated/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import {
getExcludeRawData,
getNetworkName,
handleRuntimeErrors,
hours
hours,
secs,
getParamAsString
} from '../../utils'
import UserRouter from './user'
import V2Router from './v2'
Expand Down Expand Up @@ -225,6 +227,17 @@ router.get(
})
)

router.get(
'/progress',
handleRuntimeErrors(async (req) => {
const label = getParamAsString(req, 'label')
return cacheFunctionResult(aggregated.progress, [label], {
cacheSeconds: 1 * secs,
tags: ['aggregated']
})
})
)

router.use('/user', UserRouter)
router.use('/v2', V2Router)

Expand Down
1 change: 1 addition & 0 deletions scripts/aggregated/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export * from './rebalance-info'
export * from './total-shares'
export * from './trader-pnl'
export * from './uniswap-slippage'
export * from './progress'
export * as user from './user'
export * from './vault-info'
export * as v2 from './v2'
10 changes: 10 additions & 0 deletions scripts/aggregated/progress.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { getRedisClient } from '../../redis-utils/get-client'

const redis = getRedisClient()

export async function progress(label: string) {
const progressKey = `parallelize-progress-${label}`
const data = JSON.parse((await redis.get(progressKey)) ?? '{}')

return { data }
}
124 changes: 112 additions & 12 deletions scripts/aggregated/util/parallelize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import { keccak256, toUtf8Bytes } from 'ethers/lib/utils'

import { ENV } from '../../../env'
import { getRedisClient } from '../../../redis-utils/get-client'
import { currentTimestamp } from '../../../utils'

const redis = getRedisClient()

export type EventFn<Event> = (
networkName: NetworkName,
Expand Down Expand Up @@ -52,10 +55,13 @@ export async function parallelize<Data, Event extends ethers.Event>(
endBlockNumber
} = options

await updateProgress(label, 'fresh', 'started')

let allEvents: Event[] = []

if (Array.isArray(getEvents)) {
for (const _getEvents of getEvents) {
for (const [i, _getEvents] of getEvents.entries()) {
await updateProgress(label, 'update', `querying events bucket ${i}`)
const events = await _getEvents(
networkName,
provider,
Expand All @@ -65,9 +71,12 @@ export async function parallelize<Data, Event extends ethers.Event>(
allEvents = allEvents.concat(events)
}
} else {
await updateProgress(label, 'update', `querying single event bucket`)
allEvents = await getEvents(networkName, provider, startBlockNumber, endBlockNumber)
}

await updateProgress(label, 'update', `all events are queried`)

allEvents = allEvents.sort((a, b) => a.blockNumber - b.blockNumber)

if (startBlockNumber) {
Expand All @@ -91,9 +100,9 @@ export async function parallelize<Data, Event extends ethers.Event>(

// if source code stays the same, this fingerprint will not change and we can cache data
const fingerprint = keccak256(toUtf8Bytes(onEachEvent.toString()))
// TODO get these inline constant prepends in a file and import here
const key = `parallelize-fingerprint-${fingerprint}`

const redis = getRedisClient()
let oldData: any[] = []
const temp = new Map<string, Data>()
try {
Expand All @@ -105,6 +114,8 @@ export async function parallelize<Data, Event extends ethers.Event>(
}
} catch {}

await updateProgress(label, 'update', `starting query`)

const start = Date.now()
let i = 0
const promises = []
Expand Down Expand Up @@ -189,6 +200,7 @@ export async function parallelize<Data, Event extends ethers.Event>(

let redisPromise: Promise<any> = new Promise((res) => res(null))
const intr = setInterval(() => {
const speed = Number(((done * 1000) / (Date.now() - start)).toFixed(3))
console.info(
'retries',
failed,
Expand All @@ -199,11 +211,19 @@ export async function parallelize<Data, Event extends ethers.Event>(
'done',
done,
'speed',
((done * 1000) / (Date.now() - start)).toFixed(3),
speed,
`( ${label} )`
)

redisPromise = redis.set(key, JSON.stringify(oldData))
redisPromise = redis.set(key, JSON.stringify(oldData)).then(() =>
updateProgress(label, 'update', `interval`, {
retries: failed,
inflight,
total: allEvents.length,
done,
speed
})
)
}, 5000)

await Promise.all(promises)
Expand All @@ -212,14 +232,94 @@ export async function parallelize<Data, Event extends ethers.Event>(
await redisPromise
const finalResult = data.filter((d) => !!d) as Data[]
await redis.set(key, JSON.stringify(finalResult))
await updateProgress(label, 'end', `finished`)
return finalResult
}

// async function parallelizeRequest<T>(
// arr: () => Promise<T>,
// maxInflight: number
// ) {
// for (let i = 0; i < arr.length; i++) {
// arr[i]()
// }
// }
interface ProgressUpdate {
label: string
description: string
startTime: number
updateTime: number
endTime: number
currentProgress: {
retries: number
inflight: number
total: number
done: number
speed: number
}
}

async function updateProgress(
label: string,
type: 'fresh' | 'update' | 'end',
description: string,
currentProgress?: ProgressUpdate['currentProgress']
) {
// TODO get these inline constant prepends in a file
const progressKey = `parallelize-progress-${label}`
const str = await redis.get(progressKey)
let progress: ProgressUpdate
if (type === 'fresh' || !isProgressUpdate(str)) {
progress = {
label,
description,
startTime: currentTimestamp(),
updateTime: currentTimestamp(),
endTime: -1,
currentProgress: {
retries: 0,
inflight: 0,
total: 0,
done: 0,
speed: 0
}
}
} else if (type === 'update') {
progress = JSON.parse(str)
if (currentProgress === undefined) {
currentProgress = {
retries: 0,
inflight: 0,
total: 0,
done: 0,
speed: 0
}
}
progress.updateTime = currentTimestamp()
progress.description = description
progress.currentProgress = currentProgress
} else if (type === 'end') {
progress = JSON.parse(str)
progress.description = description
progress.updateTime = currentTimestamp()
progress.endTime = currentTimestamp()
} else {
throw new Error('unknown type in updateProgress: ' + JSON.stringify(type))
}
await redis.set(progressKey, JSON.stringify(progress))

function isProgressUpdate(redisResp: string | null): redisResp is string {
let val = null
try {
val = JSON.parse(redisResp ?? '')
} catch {}

return (
val !== null &&
typeof val === 'object' &&
typeof val.label === 'string' &&
typeof val.description === 'string' &&
typeof val.startTime === 'number' &&
typeof val.updateTime === 'number' &&
typeof val.endTime === 'number' &&
typeof val.currentProgress === 'object' &&
typeof val.currentProgress.retries === 'number' &&
typeof val.currentProgress.inflight === 'number' &&
typeof val.currentProgress.total === 'number' &&
typeof val.currentProgress.done === 'number' &&
typeof val.currentProgress.speed === 'number'
)
}
}

0 comments on commit 2290fa6

Please sign in to comment.