Skip to content

Commit

Permalink
#384 Fix schema key generation issues
Browse files Browse the repository at this point in the history
  • Loading branch information
jemacineiras committed Nov 16, 2023
2 parents 55863cb + f0478f3 commit ad65f0d
Show file tree
Hide file tree
Showing 25 changed files with 152 additions and 195 deletions.
13 changes: 10 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.7.0</version>
<version>5.6.10</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down Expand Up @@ -358,6 +358,8 @@
<slf4j-api.version>2.0.0-alpha1</slf4j-api.version>
<wiremock-junit5.version>1.3.1</wiremock-junit5.version>
<wiremock.version>2.35.1</wiremock.version>
<jackson-annotations.version>2.15.2</jackson-annotations.version>
<jackson-dataformat-yaml.version>2.15.3</jackson-dataformat-yaml.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -395,19 +397,24 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.15.2</version>
<version>${jackson-annotations.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.15.2</version>
<version>${jackson-annotations.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>${jackson-dataformat-yaml.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/sngular/kloadgen/common/tools/ApiTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ private ApiTool() {
public static JsonNode findNodeValue(final JsonNode node, final String valueName) {
return node.findValue(valueName);
}

public static String getType(final JsonNode schema) {
return hasType(schema) ? getNodeAsString(schema, "type") : "";
}
Expand Down Expand Up @@ -198,7 +199,7 @@ public static boolean hasAdditionalProperties(final JsonNode schema) {
return hasNode(schema, "additionalProperties");
}

public static boolean hasFormat(JsonNode schema) {
public static boolean hasFormat(final JsonNode schema) {
return hasNode(schema, "format");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package com.sngular.kloadgen.common.tools;

import java.util.Objects;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.lang3.StringUtils;

Expand Down
52 changes: 0 additions & 52 deletions src/main/java/com/sngular/kloadgen/common/tools/TypeConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

package com.sngular.kloadgen.common.tools;

import java.util.Set;

public final class TypeConstants {

public static final String NUMBER = "number";
Expand All @@ -18,8 +16,6 @@ public final class TypeConstants {

public static final String ARRAY = "array";

public static final String BIG_DECIMAL = "bigDecimal";

public static final String INTEGER = "integer";

public static final String DOUBLE = "double";
Expand All @@ -30,58 +26,10 @@ public final class TypeConstants {

public static final String STRING = "string";

public static final String ENUM = "enum";

public static final String LOCAL_DATE = "localdate";

public static final String LOCAL_DATETIME = "localdatetime";

public static final String ZONED_DATE = "zoneddate";

public static final String ZONED_DATETIME = "zoneddatetime";

public static final String OFFSET_DATE = "offsetdate";

public static final String OFFSET_DATETIME = "offsetdatetime";

public static final String INT_32 = "int32";

public static final String INT_64 = "int64";

public static final Set<String> BASIC_OBJECT_TYPE = Set.of(NUMBER, STRING, BOOLEAN, INTEGER, ARRAY);

public static final Set<String> NO_IMPORT_TYPE = Set.of(STRING, INTEGER, OBJECT);

public static final Set<String> ALL_TYPES = Set.of(
NUMBER,
BOOLEAN,
OBJECT,
ARRAY,
BIG_DECIMAL,
INTEGER,
DOUBLE,
FLOAT,
LONG,
STRING,
ENUM,
LOCAL_DATE,
LOCAL_DATETIME,
ZONED_DATE,
ZONED_DATETIME,
OFFSET_DATE,
OFFSET_DATETIME
);

public static boolean isBoolean(final String isBoolean) {
return Boolean.parseBoolean(isBoolean.toLowerCase());
}

public enum TimeType {
LOCAL,
ZONED,
OFFSET
}

private TypeConstants() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public SchemaRegistryConfigElementBeanInfo() {

super(SchemaRegistryConfigElement.class);

createPropertyGroup("schema_registry_config", new String[]{SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME,
SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_PROPERTIES, SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_URL});
createPropertyGroup("schema_registry_config", new String[] {SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME,
SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_URL, SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_PROPERTIES});

final PropertyDescriptor schemaRegistryName = property(SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME);
schemaRegistryName.setPropertyEditorClass(SchemaRegistryNamePropertyEditor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static Pair<String, List<FieldValueMapping>> flatPropertiesList(final Str
}

public static List<FieldValueMapping> flatPropertiesList(final ParsedSchema parserSchema) {
return ExtractorFactory.getExtractor(parserSchema.schemaType()).processSchema(parserSchema, SchemaRegistryEnum.CONFLUENT);
return ExtractorFactory.getExtractor(parserSchema.schemaType()).processSchema(parserSchema.rawSchema(), SchemaRegistryEnum.CONFLUENT);
}

public static String readSchemaFile(final String filePath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class AsyncApiExtractorImpl implements ApiExtractor {

@Override
public final AsyncApiFile processFile(final File apiFile) {
AsyncApiFile asyncApiFile;
final AsyncApiFile asyncApiFile;
try {
final JsonNode openApi = om.readTree(apiFile);
asyncApiFile = processNode(openApi);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

import com.sngular.kloadgen.common.SchemaRegistryEnum;
Expand All @@ -12,15 +13,15 @@
import com.sngular.kloadgen.extractor.extractors.protobuff.ProtobuffExtractor;
import com.sngular.kloadgen.model.FieldValueMapping;
import com.sngular.kloadgen.schemaregistry.adapter.impl.AbstractParsedSchemaAdapter;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioAbstractParsedSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioParsedSchemaMetadata;
import com.sngular.kloadgen.util.JMeterHelper;
import com.sngular.kloadgen.util.SchemaRegistryKeyHelper;
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.jmeter.threads.JMeterContextService;

public final class ExtractorFactory {
private static final AvroExtractor AVRO_EXTRACTOR = new AvroExtractor();
private static final AvroExtractor AVRO_EXTRACTOR = new AvroExtractor<>();

private static final JsonExtractor JSON_EXTRACTOR = new JsonExtractor();

Expand All @@ -32,13 +33,11 @@ private ExtractorFactory() {
public static <T> ExtractorRegistry<T> getExtractor(final String schemaType) {

if (schemaType != null && EnumUtils.isValidEnum(SchemaTypeEnum.class, schemaType.toUpperCase())) {
final ExtractorRegistry<T> response = switch (SchemaTypeEnum.valueOf(schemaType.toUpperCase())) {
case JSON -> AVRO_EXTRACTOR;
case AVRO -> JSON_EXTRACTOR;
return switch (SchemaTypeEnum.valueOf(schemaType.toUpperCase())) {
case JSON -> JSON_EXTRACTOR;
case AVRO -> AVRO_EXTRACTOR;
case PROTOBUF -> PROTOBUFF_EXTRACTOR;
default -> throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));
};
return response;
} else {
throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));
}
Expand All @@ -48,8 +47,6 @@ public static Pair<String, List<FieldValueMapping>> flatPropertiesList(final Str
final Properties properties = JMeterContextService.getContext().getProperties();
final var schemaParsed = JMeterHelper.getParsedSchema(subjectName, properties);
final String registryName = properties.getProperty(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME);
final AbstractParsedSchemaAdapter abstractParsedSchemaAdapter = schemaParsed.getParsedSchemaAdapter();
final String schemaType = abstractParsedSchemaAdapter.getType();
final AbstractParsedSchemaAdapter parsedSchemaAdapter = schemaParsed.getParsedSchemaAdapter();
final String schemaType = parsedSchemaAdapter.getType();

Expand All @@ -60,11 +57,10 @@ public static Pair<String, List<FieldValueMapping>> flatPropertiesList(final Str
if (Objects.nonNull(registryName)) {
//TODO change parser
schema = switch (schemaRegistryEnum) {
case APICURIO -> ((ApicurioAbstractParsedSchemaMetadata) parsedSchemaAdapter).getSchema();
case APICURIO -> ((ApicurioParsedSchemaMetadata) parsedSchemaAdapter).getSchema();
case CONFLUENT -> parsedSchemaAdapter.getRawSchema();
          default -> throw new KLoadGenException("Schema Registry Type not supported " + registryName.toUpperCase());
};
attributeList = getExtractor(schemaType, registryName.toUpperCase()).processSchema(schema, schemaRegistryEnum);
attributeList.addAll(getExtractor(schemaType).processSchema(schema, schemaRegistryEnum));
}
return Pair.of(schemaType, attributeList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;

public class AvroExtractor implements ExtractorRegistry<Object> {
public class AvroExtractor<T> implements ExtractorRegistry<T> {

private static final Map<SchemaRegistryEnum, Extractor> SCHEMA_REGISTRY_MAP = Map.of(SchemaRegistryEnum.CONFLUENT, new AvroConfluentExtractor(), SchemaRegistryEnum.APICURIO,
new AvroApicurioExtractor());
private static final Map<SchemaRegistryEnum, Extractor> schemaRegistryMap = Map.of(SchemaRegistryEnum.CONFLUENT, new AvroConfluentExtractor(), SchemaRegistryEnum.APICURIO,
new AvroApicurioExtractor());

public final List<FieldValueMapping> processSchema(final Object schema, final SchemaRegistryEnum registryEnum) {
return SCHEMA_REGISTRY_MAP.get(registryEnum).processSchema(schema);
public final List<FieldValueMapping> processSchema(final T schema, final SchemaRegistryEnum registryEnum) {
return schemaRegistryMap.get(registryEnum).processSchema(schema);
}

public final ParsedSchema processSchema(final String fileContent) {
return new AvroSchema(fileContent);
}

public final List<String> getSchemaNameList(final String schema, final SchemaRegistryEnum registryEnum) {
return SCHEMA_REGISTRY_MAP.get(registryEnum).getSchemaNameList(schema);
return schemaRegistryMap.get(registryEnum).getSchemaNameList(schema);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import io.confluent.kafka.schemaregistry.json.JsonSchema;


public class JsonExtractor implements ExtractorRegistry<Object> {
public class JsonExtractor<T> implements ExtractorRegistry<T> {

private static final Map<SchemaRegistryEnum, Extractor> SCHEMA_REGISTRY_MAP = Map.of(SchemaRegistryEnum.CONFLUENT, new JsonDefaultExtractor(),
private static final Map<SchemaRegistryEnum, Extractor<String>> SCHEMA_REGISTRY_MAP = Map.of(SchemaRegistryEnum.CONFLUENT, new JsonDefaultExtractor(),
SchemaRegistryEnum.APICURIO, new JsonDefaultExtractor());

public final List<FieldValueMapping> processSchema(final Object schemaReceived, final SchemaRegistryEnum registryEnum) {
public final List<FieldValueMapping> processSchema(final T schemaReceived, final SchemaRegistryEnum registryEnum) {
return SCHEMA_REGISTRY_MAP.get(registryEnum).processSchema(schemaReceived.toString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

public class ProtobuffExtractor implements ExtractorRegistry<Object> {
public class ProtobuffExtractor<T> implements ExtractorRegistry<T> {

private static final Map<SchemaRegistryEnum, Extractor> SCHEMA_REGISTRY_MAP = Map.of(SchemaRegistryEnum.CONFLUENT, new ProtoBufConfluentExtractor(),
SchemaRegistryEnum.APICURIO, new ProtoBufApicurioExtractor());

public final List<FieldValueMapping> processSchema(final Object schemaReceived, final SchemaRegistryEnum registryEnum) {
public final List<FieldValueMapping> processSchema(final T schemaReceived, final SchemaRegistryEnum registryEnum) {
final var resultSchema = new ArrayList<FieldValueMapping>();
if (schemaReceived instanceof ProtoFileElement) {
resultSchema.addAll(SCHEMA_REGISTRY_MAP.get(SchemaRegistryEnum.APICURIO).processSchema(schemaReceived));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.sngular.kloadgen.processor.util.SchemaProcessorUtils;
import com.sngular.kloadgen.randomtool.generator.AvroGeneratorTool;
import com.sngular.kloadgen.schemaregistry.adapter.impl.AbstractParsedSchemaAdapter;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioAbstractParsedSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioParsedSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseParsedSchema;
import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.SchemaMetadataAdapter;
Expand Down Expand Up @@ -51,8 +51,8 @@ public AvroObjectCreatorFactory(final Object schema, final BaseSchemaMetadata<?
} else if (schema instanceof BaseParsedSchema) {
final BaseParsedSchema schemaParse = (BaseParsedSchema) schema;
final AbstractParsedSchemaAdapter adapterParse = schemaParse.getParsedSchemaAdapter();
if (adapterParse instanceof ApicurioAbstractParsedSchemaMetadata) {
this.schema = (Schema) ((ApicurioAbstractParsedSchemaMetadata) adapterParse).getSchema();
if (adapterParse instanceof ApicurioParsedSchemaMetadata) {
this.schema = (Schema) ((ApicurioParsedSchemaMetadata) adapterParse).getSchema();
} else {
this.schema = adapterParse.getRawSchema();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.UUID;

import org.apache.avro.Schema;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.jmeter.threads.JMeterContextService;

public class ValueUtils {
Expand All @@ -25,11 +26,13 @@ private ValueUtils() {
}

public static List<String> replaceValuesContext(final List<String> fieldValuesList) {
final List<String> parameterList = new ArrayList<>(fieldValuesList);

parameterList.replaceAll(fieldValue ->
fieldValue.matches("\\$\\{\\w*}")
? JMeterContextService.getContext().getVariables().get(fieldValue.substring(2, fieldValue.length() - 1)) : fieldValue);
final List<String> parameterList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(fieldValuesList)) {
parameterList.addAll(fieldValuesList);
parameterList.replaceAll(fieldValue ->
fieldValue.matches("\\$\\{\\w*}")
? JMeterContextService.getContext().getVariables().get(fieldValue.substring(2, fieldValue.length() - 1)) : fieldValue);
}
return parameterList;
}

Expand Down
Loading

0 comments on commit ad65f0d

Please sign in to comment.