Skip to content

Commit

Permalink
Merge pull request #577 from metrico/fix/profiles_574_576
Browse files Browse the repository at this point in the history
WIP: fix/profiles 574 576
  • Loading branch information
akvlad authored Sep 26, 2024
2 parents 8c73c48 + 0881437 commit 312885d
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 39 deletions.
11 changes: 11 additions & 0 deletions lib/bun_wrapper.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { Transform } = require('stream')
const log = require('./logger')
const { EventEmitter } = require('events')
const zlib = require('zlib')

class BodyStream extends Transform {
_transform (chunk, encoding, callback) {
Expand Down Expand Up @@ -121,6 +122,16 @@ const wrapper = (handler, parsers) => {
headers['Content-Type'] = 'application/json'
response = JSON.stringify(response)
}
if (response && (ctx.headers.get('accept-encoding') || '').indexOf('gzip') !== -1) {
if (response.on) {
const _r = zlib.createGzip()
response.pipe(_r)
response = _r
} else {
response = Bun.gzipSync(response)
}
headers['Content-Encoding'] = 'gzip'
}
return new Response(response, { status: status, headers: headers })
}
return res
Expand Down
5 changes: 3 additions & 2 deletions pyroscope/json_parsers.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ const labelNames = async (req, payload) => {
return {
getStart: () => body.start,
getEnd: () => body.end,
getName: () => body.name
getName: () => body.name,
getMatchersList: () => body.matchers
}
}

Expand All @@ -43,7 +44,7 @@ const labelValues = async (req, payload) => {
body = JSON.parse(body.toString())
return {
getName: () => body.name,
getMatchers: () => body.matchers,
getMatchersList: () => body.matchers,
getStart: () => body.start,
getEnd: () => body.end
}
Expand Down
218 changes: 184 additions & 34 deletions pyroscope/pyroscope.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,60 @@ WHERE date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)})) AND date <= toDa
}

/**
 * Handles a label-names request: returns the distinct label keys seen in the
 * requested time range, optionally restricted by label-selector matchers.
 *
 * Without matchers a single query against `profiles_series_keys` is issued.
 * With matchers, one query per matcher is issued against `profiles_series_gin`
 * (restricted to the fingerprints selected by `matcherIdxRequest`) and the
 * resulting keys are merged and de-duplicated.
 *
 * @param req {Object} request; `req.body` may expose `getStart()` / `getEnd()`
 *   (milliseconds) and `getMatchersList()` (array of label-selector strings)
 * @param res {Object} reply object (unused)
 * @returns {Promise<types.LabelNamesResponse>}
 */
const labelNames = async (req, res) => {
  const body = req.body
  const dist = clusterName ? '_dist' : ''

  // Reads a millisecond timestamp from the body; falls back when the getter is
  // absent or yields a non-numeric value, so `NaN` never reaches the SQL text.
  const readMs = (getterName, fallbackMs) => {
    const ms = body && body[getterName] ? parseInt(body[getterName](), 10) : NaN
    return Number.isFinite(ms) ? ms : fallbackMs
  }
  const fromTimeSec = Math.floor(readMs('getStart', Date.now() - HISTORY_TIMESPAN) / 1000)
  const toTimeSec = Math.floor(readMs('getEnd', Date.now()) / 1000)
  const fromDateSql = `toDate(FROM_UNIXTIME(${fromTimeSec}))`
  const toDateSql = `toDate(FROM_UNIXTIME(${toTimeSec}))`

  // A missing body, a missing getter and an undefined list all mean "no matchers".
  const matchers = (body && body.getMatchersList && body.getMatchersList()) || []

  if (matchers.length === 0) {
    // No space between the table name and the `_dist` suffix: with a space the
    // suffix would be parsed as a table alias and the local table would be read.
    const query = `SELECT DISTINCT key
FROM profiles_series_keys${dist}
WHERE date >= ${fromDateSql}
  AND date <= ${toDateSql} FORMAT JSON`
    const result = await clickhouse.rawRequest(query, null, DATABASE_NAME())
    const resp = new types.LabelNamesResponse()
    resp.setNamesList(result.data.data.map(row => row.key))
    return resp
  }

  const promises = []
  for (const matcher of matchers) {
    const specialMatchers = getSpecialMatchers(matcher)
    const idxReq = matcherIdxRequest(matcher, specialMatchers, fromTimeSec, toTimeSec)
    // NOTE(review): `series` passes a third argument (`!!clusterName`) to Sql.With;
    // confirm whether it is needed here as well.
    const withIdxReq = new Sql.With('idx', idxReq)
    const specialClauses = specialMatchersQuery(specialMatchers.matchers,
      'sample_types_units')
    const serviceNameSelector = serviceNameSelectorQuery(matcher)
    const keysReq = (new Sql.Select()).with(withIdxReq)
      .select('key')
      .distinct(true)
      .from(`profiles_series_gin${dist}`)
      .where(Sql.And(
        specialClauses,
        serviceNameSelector,
        Sql.Gte('date', new Sql.Raw(fromDateSql)),
        Sql.Lte('date', new Sql.Raw(toDateSql)),
        new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxReq))
      ))
    promises.push(clickhouse.rawRequest(keysReq.toString() + ' FORMAT JSON', null, DATABASE_NAME()))
  }
  const results = await Promise.all(promises)

  // Merge the per-matcher results, keeping the first occurrence of each key.
  const names = new Set(results.flatMap(result => result.data.data.map(row => row.key)))
  const resp = new types.LabelNamesResponse()
  resp.setNamesList([...names])
  return resp
}

const labelValues = async (req, res) => {
const dist = clusterName ? '_dist' : ''
const body = req.body;
const name = req.body && req.body.getName
? req.body.getName()
: ''
Expand All @@ -83,13 +119,45 @@ const labelValues = async (req, res) => {
if (!name) {
throw new Error('No name provided')
}
const labelValues = await clickhouse.rawRequest(`SELECT DISTINCT val
if (!body.getMatchersList || body.getMatchersList().length === 0) {
const labelValues = await clickhouse.rawRequest(`SELECT DISTINCT val
FROM profiles_series_gin${dist}
WHERE key = ${Sql.quoteVal(name)} AND
date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)})) AND
date <= toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)})) FORMAT JSON`, null, DATABASE_NAME())
const resp = new types.LabelValuesResponse()
resp.setNamesList(labelValues.data.data.map(label => label.val))
return resp
}
const promises = []
for (const matcher of body.getMatchersList()) {
const specialMatchers = getSpecialMatchers(matcher)
const idxReq = matcherIdxRequest(matcher, specialMatchers, fromTimeSec, toTimeSec)
const withIdxReq = new Sql.With('idx', idxReq)
const specialClauses = specialMatchersQuery(specialMatchers.matchers,
'sample_types_units')
const serviceNameSelector = serviceNameSelectorQuery(matcher)
const req = (new Sql.Select()).with(withIdxReq)
.select('val')
.distinct(true)
.from(`profiles_series_gin${dist}`)
.where(Sql.And(
specialClauses,
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
Sql.Eq('key', name),
new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxReq))
))
console.log(req.toString())
promises.push(clickhouse.rawRequest(req.toString() + ' FORMAT JSON', null, DATABASE_NAME()))
}
const labelValues = await Promise.all(promises)
const labelValuesDedup = Object.fromEntries(
labelValues.flatMap(val => val.data.data.map(row => [row.val, true]))
)
const resp = new types.LabelValuesResponse()
resp.setNamesList(labelValues.data.data.map(label => label.val))
resp.setNamesList([...Object.keys(labelValuesDedup)])
return resp
}

Expand Down Expand Up @@ -244,6 +312,36 @@ const selectMergeProfile = async (req, res) => {
}
}

/**
 * Builds the sub-select that picks the `fingerprint` values matching a label
 * selector within a date range, read from `<database>.profiles_series_gin`.
 *
 * The WHERE clause combines the special-matcher clauses, the service-name
 * selector and the from/to date bounds. When the selector left over after
 * special-matcher extraction holds anything besides braces and spaces, it is
 * additionally applied to the request through `labelSelectorQuery`.
 *
 * @param labelSelector {string} label selector string
 * @param specialMatchers {object | undefined} result of `getSpecialMatchers`;
 *   computed from `labelSelector` when omitted
 * @param fromTimeSec {number} range start, seconds
 * @param toTimeSec {number} range end, seconds
 * @returns {Sql.Select}
 */
const matcherIdxRequest = (labelSelector, specialMatchers, fromTimeSec, toTimeSec) => {
  const special = specialMatchers || getSpecialMatchers(labelSelector)
  const typeClauses = specialMatchersQuery(special.matchers, 'sample_types_units')
  const serviceClause = serviceNameSelectorQuery(labelSelector)

  const fingerprints = (new Sql.Select())
    .select(new Sql.Raw('fingerprint'))
    .from(`${DATABASE_NAME()}.profiles_series_gin`)
  const request = fingerprints.where(
    Sql.And(
      typeClauses,
      serviceClause,
      Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
      Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
    )
  )

  // A selector made only of braces/spaces carries no ordinary label matchers.
  const selectorIsBlank = special.query.match(/^[{} ]*$/) !== null
  if (selectorIsBlank) {
    return request
  }
  labelSelectorQuery(request, special.query)
  return request
}

const series = async (req, res) => {
const _req = req.body
const fromTimeSec = Math.floor(_req.getStart && _req.getStart()
Expand All @@ -256,19 +354,40 @@ const series = async (req, res) => {
const promises = []
for (const labelSelector of _req.getMatchersList() || []) {
const specialMatchers = getSpecialMatchers(labelSelector)
const specialClauses = specialMatchersQuery(specialMatchers.matchers)
// Special matchers -> query clauses
const sampleTypesUnitsFieldName = '_sample_types_units'
const clauses = []
if (specialMatchers.__name__) {
clauses.push(matcherClause("splitByChar(':', type_id)[1]", specialMatchers.__name__))
}
if (specialMatchers.__period_type__) {
clauses.push(matcherClause("splitByChar(':', type_id)[2]", specialMatchers.__period_type__))
}
if (specialMatchers.__period_unit__) {
clauses.push(matcherClause("splitByChar(':', type_id)[3]", specialMatchers.__period_unit__))
}
if (specialMatchers.__sample_type__) {
clauses.push(matcherClause(`${sampleTypesUnitsFieldName}.1`, specialMatchers.__sample_type__))
}
if (specialMatchers.__sample_unit__) {
clauses.push(matcherClause(`${sampleTypesUnitsFieldName}.2`, specialMatchers.__sample_unit__))
}
if (specialMatchers.__profile_type__) {
clauses.push(matcherClause(
`format('{}:{}:{}:{}:{}', (splitByChar(':', type_id) as _parts)[1], ${sampleTypesUnitsFieldName}.1, ${sampleTypesUnitsFieldName}.2, _parts[2], _parts[3])`,
specialMatchers.__profile_type__))
}
let specialClauses = null
if (clauses.length === 0) {
specialClauses = Sql.Eq(new Sql.Raw('1'), 1)
} else if (clauses.length === 1) {
specialClauses = clauses[0]
} else {
specialClauses = Sql.And(...clauses)
}
//
const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
const idxReq = (new Sql.Select())
.select(new Sql.Raw('fingerprint'))
.from(`${DATABASE_NAME()}.profiles_series_gin`)
.where(
Sql.And(
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
)
)
labelSelectorQuery(idxReq, specialMatchers.query)
const idxReq = matcherIdxRequest(labelSelector, specialMatchers, fromTimeSec, toTimeSec)
const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
const labelsReq = (new Sql.Select())
.with(withIdxReq)
Expand Down Expand Up @@ -349,8 +468,21 @@ const series = async (req, res) => {
}

/**
* Returns the special matchers and the sanitized query with them removed, as follows:
* @example
* {
* "matchers": {
* "__name__": ["=", "foo"],
* "__period_type__": ["=~", "bar"],
* },
* "query": "{service_name=\"abc\", job=\"def\"}"
* }
*
* @param query {string}
* @returns {{
* matchers: { [fieldName: string]: [operator: string, value: string] },
* query: string
* }}
*/
const getSpecialMatchers = (query) => {
if (query.length <= 2) {
Expand Down Expand Up @@ -395,27 +527,45 @@ const matcherClause = (field, matcher) => {
return valRul
}

const specialMatchersQuery = (matchers) => {
/**
* @example
* specialMatchersQuery({
* "__name__": ["=", "foo"],
* "__period_type__": ["=~", "bar"],
* })
*
* @param specialMatchers {Object}
* @returns {Sql.Condition}
*/
const specialMatchersQuery = (specialMatchers) => {
const sampleTypesUnitsFieldName = 'sample_types_units'
const clauses = []
if (matchers.__name__) {
clauses.push(matcherClause("splitByChar(':', type_id)[1]", matchers.__name__))
if (specialMatchers.__name__) {
clauses.push(matcherClause("splitByChar(':', type_id)[1]", specialMatchers.__name__))
}
if (matchers.__period_type__) {
clauses.push(matcherClause("splitByChar(':', type_id)[2]", matchers.__period_type__))
if (specialMatchers.__period_type__) {
clauses.push(matcherClause("splitByChar(':', type_id)[2]", specialMatchers.__period_type__))
}
if (matchers.__period_unit__) {
clauses.push(matcherClause("splitByChar(':', type_id)[3]", matchers.__period_unit__))
if (specialMatchers.__period_unit__) {
clauses.push(matcherClause("splitByChar(':', type_id)[3]", specialMatchers.__period_unit__))
}
const arrayExists = (field) => {
const arrayExists = Sql.Condition(null, null, null)
arrayExists.toString = () => {
return `arrayExists(x -> ${field}, ${sampleTypesUnitsFieldName})`
}
return arrayExists
}
if (matchers.__sample_type__) {
clauses.push(matcherClause('_sample_types_units.1', matchers.__sample_type__))
if (specialMatchers.__sample_type__) {
clauses.push(arrayExists(matcherClause('x.1', specialMatchers.__sample_type__)))
}
if (matchers.__sample_unit__) {
clauses.push(matcherClause('_sample_types_units.2', matchers.__sample_unit__))
if (specialMatchers.__sample_unit__) {
clauses.push(arrayExists(matcherClause('x.2', specialMatchers.__sample_unit__)))
}
if (matchers.__profile_type__) {
clauses.push(matcherClause(
"format('{}:{}:{}:{}:{}', (splitByChar(':', type_id) as _parts)[1], _sample_types_units.1, _sample_types_units.2, _parts[2], _parts[3])",
matchers.__profile_type__))
if (specialMatchers.__profile_type__) {
clauses.push(arrayExists(matcherClause(
"format('{}:{}:{}:{}:{}', (splitByChar(':', type_id) as _parts)[1], x.1, x.2, _parts[2], _parts[3])",
specialMatchers.__profile_type__)))
}
if (clauses.length === 0) {
return Sql.Eq(new Sql.Raw('1'), 1)
Expand Down
4 changes: 2 additions & 2 deletions pyroscope/render.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const render = async (req, res) => {
? Math.floor(parseInt(req.query.until) / 1000)
: Math.floor((Date.now() - 1000 * 60 * 60 * 48) / 1000)
if (!parsedQuery) {
return res.sendStatus(400).send('Invalid query')
return res.code(400).send('Invalid query')
}
const groupBy = req.query.groupBy || []
let agg = ''
Expand All @@ -26,7 +26,7 @@ const render = async (req, res) => {
break
}
if (req.query.format === 'dot') {
return res.sendStatus(400).send('Dot format is not supported')
return res.code(400).send('Dot format is not supported')
}
const promises = []
promises.push(mergeStackTraces(
Expand Down
2 changes: 1 addition & 1 deletion test/e2e
Submodule e2e updated from 376a7d to 7d7767

0 comments on commit 312885d

Please sign in to comment.