Skip to content

Commit

Permalink
[o11y] Add KV span tags
Browse files Browse the repository at this point in the history
  • Loading branch information
fhanau committed Dec 31, 2024
1 parent 13135a2 commit b48a847
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 18 deletions.
81 changes: 69 additions & 12 deletions src/workerd/api/kv.c++
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ constexpr auto FLPROD_405_HEADER = "CF-KV-FLPROD-405"_kj;
kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
kj::HttpHeaders& headers,
kj::OneOf<LimitEnforcer::KvOpType, kj::LiteralStringConst> opTypeOrUnknown,
kj::StringPtr urlStr) {
kj::StringPtr urlStr,
kj::Maybe<kj::OneOf<ListOptions, kj::OneOf<kj::String, GetOptions>, PutOptions>> options) {
const auto operationName = [&] {
KJ_SWITCH_ONEOF(opTypeOrUnknown) {
KJ_CASE_ONEOF(name, kj::LiteralStringConst) {
Expand All @@ -82,6 +83,8 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
switch (opType) {
case LimitEnforcer::KvOpType::GET:
return "kv_get"_kjc;
case LimitEnforcer::KvOpType::GET_WITH:
return "kv_getWithMetadata"_kjc;
case LimitEnforcer::KvOpType::PUT:
return "kv_put"_kjc;
case LimitEnforcer::KvOpType::LIST:
Expand All @@ -97,8 +100,53 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,

kj::Vector<Span::Tag> tags;
tags.add("db.system"_kjc, kj::str("cloudflare-kv"_kjc));
tags.add("cloudflare.kv.operation.name"_kjc, kj::str(operationName.slice(3)));

KJ_IF_SOME(_options, options) {
KJ_SWITCH_ONEOF(_options) {
KJ_CASE_ONEOF(o2, kj::OneOf<kj::String, GetOptions>) {
KJ_SWITCH_ONEOF(o2) {
KJ_CASE_ONEOF(type, kj::String) {
tags.add("cloudflare.kv.query.parameter.type"_kjc, kj::mv(type));
}
KJ_CASE_ONEOF(o, GetOptions) {
KJ_IF_SOME(type, o.type) {
tags.add("cloudflare.kv.query.parameter.type"_kjc, kj::mv(type));
}
KJ_IF_SOME(cacheTtl, o.cacheTtl) {
tags.add("cloudflare.kv.query.parameter.cacheTtl"_kjc, (int64_t)cacheTtl);
}
}
}
}
KJ_CASE_ONEOF(o, ListOptions) {
KJ_IF_SOME(l, o.limit) {
tags.add("cloudflare.kv.query.parameter.limit"_kjc, (int64_t)l);
}
KJ_IF_SOME(prefix, o.prefix) {
KJ_IF_SOME(p, prefix) {
tags.add("cloudflare.kv.query.parameter.prefix"_kjc, kj::mv(p));
}
}
KJ_IF_SOME(cursor, o.cursor) {
KJ_IF_SOME(c, cursor) {
tags.add("cloudflare.kv.query.parameter.cursor"_kjc, kj::mv(c));
}
}
}
KJ_CASE_ONEOF(o, PutOptions) {
KJ_IF_SOME(expiration, o.expiration) {
tags.add("cloudflare.kv.query.parameter.expiration"_kjc, (int64_t)expiration);
}
KJ_IF_SOME(expirationTtl, o.expirationTtl) {
tags.add("cloudflare.kv.query.parameter.expirationTtl"_kjc, (int64_t)expirationTtl);
}
}
}
}
auto client = context.getHttpClientWithSpans(
subrequestChannel, true, kj::none, operationName, kj::mv(tags));

headers.add(FLPROD_405_HEADER, urlStr);
for (const auto& header: additionalHeaders) {
headers.add(header.name.asPtr(), header.value.asPtr());
Expand All @@ -107,19 +155,25 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
return client;
}

jsg::Promise<KvNamespace::GetResult> KvNamespace::get(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
CompatibilityFlags::Reader flags) {
jsg::Promise<KvNamespace::GetResult> KvNamespace::get(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return js.evalNow([&] {
auto resp = getWithMetadata(js, kj::mv(name), kj::mv(options));
auto resp =
getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET);
return resp.then(js,
[](jsg::Lock&, KvNamespace::GetWithMetadataResult result) { return kj::mv(result.value); });
});
}

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH);
}

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataImpl(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
LimitEnforcer::KvOpType op) {
validateKeyName("GET", name);

auto& context = IoContext::current();
Expand All @@ -134,11 +188,11 @@ jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
KJ_IF_SOME(oneOfOptions, options) {
KJ_SWITCH_ONEOF(oneOfOptions) {
KJ_CASE_ONEOF(t, kj::String) {
type = kj::mv(t);
type = kj::str(t);
}
KJ_CASE_ONEOF(options, GetOptions) {
KJ_IF_SOME(t, options.type) {
type = kj::mv(t);
type = kj::str(t);
}
KJ_IF_SOME(cacheTtl, options.cacheTtl) {
url.query.add(kj::Url::QueryParam{kj::str("cache_ttl"), kj::str(cacheTtl)});
Expand All @@ -150,7 +204,7 @@ jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto headers = kj::HttpHeaders(context.getHeaderTable());
auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::GET, urlStr);
auto client = getHttpClient(context, headers, op, urlStr, kj::mv(options));

auto request = client->request(kj::HttpMethod::GET, urlStr, headers);
return context.awaitIo(js, kj::mv(request.response),
Expand Down Expand Up @@ -262,7 +316,8 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> KvNamespace::list(
auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto headers = kj::HttpHeaders(context.getHeaderTable());
auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::LIST, urlStr);
auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::LIST, urlStr, kj::mv(options));

auto request = client->request(kj::HttpMethod::GET, urlStr, headers);
return context.awaitIo(js, kj::mv(request.response),
Expand Down Expand Up @@ -365,7 +420,8 @@ jsg::Promise<void> KvNamespace::put(jsg::Lock& js,

auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::PUT, urlStr);
auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::PUT, urlStr, kj::mv(options));

auto promise = context.waitForOutputLocks().then(
[&context, client = kj::mv(client), urlStr = kj::mv(urlStr), headers = kj::mv(headers),
Expand Down Expand Up @@ -421,7 +477,8 @@ jsg::Promise<void> KvNamespace::delete_(jsg::Lock& js, kj::String name) {

kj::HttpHeaders headers(context.getHeaderTable());

auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::DELETE, urlStr);
auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::DELETE, urlStr, kj::none);

auto promise = context.waitForOutputLocks().then(
[headers = kj::mv(headers), client = kj::mv(client), urlStr = kj::mv(urlStr)]() mutable {
Expand Down
13 changes: 8 additions & 5 deletions src/workerd/api/kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@ class KvNamespace: public jsg::Object {
using GetResult = kj::Maybe<
kj::OneOf<jsg::Ref<ReadableStream>, kj::Array<byte>, kj::String, jsg::JsRef<jsg::JsValue>>>;

jsg::Promise<GetResult> get(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
CompatibilityFlags::Reader flags);
jsg::Promise<GetResult> get(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options);

struct GetWithMetadataResult {
GetResult value;
Expand All @@ -68,6 +66,10 @@ class KvNamespace: public jsg::Object {
});
};

jsg::Promise<GetWithMetadataResult> getWithMetadataImpl(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
LimitEnforcer::KvOpType op);
jsg::Promise<GetWithMetadataResult> getWithMetadata(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options);

Expand Down Expand Up @@ -173,7 +175,8 @@ class KvNamespace: public jsg::Object {
kj::Own<kj::HttpClient> getHttpClient(IoContext& context,
kj::HttpHeaders& headers,
kj::OneOf<LimitEnforcer::KvOpType, kj::LiteralStringConst> opTypeOrName,
kj::StringPtr urlStr);
kj::StringPtr urlStr,
kj::Maybe<kj::OneOf<ListOptions, kj::OneOf<kj::String, GetOptions>, PutOptions>> options);

private:
kj::Array<AdditionalHeader> additionalHeaders;
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/io/limit-enforcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class LimitEnforcer {
// external subrequests.
virtual void newSubrequest(bool isInHouse) = 0;

enum class KvOpType { GET, PUT, LIST, DELETE };
enum class KvOpType { GET, GET_WITH, PUT, LIST, DELETE };
// Called before starting a KV operation. Throws a JSG exception if the operation should be
// blocked due to exceeding limits, such as the free tier daily operation limit.
virtual void newKvRequest(KvOpType op) = 0;
Expand Down

0 comments on commit b48a847

Please sign in to comment.