Skip to content

Commit

Permalink
cli run explain use createDispatcher API (#495)
Browse files Browse the repository at this point in the history
- Stop exposing internal `CommandHandlerContext` APIs. 
- Switch to use `createDispatcher` and use the `@dispatcher explain`
command
- Move some of the additional functionality from the CLI to the command
handler.
- Fix consoleIO input hanging the CLI in `run explain`
  • Loading branch information
curtisman authored Dec 13, 2024
1 parent 42c9020 commit 53cf1db
Show file tree
Hide file tree
Showing 13 changed files with 419 additions and 322 deletions.
191 changes: 39 additions & 152 deletions ts/packages/cache/src/cache/cache.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,22 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import { QueueObject, queue } from "async";
import { DeepPartialUndefined } from "common-utils";
import * as Telemetry from "telemetry";
import { ExplanationData } from "../explanation/explanationData.js";
import {
Actions,
normalizeParamString,
RequestAction,
} from "../explanation/requestAction.js";
import { RequestAction } from "../explanation/requestAction.js";
import {
SchemaInfoProvider,
doCacheAction,
} from "../explanation/schemaInfoProvider.js";
import { GenericExplanationResult } from "../index.js";
import { ConstructionStore, ConstructionStoreImpl } from "./store.js";
import { ExplainerFactory } from "./factory.js";
import { getLanguageTools } from "../utils/language.js";
import { NamespaceKeyFilter } from "../constructions/constructionCache.js";

export type ProcessExplanationResult = {
explanation: GenericExplanationResult;
elapsedMs: number;
toPrettyString?: ((explanation: object) => string) | undefined;
};
import {
ExplainWorkQueue,
ExplanationOptions,
ProcessExplanationResult,
} from "./explainWorkQueue.js";

export type ProcessRequestActionResult = {
explanationResult: ProcessExplanationResult;
Expand Down Expand Up @@ -53,63 +45,6 @@ function getFailedResult(message: string): ProcessRequestActionResult {
};
}

export type ExplanationOptions = {
concurrent?: boolean; // whether to limit to run one at a time, require cache to be false
valueInRequest?: boolean;
noReferences?: boolean;
checkExplainable?:
| ((requestAction: RequestAction) => Promise<void>)
| undefined; // throw exception if not explainable
};

const langTool = getLanguageTools("en");

function checkExplainableValues(
requestAction: RequestAction,
valueInRequest: boolean,
noReferences: boolean,
) {
// Do a cheap parameter check first.
const normalizedRequest = normalizeParamString(requestAction.request);
const pending: unknown[] = [];

for (const action of requestAction.actions) {
pending.push(action.parameters);
}

while (pending.length > 0) {
const value = pending.pop();
if (!value) {
continue;
}

// TODO: check number too.
if (typeof value === "string") {
if (noReferences && langTool?.possibleReferentialPhrase(value)) {
throw new Error(
"Request contains a possible referential phrase used for property values.",
);
}
if (
valueInRequest &&
!normalizedRequest.includes(normalizeParamString(value))
) {
throw new Error(
`Action parameter value '${value}' not found in the request`,
);
}
continue;
}
if (typeof value === "object") {
if (Array.isArray(value)) {
pending.push(...value);
} else {
pending.push(...Object.values(value));
}
}
}
}

// Construction namespace policy
export function getSchemaNamespaceKeys(
schemaNames: string[],
Expand All @@ -126,18 +61,13 @@ export function getSchemaNamespaceKeys(

export class AgentCache {
private _constructionStore: ConstructionStoreImpl;
private queue: QueueObject<{
task: () => Promise<ProcessRequestActionResult>;
resolve: (value: ProcessRequestActionResult) => void;
reject: (reason?: any) => void;
}>;

private readonly explainWorkQueue: ExplainWorkQueue;
private readonly namespaceKeyFilter?: NamespaceKeyFilter;
private readonly logger: Telemetry.Logger | undefined;
public model?: string;
constructor(
public readonly explainerName: string,
private readonly getExplainerForTranslator: ExplainerFactory,
getExplainerForTranslator: ExplainerFactory,
private readonly schemaInfoProvider?: SchemaInfoProvider,
cacheOptions?: CacheOptions,
logger?: Telemetry.Logger,
Expand All @@ -147,13 +77,7 @@ export class AgentCache {
cacheOptions,
);

this.queue = queue(async (item, callback) => {
try {
item.resolve(await item.task());
} catch (e: any) {
item.reject(e);
}
});
this.explainWorkQueue = new ExplainWorkQueue(getExplainerForTranslator);

this.logger = logger
? new Telemetry.ChildLogger(logger, "cache", {
Expand Down Expand Up @@ -190,56 +114,43 @@ export class AgentCache {

return this._constructionStore.prune(this.namespaceKeyFilter);
}
private getExplainerForActions(actions: Actions) {
return this.getExplainerForTranslator(
actions.translatorNames,
this.model,
);
}

private async queueTask(
public async processRequestAction(
requestAction: RequestAction,
cache: boolean,
cache: boolean = true,
options?: ExplanationOptions,
): Promise<ProcessRequestActionResult> {
const concurrent = options?.concurrent ?? false;
const valueInRequest = options?.valueInRequest ?? true;
const noReferences = options?.noReferences ?? true;
const checkExplainable = options?.checkExplainable;
const actions = requestAction.actions;
for (const action of actions) {
const cacheAction = doCacheAction(action, this.schemaInfoProvider);

if (!cacheAction) {
return getFailedResult(
`Caching disabled in schema config for action '${action.fullActionName}'`,
);
}
}

checkExplainableValues(requestAction, valueInRequest, noReferences);

const task = async () => {
const store = this._constructionStore;
const generateConstruction = cache && store.isEnabled();
const startTime = performance.now();
try {
const actions = requestAction.actions;
const explainer = this.getExplainerForActions(actions);
const constructionCreationConfig = {
schemaInfoProvider: this.schemaInfoProvider,
};
if (cache) {
for (const action of actions) {
const cacheAction = doCacheAction(
action,
this.schemaInfoProvider,
);

const explainerConfig = {
constructionCreationConfig,
};
if (!cacheAction) {
return getFailedResult(
`Caching disabled in schema config for action '${action.fullActionName}'`,
);
}
}
}

await checkExplainable?.(requestAction);
const explanation = await explainer.generate(
const constructionCreationConfig = cache
? {
schemaInfoProvider: this.schemaInfoProvider,
}
: undefined;
const explanationResult = await this.explainWorkQueue.queueTask(
requestAction,
explainerConfig,
cache,
options,
constructionCreationConfig,
this.model,
);
const elapsedMs = performance.now() - startTime;

const { explanation, elapsedMs } = explanationResult;
this.logger?.logEvent("explanation", {
request: requestAction.request,
actions,
Expand All @@ -248,11 +159,8 @@ export class AgentCache {
elapsedMs,
});

const explanationResult = {
explanation,
elapsedMs,
toPrettyString: explainer.toPrettyString,
};
const store = this._constructionStore;
const generateConstruction = cache && store.isEnabled();
if (generateConstruction && explanation.success) {
const construction = explanation.construction;
let added = false;
Expand Down Expand Up @@ -299,27 +207,6 @@ export class AgentCache {
return {
explanationResult,
};
};

if (concurrent && !cache) {
return task();
}
return new Promise((resolve, reject) => {
this.queue.push({
task,
resolve,
reject,
});
});
}

public async processRequestAction(
requestAction: RequestAction,
cache: boolean = true,
options?: ExplanationOptions,
): Promise<ProcessRequestActionResult> {
try {
return await this.queueTask(requestAction, cache, options);
} catch (e: any) {
this.logger?.logEvent("error", {
request: requestAction.request,
Expand All @@ -340,7 +227,7 @@ export class AgentCache {
) {
return this._constructionStore.import(
data,
this.getExplainerForTranslator,
this.explainWorkQueue.getExplainerForTranslator,
this.schemaInfoProvider,
ignoreSourceHash,
);
Expand Down
Loading

0 comments on commit 53cf1db

Please sign in to comment.