From cb162006e653a1723ec910d17de13b0ea8525550 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 26 Sep 2024 13:13:32 -0500 Subject: [PATCH] Update argument parsing based on peer feedback to exclude needing overrides in jcommander Signed-off-by: Andre Kurait --- .../opensearch/migrations/CreateSnapshot.java | 5 +- .../migrations/RfsMigrateDocuments.java | 2 +- .../migrations/MetadataMigration.java | 3 +- .../migrations/MigrateOrEvaluateArgs.java | 4 +- .../proxyserver/CaptureProxy.java | 5 +- .../proxyserver/TestHeaderRewrites.java | 6 - .../trafficcapture/JMeterLoadTest.java | 1 - .../migrations/replay/TrafficReplayer.java | 1 - .../lib/common-utilities.ts | 50 +++++ .../service-stacks/capture-proxy-es-stack.ts | 21 +- .../lib/service-stacks/capture-proxy-stack.ts | 22 +- .../reindex-from-snapshot-stack.ts | 43 +++- .../service-stacks/traffic-replayer-stack.ts | 34 ++- .../test/common-utilities.test.ts | 208 +++++++++++++++++- .../test/reindex-from-snapshot-stack.test.ts | 10 +- 15 files changed, 358 insertions(+), 57 deletions(-) diff --git a/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java b/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java index 66f266c838..4a2a16962b 100644 --- a/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java +++ b/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java @@ -74,10 +74,7 @@ public static class S3RepoInfo { public static void main(String[] args) throws Exception { Args arguments = new Args(); - JCommander jCommander = JCommander.newBuilder() - .allowParameterOverwriting(true) - .addObject(arguments) - .build(); + JCommander jCommander = JCommander.newBuilder().addObject(arguments).build(); jCommander.parse(args); if (arguments.help) { diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index a506bda6f6..c1ab52b977 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -127,7 +127,7 @@ public static class Args { "used to communicate to the target, default 10") int maxConnections = 10; - @Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Default: ES 7.10"), required = false, + @Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Default: ES_7.10"), required = false, converter = VersionConverter.class) public Version sourceVersion = Version.fromString("ES 7.10"); } diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/MetadataMigration.java b/MetadataMigration/src/main/java/org/opensearch/migrations/MetadataMigration.java index 402083ba15..2c8436966d 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/MetadataMigration.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/MetadataMigration.java @@ -24,9 +24,8 @@ public class MetadataMigration { public static void main(String[] args) throws Exception { var metadataArgs = new MetadataArgs(); var migrateArgs = new MigrateArgs(); - var evaluateArgs = new EvaluateArgs(); + var evaluateArgs = new EvaluateArgs(); var jCommander = JCommander.newBuilder() - .allowParameterOverwriting(true) .addObject(metadataArgs) .addCommand(migrateArgs) .addCommand(evaluateArgs) diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/MigrateOrEvaluateArgs.java b/MetadataMigration/src/main/java/org/opensearch/migrations/MigrateOrEvaluateArgs.java index 3fbed51f37..037a6f8814 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/MigrateOrEvaluateArgs.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/MigrateOrEvaluateArgs.java @@ -38,7 +38,7 @@ public class MigrateOrEvaluateArgs { public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs(); @ParametersDelegate - public DataFilterArgs dataFilterArgs = new DataFilterArgs(); + public DataFilterArgs dataFilterArgs = new DataFilterArgs(); // https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/ @Parameter(names = {"--min-replicas" }, description = "Optional. The minimum number of replicas configured for migrated indices on the target." @@ -50,6 +50,6 @@ public class MigrateOrEvaluateArgs { + "forwarded. If no value is provided, metrics will not be forwarded.") String otelCollectorEndpoint; - @Parameter(names = {"--source-version" }, description = "Version of the source cluster, for example: Elasticsearch 7.10 or OS 1.3. Defaults to: ES 7.10", converter = VersionConverter.class) + @Parameter(names = {"--source-version" }, description = "Version of the source cluster, for example: Elasticsearch 7.10 or OS 1.3. Defaults to: ES_7.10", converter = VersionConverter.class) public Version sourceVersion = Version.fromString("ES 7.10"); } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java index 4e92bf80a9..4a82315c08 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java @@ -159,15 +159,15 @@ public static class Parameters { public String otelCollectorEndpoint; @Parameter(required = false, names = "--setHeader", - arity = 2, splitter = NoSplitter.class, + arity = 2, description = "[header-name header-value] Set an HTTP header (first argument) with to the specified value" + " (second argument). Any existing headers with that name will be removed.") public List headerOverrides = new ArrayList<>(); @Parameter(required = false, names = "--suppressCaptureForHeaderMatch", - arity = 2, splitter = NoSplitter.class, + arity = 2, description = "The header name (which will be interpreted in a case-insensitive manner) and a regex " + "pattern. When the incoming request has a header that matches the regex, it will be passed " + "through to the service but will NOT be captured. E.g. user-agent 'healthcheck'.") @@ -177,7 +177,6 @@ public static class Parameters { static Parameters parseArgs(String[] args) { Parameters p = new Parameters(); JCommander jCommander = new JCommander(p); - jCommander.setAllowParameterOverwriting(true); try { jCommander.parse(args); // Exactly one these 3 options are required. See that exactly one is set by summing up their presence diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/TestHeaderRewrites.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/TestHeaderRewrites.java index 4f39863647..20221a8075 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/TestHeaderRewrites.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/TestHeaderRewrites.java @@ -25,7 +25,6 @@ public class TestHeaderRewrites { public static final String ONLY_FOR_HEADERS_VALUE = "this is only for headers"; - public static final String ONLY_FOR_HEADERS_VALUE_SPECIAL_CHARACTERS = "!@#$%^&*()_+,./:\":;\\/"; public static final String BODY_WITH_HEADERS_CONTENTS = "\n" + "body: should stay\n" + "body: untouched\n" + @@ -45,9 +44,6 @@ public void testHeaderRewrites() throws Exception { "host", "localhost", "--setHeader", - "specialCharacters", - ONLY_FOR_HEADERS_VALUE_SPECIAL_CHARACTERS, - "--setHeader", "X-new-header", "insignificant value" ); @@ -74,9 +70,7 @@ public void testHeaderRewrites() throws Exception { var capturedRequest = capturedRequestList.get(capturedRequestList.size()-1).getHeaders().stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); Assertions.assertEquals("localhost", capturedRequest.get("host")); - Assertions.assertEquals(ONLY_FOR_HEADERS_VALUE_SPECIAL_CHARACTERS, capturedRequest.get("specialCharacters")); Assertions.assertEquals("insignificant value", capturedRequest.get("X-new-header")); - } } diff --git a/TrafficCapture/trafficCaptureProxyServerTest/src/main/java/org/opensearch/migrations/trafficcapture/JMeterLoadTest.java b/TrafficCapture/trafficCaptureProxyServerTest/src/main/java/org/opensearch/migrations/trafficcapture/JMeterLoadTest.java index fcbbf5b52e..8302e2ec19 100644 --- a/TrafficCapture/trafficCaptureProxyServerTest/src/main/java/org/opensearch/migrations/trafficcapture/JMeterLoadTest.java +++ b/TrafficCapture/trafficCaptureProxyServerTest/src/main/java/org/opensearch/migrations/trafficcapture/JMeterLoadTest.java @@ -42,7 +42,6 @@ static class Parameters { public static Parameters parseArgs(String[] args) { Parameters p = new Parameters(); JCommander jCommander = new JCommander(p); - jCommander.setAllowParameterOverwriting(true); try { jCommander.parse(args); return p; diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index a03dac7238..73b88ad582 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -260,7 +260,6 @@ public static class Parameters { private static Parameters parseArgs(String[] args) { Parameters p = new Parameters(); JCommander jCommander = new JCommander(p); - jCommander.setAllowParameterOverwriting(true); try { jCommander.parse(args); return p; diff --git a/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts b/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts index 3f179ee4a2..b242751666 100644 --- a/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts +++ b/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts @@ -16,6 +16,56 @@ export function getTargetPasswordAccessPolicy(targetPasswordSecretArn: string): }) } +export function appendArgIfNotInExtraArgs( + baseCommand: string, + extraArgsDict: Record, + arg: string, + value: string | null = null, +): string { + if (extraArgsDict[arg] === undefined) { + // If not present, append the argument and value (only append value if it exists) + baseCommand = value !== null ? baseCommand.concat(" ", arg, " ", value) : baseCommand.concat(" ", arg); + } + return baseCommand; +} + +export function parseArgsToDict(argString: string | undefined): Record { + const args: Record = {}; + if (argString === undefined) { + return args; + } + // Split based on '--' at the start of the string or preceded by whitespace, use non-capturing groups to include -- in parts + const parts = argString.split(/(?=\s--|^--)/).filter(Boolean); + + parts.forEach(part => { + const trimmedPart = part.trim(); + if (trimmedPart.length === 0) return; // Skip empty parts + + // Use a regular expression to find the first whitespace character + const firstWhitespaceMatch = trimmedPart.match(/\s/); + const firstWhitespaceIndex = firstWhitespaceMatch?.index; + + const key = firstWhitespaceIndex === undefined ? trimmedPart : trimmedPart.slice(0, firstWhitespaceIndex).trim(); + const value = firstWhitespaceIndex === undefined ? '' : trimmedPart.slice(firstWhitespaceIndex + 1).trim(); + + // Validate the key starts with -- followed by a non-whitespace characters + if (/^--\S+/.test(key)) { + if (args[key] !== undefined) { + args[key].push(value); + } else { + args[key] = [value]; + } + } else { + throw new Error(`Invalid argument key: '${key}'. Argument keys must start with '--' and contain no spaces.`); + } + }); + if (argString.trim() && !args) { + throw new Error(`Unable to parse args provided: '${argString}'`); + } + + return args; +} + export function createOpenSearchIAMAccessPolicy(partition: string, region: string, accountId: string): PolicyStatement { return new PolicyStatement({ effect: Effect.ALLOW, diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts index 8620a50c96..f2e4793dcf 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts @@ -8,7 +8,7 @@ import {StreamingSourceType} from "../streaming-source-type"; import { MigrationSSMParameter, createMSKProducerIAMPolicies, - getMigrationStringParameterValue, + getMigrationStringParameterValue, parseArgsToDict, appendArgIfNotInExtraArgs, } from "../common-utilities"; import {OtelCollectorSidecar} from "./migration-otel-collector-sidecar"; @@ -62,10 +62,21 @@ export class CaptureProxyESStack extends MigrationServiceCore { ...props, parameter: MigrationSSMParameter.KAFKA_BROKERS, }); - let command = `/usr/local/bin/docker-entrypoint.sh eswrapper & /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy --destinationUri https://localhost:19200 --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml` - command = props.streamingSourceType !== StreamingSourceType.DISABLED ? command.concat(` --kafkaConnection ${brokerEndpoints}`) : command - command = props.streamingSourceType === StreamingSourceType.AWS_MSK ? command.concat(" --enableMSKAuth") : command - command = props.otelCollectorEnabled ? command.concat(` --otelCollectorEndpoint ${OtelCollectorSidecar.getOtelLocalhostEndpoint()}`) : command + + let command = "/usr/local/bin/docker-entrypoint.sh eswrapper & /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy" + const extraArgsDict = parseArgsToDict(props.extraArgs) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--destinationUri", "https://localhost:19200") + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--insecureDestination", "https://localhost:19200") + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--sslConfigFile", "/usr/share/elasticsearch/config/proxy_tls.yml") + if (props.streamingSourceType !== StreamingSourceType.DISABLED) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--kafkaConnection", brokerEndpoints) + } + if (props.streamingSourceType === StreamingSourceType.AWS_MSK) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--enableMSKAuth") + } + if (props.otelCollectorEnabled) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--otelCollectorEndpoint", OtelCollectorSidecar.getOtelLocalhostEndpoint()) + } command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command this.createService({ diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-stack.ts index ec51d65c60..abd31794a4 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-stack.ts @@ -9,7 +9,7 @@ import { MigrationSSMParameter, createMSKProducerIAMPolicies, getCustomStringParameterValue, - getMigrationStringParameterValue, + getMigrationStringParameterValue, parseArgsToDict, appendArgIfNotInExtraArgs, } from "../common-utilities"; import {OtelCollectorSidecar} from "./migration-otel-collector-sidecar"; @@ -121,10 +121,22 @@ export class CaptureProxyStack extends MigrationServiceCore { const destinationEndpoint = getDestinationEndpoint(this, props.destinationConfig, props); - let command = `/runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy --destinationUri ${destinationEndpoint} --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml` - command = props.streamingSourceType !== StreamingSourceType.DISABLED ? command.concat(` --kafkaConnection ${brokerEndpoints}`) : command - command = props.streamingSourceType === StreamingSourceType.AWS_MSK ? command.concat(" --enableMSKAuth") : command - command = props.otelCollectorEnabled ? command.concat(` --otelCollectorEndpoint ${OtelCollectorSidecar.getOtelLocalhostEndpoint()}`) : command + let command = "/runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy" + + const extraArgsDict = parseArgsToDict(props.extraArgs) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--destinationUri", destinationEndpoint) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--insecureDestination") + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--listenPort", "9200") + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--sslConfigFile", "/usr/share/elasticsearch/config/proxy_tls.yml") + if (props.streamingSourceType !== StreamingSourceType.DISABLED) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--kafkaConnection", brokerEndpoints) + } + if (props.streamingSourceType === StreamingSourceType.AWS_MSK) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--enableMSKAuth") + } + if (props.otelCollectorEnabled) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--otelCollectorEndpoint", OtelCollectorSidecar.getOtelLocalhostEndpoint()) + } command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command this.createService({ diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts index 60f3b21596..cc4d693d2c 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts @@ -11,7 +11,7 @@ import { createOpenSearchServerlessIAMAccessPolicy, getTargetPasswordAccessPolicy, getMigrationStringParameterValue, - ClusterAuth + ClusterAuth, parseArgsToDict, appendArgIfNotInExtraArgs } from "../common-utilities"; import { RFSBackfillYaml, SnapshotYaml } from "../migration-services-yaml"; import { OtelCollectorSidecar } from "./migration-otel-collector-sidecar"; @@ -68,21 +68,40 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore { parameter: MigrationSSMParameter.OS_CLUSTER_ENDPOINT, }); const s3Uri = `s3://migration-artifacts-${this.account}-${props.stage}-${this.region}/rfs-snapshot-repo`; - let rfsCommand = `/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir /tmp/s3_files --s3-repo-uri \"${s3Uri}\" --s3-region ${this.region} --snapshot-name rfs-snapshot --lucene-dir '/lucene' --target-host ${osClusterEndpoint}` - rfsCommand = props.clusterAuthDetails.sigv4 ? rfsCommand.concat(`--target-aws-service-signing-name ${props.clusterAuthDetails.sigv4.serviceSigningName} --target-aws-region ${props.clusterAuthDetails.sigv4.region}`) : rfsCommand - rfsCommand = props.otelCollectorEnabled ? rfsCommand.concat(` --otel-collector-endpoint ${OtelCollectorSidecar.getOtelLocalhostEndpoint()}`) : rfsCommand - rfsCommand = props.sourceClusterVersion ? rfsCommand.concat(` --source-version \"${props.sourceClusterVersion}\"`) : rfsCommand - // TODO: This approach with extraArgs may not work with the entryPoint env arg processing that is occurring here. https://opensearch.atlassian.net/browse/MIGRATIONS-2025 - rfsCommand = props.extraArgs ? rfsCommand.concat(` ${props.extraArgs}`) : rfsCommand + let command = "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments" + const extraArgsDict = parseArgsToDict(props.extraArgs) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-local-dir", "/tmp/s3_files") + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-repo-uri", `"${s3Uri}"`) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-region", this.region) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--snapshot-name", "rfs-snapshot") + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--lucene-dir", "/lucene") + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-host", osClusterEndpoint) + if (props.clusterAuthDetails.sigv4) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-service-signing-name", props.clusterAuthDetails.sigv4.serviceSigningName) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-region", props.clusterAuthDetails.sigv4.region) + } + if (props.otelCollectorEnabled) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--otel-collector-endpoint", OtelCollectorSidecar.getOtelLocalhostEndpoint()) + } + if (props.sourceClusterVersion) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--source-version", `"${props.sourceClusterVersion}"`) + } let targetUser = ""; let targetPassword = ""; let targetPasswordArn = ""; if (props.clusterAuthDetails.basicAuth) { - targetUser = props.clusterAuthDetails.basicAuth.username - targetPassword = props.clusterAuthDetails.basicAuth.password ?? "" - targetPasswordArn = props.clusterAuthDetails.basicAuth.password_from_secret_arn ?? "" - }; + // Only set user or password if not overridden in extraArgs + if (extraArgsDict["--target-username"] === undefined) { + targetUser = props.clusterAuthDetails.basicAuth.username + } + if (extraArgsDict["--target-password"] === undefined) { + targetPassword = props.clusterAuthDetails.basicAuth.password ?? "" + targetPasswordArn = props.clusterAuthDetails.basicAuth.password_from_secret_arn ?? "" + } + } + command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command + const sharedLogFileSystem = new SharedLogFileSystem(this, props.stage, props.defaultDeployId); const openSearchPolicy = createOpenSearchIAMAccessPolicy(this.partition, this.region, this.account); const openSearchServerlessPolicy = createOpenSearchServerlessIAMAccessPolicy(this.partition, this.region, this.account); @@ -108,7 +127,7 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore { taskMemoryLimitMiB: 4096, ephemeralStorageGiB: 200, environment: { - "RFS_COMMAND": rfsCommand, + "RFS_COMMAND": command, "RFS_TARGET_USER": targetUser, "RFS_TARGET_PASSWORD": targetPassword, "RFS_TARGET_PASSWORD_ARN": targetPasswordArn, diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts index f1bf7ecb06..aa363fe0b1 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts @@ -11,7 +11,7 @@ import { createMSKConsumerIAMPolicies, createOpenSearchIAMAccessPolicy, createOpenSearchServerlessIAMAccessPolicy, - getMigrationStringParameterValue + getMigrationStringParameterValue, appendArgIfNotInExtraArgs, parseArgsToDict } from "../common-utilities"; import {StreamingSourceType} from "../streaming-source-type"; import { Duration } from "aws-cdk-lib"; @@ -80,21 +80,37 @@ export class TrafficReplayerStack extends MigrationServiceCore { }); const groupId = props.customKafkaGroupId ? props.customKafkaGroupId : `logging-group-${deployId}` - let replayerCommand = `/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer ${osClusterEndpoint} --insecure --kafka-traffic-brokers ${brokerEndpoints} --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id \"${groupId}\"` + let command = `/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer ${osClusterEndpoint}` + const extraArgsDict = parseArgsToDict(props.extraArgs) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--insecure") + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--kafka-traffic-brokers", brokerEndpoints) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--kafka-traffic-topic", "logging-traffic-topic") + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--kafka-traffic-group-id", groupId) + if (props.clusterAuthDetails.basicAuth) { - replayerCommand = replayerCommand.concat(` --auth-header-user-and-secret \"${props.clusterAuthDetails.basicAuth.username}\" \"${props.clusterAuthDetails.basicAuth.password_from_secret_arn}\""`) + const bashSafeUserAndSecret = `"${props.clusterAuthDetails.basicAuth.username}" "${props.clusterAuthDetails.basicAuth.password_from_secret_arn}"` + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--auth-header-user-and-secret", bashSafeUserAndSecret) + } + if (props.streamingSourceType === StreamingSourceType.AWS_MSK) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--kafka-traffic-enable-msk-auth") + } + if (props.userAgentSuffix) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--user-agent", `"${props.userAgentSuffix}"`) + } + if (props.clusterAuthDetails.sigv4) { + const sigv4AuthHeaderServiceRegion = `${props.clusterAuthDetails.sigv4.serviceSigningName},${props.clusterAuthDetails.sigv4.region}` + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--sigv4-auth-header-service-region", sigv4AuthHeaderServiceRegion) + } + if (props.otelCollectorEnabled) { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--otelCollectorEndpoint", OtelCollectorSidecar.getOtelLocalhostEndpoint()) } - replayerCommand = props.streamingSourceType === StreamingSourceType.AWS_MSK ? replayerCommand.concat(" --kafka-traffic-enable-msk-auth") : replayerCommand - replayerCommand = props.userAgentSuffix ? replayerCommand.concat(` --user-agent \"${props.userAgentSuffix}\"`) : replayerCommand - replayerCommand = props.clusterAuthDetails.sigv4 ? replayerCommand.concat(` --sigv4-auth-header-service-region ${props.clusterAuthDetails.sigv4.serviceSigningName},${props.clusterAuthDetails.sigv4.region}`) : replayerCommand - replayerCommand = props.otelCollectorEnabled ? replayerCommand.concat(` --otelCollectorEndpoint ${OtelCollectorSidecar.getOtelLocalhostEndpoint()}`) : replayerCommand - replayerCommand = props.extraArgs ? replayerCommand.concat(` ${props.extraArgs}`) : replayerCommand + command = props.extraArgs?.trim() ? command.concat(` ${props.extraArgs.trim()}`) : command this.createService({ serviceName: `traffic-replayer-${deployId}`, taskInstanceCount: 0, dockerDirectoryPath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/build/docker/trafficReplayer"), - dockerImageCommand: ['/bin/sh', '-c', replayerCommand], + dockerImageCommand: ['/bin/sh', '-c', command], securityGroups: securityGroups, volumes: [sharedLogFileSystem.asVolume()], mountPoints: [sharedLogFileSystem.asMountPoint()], diff --git a/deployment/cdk/opensearch-service-migration/test/common-utilities.test.ts b/deployment/cdk/opensearch-service-migration/test/common-utilities.test.ts index 51ab120fe9..1ce27c069e 100644 --- a/deployment/cdk/opensearch-service-migration/test/common-utilities.test.ts +++ b/deployment/cdk/opensearch-service-migration/test/common-utilities.test.ts @@ -1,7 +1,213 @@ import {CpuArchitecture} from "aws-cdk-lib/aws-ecs"; -import {parseClusterDefinition, validateFargateCpuArch} from "../lib/common-utilities"; +import { + parseClusterDefinition, + validateFargateCpuArch, + parseArgsToDict, + appendArgIfNotInExtraArgs +} from "../lib/common-utilities"; import {describe, test, expect} from '@jest/globals'; +describe('appendArgIfNotInExtraArgs', () => { + + // Test when the arg is not present in extraArgsDict and has a value + test('appends arg and value when arg is not in extraArgsDict', () => { + const baseCommand = 'command'; + const extraArgsDict = { + "--arg1": ["value1"] + }; + const result = appendArgIfNotInExtraArgs(baseCommand, extraArgsDict, '--arg2', 'value2'); + expect(result).toBe('command --arg2 value2'); + }); + + // Test when the arg is not present in extraArgsDict and has no value + test('appends arg without value when arg is not in extraArgsDict', () => { + const baseCommand = 'command'; + const extraArgsDict = { + "--arg1": ["value1"] + }; + const result = appendArgIfNotInExtraArgs(baseCommand, extraArgsDict, '--flag'); + expect(result).toBe('command --flag'); + }); + + // Test when the arg is already present in extraArgsDict (should not append) + test('does not append arg and value when arg is in extraArgsDict', () => { + const baseCommand = 'command'; + const extraArgsDict = { + "--arg1": ["value1"] + }; + const result = appendArgIfNotInExtraArgs(baseCommand, extraArgsDict, '--arg1', 'value1'); + expect(result).toBe('command'); // baseCommand should remain unchanged + }); + + // Test when extraArgsDict is empty (should append arg and value) + test('appends arg and value when extraArgsDict is empty', () => { + const baseCommand = 'command'; + const extraArgsDict = {}; + const result = appendArgIfNotInExtraArgs(baseCommand, extraArgsDict, '--arg1', 'value1'); + expect(result).toBe('command --arg1 value1'); + }); + + // Test when extraArgsDict is empty and arg has no value (should append only arg) + test('appends only arg when extraArgsDict is empty and value is null', () => { + const baseCommand = 'command'; + const extraArgsDict = {}; + const result = appendArgIfNotInExtraArgs(baseCommand, extraArgsDict, '--flag'); + expect(result).toBe('command --flag'); + }); +}); + +describe('parseArgsToDict', () => { + + // Test valid input with multiple arguments + test('parses valid input with multiple arguments', () => { + const input = "--valid-arg some value --another-arg more values"; + const expectedOutput = { + "--valid-arg": ["some value"], + "--another-arg": ["more values"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Test valid input with special characters in values + test('parses arguments with special characters in values', () => { + const input = "--valid-arg some!@--#$%^&*() value --another-arg value with spaces"; + const expectedOutput = { + "--valid-arg": ["some!@--#$%^&*() value"], + "--another-arg": ["value with spaces"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Test when there are multiple spaces between argument and value + test('parses input with multiple spaces between argument and value', () => { + const input = "--valid-arg some value with spaces --another-arg more spaces"; + const expectedOutput = { + "--valid-arg": ["some value with spaces"], + "--another-arg": ["more spaces"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Test input with no value after an argument + test('handles argument with no value', () => { + const input = "--valid-arg --another-arg some value"; + const expectedOutput = { + "--valid-arg": [""], + "--another-arg": ["some value"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Test input with argument at the start of the string + test('parses input with argument at the start of the string', () => { + const input = "--valid-arg start value --another-arg after value"; + const expectedOutput = { + "--valid-arg": ["start value"], + "--another-arg": ["after value"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Test input with argument preceded by spaces + test('parses input where arguments are preceded by spaces', () => { + const input = " --valid-arg start value --another-arg after value"; + const expectedOutput = { + "--valid-arg": ["start value"], + "--another-arg": ["after value"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Test input with empty string + test('returns empty object for empty input', () => { + const input = ""; + const expectedOutput = {}; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Test input with argument -- + test('throws error for argument --', () => { + const input = "-- invalid arg some value"; + expect(() => parseArgsToDict(input)).toThrow("Invalid argument key: '--'. Argument keys must start with '--' and contain no spaces."); + }); + + // Test input with missing argument flag + test('throws error for missing argument flag', () => { + const input = "valid-arg some value"; + expect(() => parseArgsToDict(input)).toThrow("Invalid argument key: 'valid-arg'. Argument keys must start with '--' and contain no spaces."); + }); + + // Test valid input with multiple special characters and whitespace + test('handles multiple spaces and special characters in value', () => { + const input = "--arg1 value with @#$%^&*! --arg2 multiple spaces"; + const expectedOutput = { + "--arg1": ["value with @#$%^&*!"], + "--arg2": ["multiple spaces"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Test input with leading and trailing whitespace + test('trims leading and trailing whitespace from arguments and values', () => { + const input = " --valid-arg some value with spaces --another-arg more values "; + const expectedOutput = { + "--valid-arg": ["some value with spaces"], + "--another-arg": ["more values"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Test input with only flags, no values + test('handles input with only flags and no values', () => { + const input = "--flag1 --flag2"; + const expectedOutput = { + "--flag1": [""], + "--flag2": [""] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Test input with no space between flag and value + test('handles input with no space between flag and value', () => { + const input = "--flag1value --flag2"; + const expectedOutput = { + "--flag1value": [""], + "--flag2": [""] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Handles multiple occurrences of the same key + test('handles multiple occurrences of the same key', () => { + const input = "--key1 value1 --key1 value2 --key2 value3"; + const expectedOutput = { + "--key1": ["value1", "value2"], + "--key2": ["value3"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Handles multiple occurrences of the same key with empty values + test('handles multiple occurrences of the same key with empty values', () => { + const input = "--key1 --key1 value2 --key2 value3"; + const expectedOutput = { + "--key1": ["", "value2"], + "--key2": ["value3"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); + + // Handles multiple occurrences of different keys + test('handles multiple occurrences of different keys', () => { + const input = "--key1 value1 --key2 value2 --key1 value3 --key2 value4"; + const expectedOutput = { + "--key1": ["value1", "value3"], + "--key2": ["value2", "value4"] + }; + expect(parseArgsToDict(input)).toEqual(expectedOutput); + }); +}); + describe('validateFargateCpuArch', () => { test('Test valid fargate cpu arch strings can be parsed', () => { const cpuArch1 = "arm64" diff --git a/deployment/cdk/opensearch-service-migration/test/reindex-from-snapshot-stack.test.ts b/deployment/cdk/opensearch-service-migration/test/reindex-from-snapshot-stack.test.ts index b5d7ce26ce..273892b673 100644 --- a/deployment/cdk/opensearch-service-migration/test/reindex-from-snapshot-stack.test.ts +++ b/deployment/cdk/opensearch-service-migration/test/reindex-from-snapshot-stack.test.ts @@ -115,7 +115,7 @@ describe('ReindexFromSnapshotStack Tests', () => { Value: { "Fn::Join": [ "", - [ "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir /tmp/s3_files --s3-repo-uri \"s3://migration-artifacts-test-account-unit-test-us-east-1/rfs-snapshot-repo\" --s3-region us-east-1 --snapshot-name rfs-snapshot --lucene-dir '/lucene' --target-host ", + [ "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir /tmp/s3_files --s3-repo-uri \"s3://migration-artifacts-test-account-unit-test-us-east-1/rfs-snapshot-repo\" --s3-region us-east-1 --snapshot-name rfs-snapshot --lucene-dir /lucene --target-host ", { "Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter", }, @@ -182,11 +182,11 @@ describe('ReindexFromSnapshotStack Tests', () => { Value: { "Fn::Join": [ "", - [ "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir /tmp/s3_files --s3-repo-uri \"s3://migration-artifacts-test-account-unit-test-us-east-1/rfs-snapshot-repo\" --s3-region us-east-1 --snapshot-name rfs-snapshot --lucene-dir '/lucene' --target-host ", + [ "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir /tmp/s3_files --s3-repo-uri \"s3://migration-artifacts-test-account-unit-test-us-east-1/rfs-snapshot-repo\" --s3-region us-east-1 --snapshot-name rfs-snapshot --lucene-dir /lucene --target-host ", { "Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter", }, - "--target-aws-service-signing-name aoss --target-aws-region eu-west-1", + " --target-aws-service-signing-name aoss --target-aws-region eu-west-1", ], ], } @@ -235,7 +235,7 @@ describe('ReindexFromSnapshotStack Tests', () => { expect(reindexStack.rfsSnapshotYaml.snapshot_name).toBe('rfs-snapshot'); }); - test('ReindexFromSnapshotStack correctly appends extraArgs', () => { + test('ReindexFromSnapshotStack correctly overrides with extraArgs', () => { const contextOptions = { vpcEnabled: true, reindexFromSnapshotServiceEnabled: true, @@ -271,7 +271,7 @@ describe('ReindexFromSnapshotStack Tests', () => { Value: { "Fn::Join": [ "", - [ "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir /tmp/s3_files --s3-repo-uri \"s3://migration-artifacts-test-account-unit-test-us-east-1/rfs-snapshot-repo\" --s3-region us-east-1 --snapshot-name rfs-snapshot --lucene-dir '/lucene' --target-host ", + [ "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir /tmp/s3_files --s3-repo-uri \"s3://migration-artifacts-test-account-unit-test-us-east-1/rfs-snapshot-repo\" --s3-region us-east-1 --lucene-dir /lucene --target-host ", { "Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter", },