Skip to content

Commit

Permalink
Update argument parsing based on peer feedback to exclude needing ove…
Browse files Browse the repository at this point in the history
…rrides in jcommander

Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Sep 26, 2024
1 parent cea160a commit 207e411
Show file tree
Hide file tree
Showing 15 changed files with 356 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public static class S3RepoInfo {
public static void main(String[] args) throws Exception {
Args arguments = new Args();
JCommander jCommander = JCommander.newBuilder()
.allowParameterOverwriting(true)
.addObject(arguments)
.build();
jCommander.parse(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public static class Args {
"used to communicate to the target, default 10")
int maxConnections = 10;

@Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Default: ES 7.10"), required = false,
@Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Default: ES_7.10"), required = false,
converter = VersionConverter.class)
public Version sourceVersion = Version.fromString("ES 7.10");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public static void main(String[] args) throws Exception {
var migrateArgs = new MigrateArgs();
var evaluateArgs = new EvaluateArgs();
var jCommander = JCommander.newBuilder()
.allowParameterOverwriting(true)
.addObject(metadataArgs)
.addCommand(migrateArgs)
.addCommand(evaluateArgs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class MigrateOrEvaluateArgs {
public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs();

@ParametersDelegate
public DataFilterArgs dataFilterArgs = new DataFilterArgs();
public DataFilterArgs dataFilterArgs = new DataFilterArgs();

// https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
@Parameter(names = {"--min-replicas" }, description = "Optional. The minimum number of replicas configured for migrated indices on the target."
Expand All @@ -50,6 +50,6 @@ public class MigrateOrEvaluateArgs {
+ "forwarded. If no value is provided, metrics will not be forwarded.")
String otelCollectorEndpoint;

@Parameter(names = {"--source-version" }, description = "Version of the source cluster, for example: Elasticsearch 7.10 or OS 1.3. Defaults to: ES 7.10", converter = VersionConverter.class)
@Parameter(names = {"--source-version" }, description = "Version of the source cluster, for example: Elasticsearch 7.10 or OS 1.3. Defaults to: ES_7.10", converter = VersionConverter.class)
public Version sourceVersion = Version.fromString("ES 7.10");
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,15 @@ public static class Parameters {
public String otelCollectorEndpoint;
@Parameter(required = false,
names = "--setHeader",
arity = 2,
splitter = NoSplitter.class,
arity = 2,
description = "[header-name header-value] Set an HTTP header (first argument) with to the specified value" +
" (second argument). Any existing headers with that name will be removed.")
public List<String> headerOverrides = new ArrayList<>();
@Parameter(required = false,
names = "--suppressCaptureForHeaderMatch",
arity = 2,
splitter = NoSplitter.class,
arity = 2,
description = "The header name (which will be interpreted in a case-insensitive manner) and a regex "
+ "pattern. When the incoming request has a header that matches the regex, it will be passed "
+ "through to the service but will NOT be captured. E.g. user-agent 'healthcheck'.")
Expand All @@ -177,7 +177,6 @@ public static class Parameters {
static Parameters parseArgs(String[] args) {
Parameters p = new Parameters();
JCommander jCommander = new JCommander(p);
jCommander.setAllowParameterOverwriting(true);
try {
jCommander.parse(args);
// Exactly one these 3 options are required. See that exactly one is set by summing up their presence
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
public class TestHeaderRewrites {

public static final String ONLY_FOR_HEADERS_VALUE = "this is only for headers";
public static final String ONLY_FOR_HEADERS_VALUE_SPECIAL_CHARACTERS = "!@#$%^&*()_+,./:\":;\\/";
public static final String BODY_WITH_HEADERS_CONTENTS = "\n" +
"body: should stay\n" +
"body: untouched\n" +
Expand All @@ -45,9 +44,6 @@ public void testHeaderRewrites() throws Exception {
"host",
"localhost",
"--setHeader",
"specialCharacters",
ONLY_FOR_HEADERS_VALUE_SPECIAL_CHARACTERS,
"--setHeader",
"X-new-header",
"insignificant value"
);
Expand All @@ -74,9 +70,7 @@ public void testHeaderRewrites() throws Exception {
var capturedRequest = capturedRequestList.get(capturedRequestList.size()-1).getHeaders().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Assertions.assertEquals("localhost", capturedRequest.get("host"));
Assertions.assertEquals(ONLY_FOR_HEADERS_VALUE_SPECIAL_CHARACTERS, capturedRequest.get("specialCharacters"));
Assertions.assertEquals("insignificant value", capturedRequest.get("X-new-header"));

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ static class Parameters {
public static Parameters parseArgs(String[] args) {
Parameters p = new Parameters();
JCommander jCommander = new JCommander(p);
jCommander.setAllowParameterOverwriting(true);
try {
jCommander.parse(args);
return p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ public static class Parameters {
private static Parameters parseArgs(String[] args) {
Parameters p = new Parameters();
JCommander jCommander = new JCommander(p);
jCommander.setAllowParameterOverwriting(true);
try {
jCommander.parse(args);
return p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,56 @@ export function getTargetPasswordAccessPolicy(targetPasswordSecretArn: string):
})
}

export function appendArgIfNotInExtraArgs(
baseCommand: string,
extraArgsDict: Record<string, string[]>,
arg: string,
value: string | null = null,
): string {
if (extraArgsDict[arg] === undefined) {
// If not present, append the argument and value (only append value if it exists)
baseCommand = value !== null ? baseCommand.concat(" ", arg, " ", value) : baseCommand.concat(" ", arg);
}
return baseCommand;
}

export function parseArgsToDict(argString: string | undefined): Record<string, string[]> {
const args: Record<string, string[]> = {};
if (argString === undefined) {
return args;
}
// Split based on '--' at the start of the string or preceded by whitespace, use non-capturing groups to include -- in parts
const parts = argString.split(/(?=\s--|^--)/).filter(Boolean);

parts.forEach(part => {
const trimmedPart = part.trim();
if (trimmedPart.length === 0) return; // Skip empty parts

// Use a regular expression to find the first whitespace character
const firstWhitespaceMatch = trimmedPart.match(/\s/);
const firstWhitespaceIndex = firstWhitespaceMatch?.index;

const key = firstWhitespaceIndex === undefined ? trimmedPart : trimmedPart.slice(0, firstWhitespaceIndex).trim();
const value = firstWhitespaceIndex === undefined ? '' : trimmedPart.slice(firstWhitespaceIndex + 1).trim();

// Validate the key starts with -- followed by a non-whitespace characters
if (/^--\S+/.test(key)) {
if (args[key] !== undefined) {
args[key].push(value);
} else {
args[key] = [value];
}
} else {
throw new Error(`Invalid argument key: '${key}'. Argument keys must start with '--' and contain no spaces.`);
}
});
if (argString.trim() && !args) {
throw new Error(`Unable to parse args provided: '${argString}'`);
}

return args;
}

export function createOpenSearchIAMAccessPolicy(partition: string, region: string, accountId: string): PolicyStatement {
return new PolicyStatement({
effect: Effect.ALLOW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {StreamingSourceType} from "../streaming-source-type";
import {
MigrationSSMParameter,
createMSKProducerIAMPolicies,
getMigrationStringParameterValue,
getMigrationStringParameterValue, parseArgsToDict, appendArgIfNotInExtraArgs,
} from "../common-utilities";
import {OtelCollectorSidecar} from "./migration-otel-collector-sidecar";

Expand Down Expand Up @@ -62,10 +62,21 @@ export class CaptureProxyESStack extends MigrationServiceCore {
...props,
parameter: MigrationSSMParameter.KAFKA_BROKERS,
});
let command = `/usr/local/bin/docker-entrypoint.sh eswrapper & /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy --destinationUri https://localhost:19200 --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml`
command = props.streamingSourceType !== StreamingSourceType.DISABLED ? command.concat(` --kafkaConnection ${brokerEndpoints}`) : command
command = props.streamingSourceType === StreamingSourceType.AWS_MSK ? command.concat(" --enableMSKAuth") : command
command = props.otelCollectorEnabled ? command.concat(` --otelCollectorEndpoint ${OtelCollectorSidecar.getOtelLocalhostEndpoint()}`) : command

let command = "/usr/local/bin/docker-entrypoint.sh eswrapper & /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy"
const extraArgsDict = parseArgsToDict(props.extraArgs)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--destinationUri", "https://localhost:19200")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--insecureDestination", "https://localhost:19200")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--sslConfigFile", "/usr/share/elasticsearch/config/proxy_tls.yml")
if (props.streamingSourceType !== StreamingSourceType.DISABLED) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--kafkaConnection", brokerEndpoints)
}
if (props.streamingSourceType === StreamingSourceType.AWS_MSK) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--enableMSKAuth")
}
if (props.otelCollectorEnabled) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--otelCollectorEndpoint", OtelCollectorSidecar.getOtelLocalhostEndpoint())
}
command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command

this.createService({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
MigrationSSMParameter,
createMSKProducerIAMPolicies,
getCustomStringParameterValue,
getMigrationStringParameterValue,
getMigrationStringParameterValue, parseArgsToDict, appendArgIfNotInExtraArgs,
} from "../common-utilities";
import {OtelCollectorSidecar} from "./migration-otel-collector-sidecar";

Expand Down Expand Up @@ -121,10 +121,22 @@ export class CaptureProxyStack extends MigrationServiceCore {

const destinationEndpoint = getDestinationEndpoint(this, props.destinationConfig, props);

let command = `/runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy --destinationUri ${destinationEndpoint} --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml`
command = props.streamingSourceType !== StreamingSourceType.DISABLED ? command.concat(` --kafkaConnection ${brokerEndpoints}`) : command
command = props.streamingSourceType === StreamingSourceType.AWS_MSK ? command.concat(" --enableMSKAuth") : command
command = props.otelCollectorEnabled ? command.concat(` --otelCollectorEndpoint ${OtelCollectorSidecar.getOtelLocalhostEndpoint()}`) : command
let command = "/runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy"

const extraArgsDict = parseArgsToDict(props.extraArgs)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--destinationUri", destinationEndpoint)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--insecureDestination")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--listenPort", "9200")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--sslConfigFile", "/usr/share/elasticsearch/config/proxy_tls.yml")
if (props.streamingSourceType !== StreamingSourceType.DISABLED) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--kafkaConnection", brokerEndpoints)
}
if (props.streamingSourceType === StreamingSourceType.AWS_MSK) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--enableMSKAuth")
}
if (props.otelCollectorEnabled) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--otelCollectorEndpoint", OtelCollectorSidecar.getOtelLocalhostEndpoint())
}
command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command

this.createService({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
createOpenSearchServerlessIAMAccessPolicy,
getTargetPasswordAccessPolicy,
getMigrationStringParameterValue,
ClusterAuth
ClusterAuth, parseArgsToDict, appendArgIfNotInExtraArgs
} from "../common-utilities";
import { RFSBackfillYaml, SnapshotYaml } from "../migration-services-yaml";
import { OtelCollectorSidecar } from "./migration-otel-collector-sidecar";
Expand Down Expand Up @@ -68,21 +68,40 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {
parameter: MigrationSSMParameter.OS_CLUSTER_ENDPOINT,
});
const s3Uri = `s3://migration-artifacts-${this.account}-${props.stage}-${this.region}/rfs-snapshot-repo`;
let rfsCommand = `/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir /tmp/s3_files --s3-repo-uri \"${s3Uri}\" --s3-region ${this.region} --snapshot-name rfs-snapshot --lucene-dir '/lucene' --target-host ${osClusterEndpoint}`
rfsCommand = props.clusterAuthDetails.sigv4 ? rfsCommand.concat(`--target-aws-service-signing-name ${props.clusterAuthDetails.sigv4.serviceSigningName} --target-aws-region ${props.clusterAuthDetails.sigv4.region}`) : rfsCommand
rfsCommand = props.otelCollectorEnabled ? rfsCommand.concat(` --otel-collector-endpoint ${OtelCollectorSidecar.getOtelLocalhostEndpoint()}`) : rfsCommand
rfsCommand = props.sourceClusterVersion ? rfsCommand.concat(` --source-version \"${props.sourceClusterVersion}\"`) : rfsCommand
// TODO: This approach with extraArgs may not work with the entryPoint env arg processing that is occurring here. https://opensearch.atlassian.net/browse/MIGRATIONS-2025
rfsCommand = props.extraArgs ? rfsCommand.concat(` ${props.extraArgs}`) : rfsCommand
let command = "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments"
const extraArgsDict = parseArgsToDict(props.extraArgs)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-local-dir", "/tmp/s3_files")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-repo-uri", `"${s3Uri}"`)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-region", this.region)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--snapshot-name", "rfs-snapshot")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--lucene-dir", "/lucene")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-host", osClusterEndpoint)
if (props.clusterAuthDetails.sigv4) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-service-signing-name", props.clusterAuthDetails.sigv4.serviceSigningName)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-region", props.clusterAuthDetails.sigv4.region)
}
if (props.otelCollectorEnabled) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--otel-collector-endpoint", OtelCollectorSidecar.getOtelLocalhostEndpoint())
}
if (props.sourceClusterVersion) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--source-version", `"${props.sourceClusterVersion}"`)
}

let targetUser = "";
let targetPassword = "";
let targetPasswordArn = "";
if (props.clusterAuthDetails.basicAuth) {
targetUser = props.clusterAuthDetails.basicAuth.username
targetPassword = props.clusterAuthDetails.basicAuth.password ?? ""
targetPasswordArn = props.clusterAuthDetails.basicAuth.password_from_secret_arn ?? ""
};
// Only set user or password if not overridden in extraArgs
if (extraArgsDict["--target-username"] === undefined) {
targetUser = props.clusterAuthDetails.basicAuth.username
}
if (extraArgsDict["--target-password"] === undefined) {
targetPassword = props.clusterAuthDetails.basicAuth.password ?? ""
targetPasswordArn = props.clusterAuthDetails.basicAuth.password_from_secret_arn ?? ""
}
}
command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command

const sharedLogFileSystem = new SharedLogFileSystem(this, props.stage, props.defaultDeployId);
const openSearchPolicy = createOpenSearchIAMAccessPolicy(this.partition, this.region, this.account);
const openSearchServerlessPolicy = createOpenSearchServerlessIAMAccessPolicy(this.partition, this.region, this.account);
Expand All @@ -108,7 +127,7 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {
taskMemoryLimitMiB: 4096,
ephemeralStorageGiB: 200,
environment: {
"RFS_COMMAND": rfsCommand,
"RFS_COMMAND": command,
"RFS_TARGET_USER": targetUser,
"RFS_TARGET_PASSWORD": targetPassword,
"RFS_TARGET_PASSWORD_ARN": targetPasswordArn,
Expand Down
Loading

0 comments on commit 207e411

Please sign in to comment.