Skip to content

Commit

Permalink
Update argument parsing to remove dependency on yargs and make more r…
Browse files Browse the repository at this point in the history
…obust

Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Sep 26, 2024
1 parent 3ec8dcd commit f934cb5
Show file tree
Hide file tree
Showing 15 changed files with 400 additions and 204 deletions.
6 changes: 3 additions & 3 deletions DocumentsFromSnapshotMigration/docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ echo "RFS_TARGET_PASSWORD_ARN: $RFS_TARGET_PASSWORD_ARN"
if [[ $RFS_COMMAND != *"--target-username"* ]]; then
if [[ -n "$RFS_TARGET_USER" ]]; then
echo "Using username from ENV variable RFS_TARGET_USER. Updating RFS Command with username."
RFS_COMMAND="$RFS_COMMAND --target-username $RFS_TARGET_USER"
RFS_COMMAND="$RFS_COMMAND --target-username \"$RFS_TARGET_USER\""
fi
fi

Expand All @@ -39,9 +39,9 @@ if [[ $RFS_COMMAND != *"--target-password"* ]]; then
# Append the username/password to the RFS Command if have an updated password
if [[ -n "$PASSWORD_TO_USE" ]]; then
echo "Updating RFS Command with password."
RFS_COMMAND="$RFS_COMMAND --target-password $PASSWORD_TO_USE"
RFS_COMMAND="$RFS_COMMAND --target-password \"$PASSWORD_TO_USE\""
fi
fi

echo "Executing RFS Command"
eval $RFS_COMMAND
eval $RFS_COMMAND
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.kafka.common.config.SaslConfigs;

import org.opensearch.common.settings.Settings;
import org.opensearch.migrations.jcommander.NoSplitter;
import org.opensearch.migrations.tracing.ActiveContextTracker;
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
Expand Down Expand Up @@ -158,12 +159,14 @@ public static class Parameters {
public String otelCollectorEndpoint;
@Parameter(required = false,
names = "--setHeader",
splitter = NoSplitter.class,
arity = 2,
description = "[header-name header-value] Set an HTTP header (first argument) with to the specified value" +
" (second argument). Any existing headers with that name will be removed.")
public List<String> headerOverrides = new ArrayList<>();
@Parameter(required = false,
names = "--suppressCaptureForHeaderMatch",
splitter = NoSplitter.class,
arity = 2,
description = "The header name (which will be interpreted in a case-insensitive manner) and a regex "
+ "pattern. When the incoming request has a header that matches the regex, it will be passed "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.opensearch.migrations.jcommander.NoSplitter;
import org.opensearch.migrations.replay.tracing.RootReplayerContext;
import org.opensearch.migrations.replay.traffic.source.TrafficStreamLimiter;
import org.opensearch.migrations.replay.util.ActiveContextMonitor;
Expand Down Expand Up @@ -117,6 +118,7 @@ public static class Parameters {
@Parameter(
required = false, names = {
AWS_AUTH_HEADER_USER_AND_SECRET_ARG },
splitter = NoSplitter.class,
arity = 2,
description = "<USERNAME> <SECRET_ARN> pair to specify "
+ "\"authorization\" header value for each request. "
Expand Down
3 changes: 3 additions & 0 deletions coreUtilities/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ dependencies {
implementation group: 'org.apache.logging.log4j', name: 'log4j-core'
implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl'

// JCommander
compileOnly group: 'org.jcommander', name: 'jcommander'

// OpenTelemetry core
api group: 'io.opentelemetry', name: 'opentelemetry-api'
api group: 'io.opentelemetry', name: 'opentelemetry-sdk'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opensearch.migrations.jcommander;

import java.util.List;

import com.beust.jcommander.converters.IParameterSplitter;

public class NoSplitter implements IParameterSplitter {
@Override
public List<String> split(String value) {
return List.of(value);
}
}
115 changes: 47 additions & 68 deletions deployment/cdk/opensearch-service-migration/lib/common-utilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,87 +4,66 @@ import {CpuArchitecture} from "aws-cdk-lib/aws-ecs";
import {RemovalPolicy, Stack} from "aws-cdk-lib";
import { IStringParameter, StringParameter } from "aws-cdk-lib/aws-ssm";
import * as forge from 'node-forge';
import * as yargs from 'yargs';
import { ClusterYaml } from "./migration-services-yaml";

export function getTargetPasswordAccessPolicy(targetPasswordSecretArn: string): PolicyStatement {
return new PolicyStatement({
effect: Effect.ALLOW,
resources: [targetPasswordSecretArn],
actions: [
"secretsmanager:GetSecretValue"
]
})
}

// parseAndMergeArgs, see @common-utilities.test.ts for an example of different cases
export function parseAndMergeArgs(baseCommand: string, extraArgs?: string): string {
if (!extraArgs) {
return baseCommand;
export function appendArgIfNotInExtraArgs(
baseCommand: string,
extraArgsDict: Record<string, string[]>,
arg: string,
value: string | null = null,
): string {
if (extraArgsDict[arg] === undefined) {
// If not present, append the argument and value (only append value if it exists)
baseCommand = value !== null ? baseCommand.concat(" ", arg, " ", value) : baseCommand.concat(" ", arg);
}
return baseCommand;
}

// Extract command prefix
const commandPrefix = baseCommand.substring(0, baseCommand.indexOf('--')).trim();
const baseArgs = baseCommand.substring(baseCommand.indexOf('--'));
export function parseArgsToDict(argString: string | undefined): Record<string, string[]> {
const args: Record<string, string[]> = {};
if (argString === undefined) {
return args;
}
// Split based on '--' at the start of the string or preceded by whitespace, use non-capturing groups to include -- in parts
const parts = argString.split(/(?=\s--|^--)/).filter(Boolean);

// Parse base command
const baseYargsConfig = {
parserConfiguration: {
'camel-case-expansion': false,
'boolean-negation': false,
}
};
parts.forEach(part => {
const trimmedPart = part.trim();
if (trimmedPart.length === 0) return; // Skip empty parts

const baseArgv = yargs(baseArgs)
.parserConfiguration(baseYargsConfig.parserConfiguration)
.parse();
// Use a regular expression to find the first whitespace character
const firstWhitespaceMatch = trimmedPart.match(/\s/);
const firstWhitespaceIndex = firstWhitespaceMatch?.index;

// Parse extra args if provided
const extraYargsConfig = {
parserConfiguration: {
'camel-case-expansion': false,
'boolean-negation': true,
}
};

const extraArgv = extraArgs
? yargs(extraArgs.split(' '))
.parserConfiguration(extraYargsConfig.parserConfiguration)
.parse()
: {};

// Merge arguments
const mergedArgv: { [key: string]: unknown } = { ...baseArgv };
for (const [key, value] of Object.entries(extraArgv)) {
if (key !== '_' && key !== '$0') {
if (!value &&
typeof value === 'boolean' &&
(
typeof (baseArgv as any)[key] === 'boolean' ||
(typeof (baseArgv as any)[`no-${key}`] != 'boolean' && typeof (baseArgv as any)[`no-${key}`])
)
) {
delete mergedArgv[key];
const key = firstWhitespaceIndex === undefined ? trimmedPart : trimmedPart.slice(0, firstWhitespaceIndex).trim();
const value = firstWhitespaceIndex === undefined ? '' : trimmedPart.slice(firstWhitespaceIndex + 1).trim();

// Validate the key starts with -- followed by a non-whitespace characters
if (/^--\S+/.test(key)) {
if (args[key] !== undefined) {
args[key].push(value);
} else {
mergedArgv[key] = value;
args[key] = [value];
}
} else {
throw new Error(`Invalid argument key: '${key}'. Argument keys must start with '--' and contain no spaces.`);
}
});
if (argString.trim() && !args) {
throw new Error(`Unable to parse args provided: '${argString}'`);
}

// Reconstruct command
const mergedArgs = Object.entries(mergedArgv)
.filter(([key]) => key !== '_' && key !== '$0')
.map(([key, value]) => {
if (typeof value === 'boolean') {
return value ? `--${key}` : `--no-${key}`;
}
return `--${key} ${value}`;
})
.join(' ');

let fullCommand = `${commandPrefix} ${mergedArgs}`.trim()
return fullCommand;
}

export function getTargetPasswordAccessPolicy(targetPasswordSecretArn: string): PolicyStatement {
return new PolicyStatement({
effect: Effect.ALLOW,
resources: [targetPasswordSecretArn],
actions: [
"secretsmanager:GetSecretValue"
]
})
return args;
}

export function createOpenSearchIAMAccessPolicy(partition: string, region: string, accountId: string): PolicyStatement {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import {StreamingSourceType} from "../streaming-source-type";
import {
MigrationSSMParameter,
createMSKProducerIAMPolicies,
getMigrationStringParameterValue,
parseAndMergeArgs
getMigrationStringParameterValue, parseArgsToDict, appendArgIfNotInExtraArgs,
} from "../common-utilities";
import {OtelCollectorSidecar} from "./migration-otel-collector-sidecar";

Expand Down Expand Up @@ -63,11 +62,22 @@ export class CaptureProxyESStack extends MigrationServiceCore {
...props,
parameter: MigrationSSMParameter.KAFKA_BROKERS,
});
let command = `/usr/local/bin/docker-entrypoint.sh eswrapper & /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy --destinationUri https://localhost:19200 --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml`
command = props.streamingSourceType !== StreamingSourceType.DISABLED ? command.concat(` --kafkaConnection ${brokerEndpoints}`) : command
command = props.streamingSourceType === StreamingSourceType.AWS_MSK ? command.concat(" --enableMSKAuth") : command
command = props.otelCollectorEnabled ? command.concat(` --otelCollectorEndpoint ${OtelCollectorSidecar.getOtelLocalhostEndpoint()}`) : command
command = parseAndMergeArgs(command, props.extraArgs);

let command = "/usr/local/bin/docker-entrypoint.sh eswrapper & /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy"
const extraArgsDict = parseArgsToDict(props.extraArgs)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--destinationUri", "https://localhost:19200")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--insecureDestination", "https://localhost:19200")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--sslConfigFile", "/usr/share/elasticsearch/config/proxy_tls.yml")
if (props.streamingSourceType !== StreamingSourceType.DISABLED) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--kafkaConnection", brokerEndpoints)
}
if (props.streamingSourceType === StreamingSourceType.AWS_MSK) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--enableMSKAuth")
}
if (props.otelCollectorEnabled) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--otelCollectorEndpoint", OtelCollectorSidecar.getOtelLocalhostEndpoint())
}
command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command

this.createService({
serviceName: "capture-proxy-es",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import {
MigrationSSMParameter,
createMSKProducerIAMPolicies,
getCustomStringParameterValue,
getMigrationStringParameterValue,
parseAndMergeArgs
getMigrationStringParameterValue, parseArgsToDict, appendArgIfNotInExtraArgs,
} from "../common-utilities";
import {OtelCollectorSidecar} from "./migration-otel-collector-sidecar";

Expand Down Expand Up @@ -122,11 +121,23 @@ export class CaptureProxyStack extends MigrationServiceCore {

const destinationEndpoint = getDestinationEndpoint(this, props.destinationConfig, props);

let command = `/runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy --destinationUri ${destinationEndpoint} --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml`
command = props.streamingSourceType !== StreamingSourceType.DISABLED ? command.concat(` --kafkaConnection ${brokerEndpoints}`) : command
command = props.streamingSourceType === StreamingSourceType.AWS_MSK ? command.concat(" --enableMSKAuth") : command
command = props.otelCollectorEnabled ? command.concat(` --otelCollectorEndpoint ${OtelCollectorSidecar.getOtelLocalhostEndpoint()}`) : command
command = parseAndMergeArgs(command, props.extraArgs);
let command = "/runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy"

const extraArgsDict = parseArgsToDict(props.extraArgs)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--destinationUri", destinationEndpoint)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--insecureDestination")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--listenPort", "9200")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--sslConfigFile", "/usr/share/elasticsearch/config/proxy_tls.yml")
if (props.streamingSourceType !== StreamingSourceType.DISABLED) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--kafkaConnection", brokerEndpoints)
}
if (props.streamingSourceType === StreamingSourceType.AWS_MSK) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--enableMSKAuth")
}
if (props.otelCollectorEnabled) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--otelCollectorEndpoint", OtelCollectorSidecar.getOtelLocalhostEndpoint())
}
command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command

this.createService({
serviceName: serviceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import {
createOpenSearchServerlessIAMAccessPolicy,
getTargetPasswordAccessPolicy,
getMigrationStringParameterValue,
parseAndMergeArgs,
ClusterAuth
ClusterAuth, parseArgsToDict, appendArgIfNotInExtraArgs
} from "../common-utilities";
import { RFSBackfillYaml, SnapshotYaml } from "../migration-services-yaml";
import { OtelCollectorSidecar } from "./migration-otel-collector-sidecar";
Expand Down Expand Up @@ -69,20 +68,40 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {
parameter: MigrationSSMParameter.OS_CLUSTER_ENDPOINT,
});
const s3Uri = `s3://migration-artifacts-${this.account}-${props.stage}-${this.region}/rfs-snapshot-repo`;
let rfsCommand = `/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir /tmp/s3_files --s3-repo-uri ${s3Uri} --s3-region ${this.region} --snapshot-name rfs-snapshot --lucene-dir '/lucene' --target-host ${osClusterEndpoint}`
rfsCommand = props.clusterAuthDetails.sigv4 ? rfsCommand.concat(`--target-aws-service-signing-name ${props.clusterAuthDetails.sigv4.serviceSigningName} --target-aws-region ${props.clusterAuthDetails.sigv4.region}`) : rfsCommand
rfsCommand = props.otelCollectorEnabled ? rfsCommand.concat(` --otel-collector-endpoint ${OtelCollectorSidecar.getOtelLocalhostEndpoint()}`) : rfsCommand
rfsCommand = props.sourceClusterVersion ? rfsCommand.concat(` --source-version ${props.sourceClusterVersion}`) : rfsCommand
rfsCommand = parseAndMergeArgs(rfsCommand, props.extraArgs);
let command = "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments"
const extraArgsDict = parseArgsToDict(props.extraArgs)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-local-dir", "/tmp/s3_files")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-repo-uri", `"${s3Uri}"`)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-region", this.region)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--snapshot-name", "rfs-snapshot")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--lucene-dir", "/lucene")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-host", osClusterEndpoint)
if (props.clusterAuthDetails.sigv4) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-service-signing-name", props.clusterAuthDetails.sigv4.serviceSigningName)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-region", props.clusterAuthDetails.sigv4.region)
}
if (props.otelCollectorEnabled) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--otel-collector-endpoint", OtelCollectorSidecar.getOtelLocalhostEndpoint())
}
if (props.sourceClusterVersion) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--source-version", `"${props.sourceClusterVersion}"`)
}

let targetUser = "";
let targetPassword = "";
let targetPasswordArn = "";
if (props.clusterAuthDetails.basicAuth) {
targetUser = props.clusterAuthDetails.basicAuth.username
targetPassword = props.clusterAuthDetails.basicAuth.password ?? ""
targetPasswordArn = props.clusterAuthDetails.basicAuth.password_from_secret_arn ?? ""
};
// Only set user or password if not overridden in extraArgs
if (extraArgsDict["--target-username"] === undefined) {
targetUser = props.clusterAuthDetails.basicAuth.username
}
if (extraArgsDict["--target-password"] === undefined) {
targetPassword = props.clusterAuthDetails.basicAuth.password ?? ""
targetPasswordArn = props.clusterAuthDetails.basicAuth.password_from_secret_arn ?? ""
}
}
command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command

const sharedLogFileSystem = new SharedLogFileSystem(this, props.stage, props.defaultDeployId);
const openSearchPolicy = createOpenSearchIAMAccessPolicy(this.partition, this.region, this.account);
const openSearchServerlessPolicy = createOpenSearchServerlessIAMAccessPolicy(this.partition, this.region, this.account);
Expand All @@ -108,7 +127,7 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {
taskMemoryLimitMiB: 4096,
ephemeralStorageGiB: 200,
environment: {
"RFS_COMMAND": rfsCommand,
"RFS_COMMAND": command,
"RFS_TARGET_USER": targetUser,
"RFS_TARGET_PASSWORD": targetPassword,
"RFS_TARGET_PASSWORD_ARN": targetPasswordArn,
Expand Down
Loading

0 comments on commit f934cb5

Please sign in to comment.