Skip to content

Commit

Permalink
Merge pull request #3961 from p-kimberley/elastic-java-api-client
Browse files Browse the repository at this point in the history
Migrate to Elasticsearch Java API Client
  • Loading branch information
stroomdev66 committed Jul 18, 2024
2 parents c00366d + 1804053 commit 7686a7f
Show file tree
Hide file tree
Showing 28 changed files with 900 additions and 664 deletions.
10 changes: 3 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ ext.versions = [
bnd : '7.0.0',
curator : '4.2.0', // Curator 4 works with ZK 3.4.x in soft compatibility mode, i.e. you must exlude its dep on ZK and explicitly add one for 3.4.x
dropwizard : '4.0.4', // used to set the dropwizard-bom version, that controls lots of dependency versions
elasticsearch : '7.17.14',
elasticsearch : '8.11.2',
httpcore : '4.4.16', // Transient dependency of Elasticsearch
flyway : '10.0.0',
guice : '7.0.0',
Expand Down Expand Up @@ -196,10 +196,7 @@ ext.libs = [
dropwizard_validation : "io.dropwizard:dropwizard-validation", // version controlled by dropwizard-dependencies
//dropwizard_websockets : "com.liveperson:dropwizard-websockets:1.3.14",
eclipse_transformer_cli : "org.eclipse.transformer:org.eclipse.transformer.cli:0.5.0",
elasticsearch_rest_high_level_client: "org.elasticsearch.client:elasticsearch-rest-high-level-client:$versions.elasticsearch",
elasticsearch_rest_client : "org.elasticsearch.client:elasticsearch-rest-client:$versions.elasticsearch",
elasticsearch : "org.elasticsearch:elasticsearch:$versions.elasticsearch",
elasticsearch_core : "org.elasticsearch:elasticsearch-core:$versions.elasticsearch",
elasticsearch_java : "co.elastic.clients:elasticsearch-java:$versions.elasticsearch",
fast_infoset : "com.sun.xml.fastinfoset:FastInfoset:2.1.1",
flyway_core : "org.flywaydb:flyway-core:${versions.flyway}",
flyway_mysql : "org.flywaydb:flyway-mysql:${versions.flyway}",
Expand All @@ -220,9 +217,7 @@ ext.libs = [
hibernate_jpa : "org.hibernate.javax.persistence:hibernate-jpa-2.1-api:1.0.0.Final", // remove when legacy code is removed
hikari : "com.zaxxer:HikariCP:5.1.0",
http_client : "org.apache.httpcomponents.client5:httpclient5", // version controlled by dropwizard-dependencies
http_client__elastic : "org.apache.httpcomponents:httpclient:4.5.10", // version controlled by dropwizard-dependencies
httpcore : "org.apache.httpcomponents.core5:httpcore5:$versions.httpcore",
httpcore__elastic : "org.apache.httpcomponents:httpcore:4.4.12",
jBCrypt : "de.svenkubiak:jBCrypt:0.4.3",
jackson_annotations : "com.fasterxml.jackson.core:jackson-annotations", // version controlled by dropwizard-dependencies
//jackson_core : "com.fasterxml.jackson.core:jackson-core:2.9.10", // version controlled by dropwizard-dependencies
Expand All @@ -233,6 +228,7 @@ ext.libs = [
jakarta_activation : "jakarta.activation:jakarta.activation-api", // version controlled by dropwizard-dependencies
jakarta_annotation_api : "jakarta.annotation:jakarta.annotation-api", // version controlled by dropwizard-dependencies
jakarta_el : "org.glassfish:jakarta.el", // version controlled by dropwizard-dependencies
jakarta_json_api : "jakarta.json:jakarta.json-api:2.0.1",
jakarta_servlet_api : "jakarta.servlet:jakarta.servlet-api", // version controlled by dropwizard-dependencies
jakarta_validation_api : "jakarta.validation:jakarta.validation-api", // version controlled by dropwizard-dependencies
java_diff_utils : "io.github.java-diff-utils:java-diff-utils:4.11",
Expand Down
1 change: 0 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ include 'stroom-receive:stroom-receive-common'
include 'stroom-receive:stroom-receive-rules-impl'

// Search
include 'stroom-search:elastic-shaded'
include 'stroom-search:stroom-expression-matcher'
include 'stroom-search:stroom-search-api'
include 'stroom-search:stroom-search-elastic'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ appConfig:
initialRetryBackoffPeriodMs: 1000
maxNestedElementDepth: 10
retryCount: 10
retention:
scrollSize: 10000
search:
highlight: true
scrollDuration: "PT1M"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,15 @@ stroom.search.elastic.search.ElasticSearchConfig getElasticSearchConfig(
stroom.search.elastic.search.ElasticSearchConfig.class);
}

@Generated("stroom.config.global.impl.GenerateConfigProvidersModule")
@Provides
@SuppressWarnings("unused")
stroom.search.elastic.ElasticRetentionConfig getElasticRetentionConfig(
final ConfigMapper configMapper) {
return configMapper.getConfigObject(
stroom.search.elastic.ElasticRetentionConfig.class);
}

@Generated("stroom.config.global.impl.GenerateConfigProvidersModule")
@Provides
@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ public interface ElasticIndexConstants {
String STREAM_ID = "StreamId";
String EVENT_ID = "EventId";
String FEED_ID = "FeedId";
String INDEX_ID = "_index";
String INDEX_NAME = "_index";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ElasticNativeTypes {
NATIVE_TYPE_MAP.put("byte", FieldType.INTEGER);
NATIVE_TYPE_MAP.put("version", FieldType.INTEGER);

NATIVE_TYPE_MAP.put("long", FieldType.LONG);
NATIVE_TYPE_MAP.put("unsigned_long", FieldType.LONG);

NATIVE_TYPE_MAP.put("float", FieldType.FLOAT);
Expand All @@ -26,6 +27,7 @@ public class ElasticNativeTypes {
NATIVE_TYPE_MAP.put("double", FieldType.DOUBLE);

NATIVE_TYPE_MAP.put("date", FieldType.DATE);
NATIVE_TYPE_MAP.put("date_nanos", FieldType.DATE);

NATIVE_TYPE_MAP.put("text", FieldType.TEXT);

Expand All @@ -39,12 +41,13 @@ public class ElasticNativeTypes {
/**
* Given a native Elasticsearch data type, return an equivalent Stroom field type
*/
public static FieldType fromNativeType(final String fieldName, final String nativeType) {
public static FieldType fromNativeType(final String fieldName, final String nativeType)
throws UnsupportedTypeException {
if (NATIVE_TYPE_MAP.containsKey(nativeType)) {
return NATIVE_TYPE_MAP.get(nativeType);
}

throw new IllegalArgumentException("Field '" + fieldName + "' has an unsupported mapping type '" +
throw new UnsupportedTypeException("Field '" + fieldName + "' has an unsupported mapping type '" +
nativeType + "'");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package stroom.search.elastic.shared;

public class UnsupportedTypeException extends Exception {

public UnsupportedTypeException(final String message) {
super(message);
}
}
38 changes: 38 additions & 0 deletions stroom-core-shared/src/test/java/TestElasticIndexFieldType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import stroom.datasource.api.v2.FieldType;
import stroom.search.elastic.shared.ElasticNativeTypes;
import stroom.search.elastic.shared.UnsupportedTypeException;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestElasticIndexFieldType {

@Test
public void testFromNativeType() throws UnsupportedTypeException {
assertSame(FieldType.BOOLEAN,
ElasticNativeTypes.fromNativeType("name", "boolean"));
assertSame(FieldType.INTEGER,
ElasticNativeTypes.fromNativeType("name", "integer"));
assertSame(FieldType.LONG,
ElasticNativeTypes.fromNativeType("name", "long"));
assertSame(FieldType.FLOAT,
ElasticNativeTypes.fromNativeType("name", "float"));
assertSame(FieldType.DOUBLE,
ElasticNativeTypes.fromNativeType("name", "double"));
assertSame(FieldType.DATE,
ElasticNativeTypes.fromNativeType("name", "date"));
assertSame(FieldType.TEXT,
ElasticNativeTypes.fromNativeType("name", "text"));
}

@Test
public void testIsNumeric() throws UnsupportedTypeException {
assertTrue(FieldType.INTEGER.isNumeric());
assertTrue(FieldType.LONG.isNumeric());
assertFalse(FieldType.FLOAT.isNumeric());
assertFalse(FieldType.DOUBLE.isNumeric());
}
}
37 changes: 0 additions & 37 deletions stroom-search/elastic-shaded/build.gradle

This file was deleted.

5 changes: 2 additions & 3 deletions stroom-search/stroom-search-elastic/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ dependencies {
implementation project(':stroom-query:stroom-query-api')
implementation project(':stroom-query:stroom-query-common')
implementation project(':stroom-suggestions:stroom-suggestions-api')
implementation project(path: ":stroom-search:elastic-shaded", configuration: 'shadow') // Use shadow JAR
implementation project(':stroom-search:stroom-search-extraction')
implementation project(':stroom-search:stroom-search-api')
implementation project(':stroom-security:stroom-security-api')
Expand All @@ -29,12 +28,12 @@ dependencies {
implementation project(':stroom-util-shared')

implementation libs.dropwizard_metrics_annotation
implementation libs.elasticsearch_java
implementation libs.eventLogging
implementation libs.guice
implementation libs.httpcore__elastic
implementation libs.http_client__elastic
implementation libs.jackson_annotations
implementation libs.jakarta_inject
implementation libs.jakarta_json_api
implementation libs.restygwt
implementation libs.saxon_he
implementation libs.slf4j_api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import stroom.search.elastic.shared.ElasticConnectionConfig;

import org.elasticsearch.client.RestHighLevelClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;

import java.util.function.Consumer;
import java.util.function.Function;

public interface ElasticClientCache {
void context(ElasticConnectionConfig key, Consumer<RestHighLevelClient> consumer);
void context(ElasticConnectionConfig key, Consumer<ElasticsearchClient> consumer);

<R> R contextResult(ElasticConnectionConfig key, Function<RestHighLevelClient, R> function);
<R> R contextResult(ElasticConnectionConfig key, Function<ElasticsearchClient, R> function);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.shared.Clearable;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.IdentityHashMap;
Expand All @@ -40,8 +40,8 @@ public class ElasticClientCacheImpl implements ElasticClientCache, Clearable {
private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(ElasticClientCacheImpl.class);
private static final String CACHE_NAME = "Elastic Client Cache";

private final LoadingStroomCache<ElasticConnectionConfig, RestHighLevelClient> cache;
private final IdentityHashMap<RestHighLevelClient, State> useMap = new IdentityHashMap<>();
private final LoadingStroomCache<ElasticConnectionConfig, ElasticsearchClient> cache;
private final IdentityHashMap<ElasticsearchClient, State> useMap = new IdentityHashMap<>();

private final Provider<ElasticConfig> elasticConfigProvider;

Expand All @@ -56,7 +56,7 @@ public class ElasticClientCacheImpl implements ElasticClientCache, Clearable {
this::destroy);
}

private RestHighLevelClient create(ElasticConnectionConfig elasticConnectionConfig) {
private ElasticsearchClient create(ElasticConnectionConfig elasticConnectionConfig) {
if (elasticConnectionConfig == null) {
throw new NullPointerException("Elasticsearch connection config not provided");
}
Expand All @@ -66,7 +66,7 @@ private RestHighLevelClient create(ElasticConnectionConfig elasticConnectionConf
elasticConfigProvider.get().getClientConfig());
}

private void destroy(final ElasticConnectionConfig key, final RestHighLevelClient value) {
private void destroy(final ElasticConnectionConfig key, final ElasticsearchClient value) {
synchronized (this) {
final State state = useMap.get(value);
state.stale = true;
Expand All @@ -78,8 +78,8 @@ private void destroy(final ElasticConnectionConfig key, final RestHighLevelClien
}

@Override
public void context(final ElasticConnectionConfig key, final Consumer<RestHighLevelClient> consumer) {
final RestHighLevelClient client = borrowClient(key);
public void context(final ElasticConnectionConfig key, final Consumer<ElasticsearchClient> consumer) {
final ElasticsearchClient client = borrowClient(key);
try {
consumer.accept(client);
} finally {
Expand All @@ -88,24 +88,24 @@ public void context(final ElasticConnectionConfig key, final Consumer<RestHighLe
}

@Override
public <R> R contextResult(final ElasticConnectionConfig key, final Function<RestHighLevelClient, R> function) {
final RestHighLevelClient client = borrowClient(key);
public <R> R contextResult(final ElasticConnectionConfig key, final Function<ElasticsearchClient, R> function) {
final ElasticsearchClient client = borrowClient(key);
try {
return function.apply(client);
} finally {
returnClient(client);
}
}

private RestHighLevelClient borrowClient(final ElasticConnectionConfig key) {
final RestHighLevelClient client = cache.get(key);
private ElasticsearchClient borrowClient(final ElasticConnectionConfig key) {
final ElasticsearchClient client = cache.get(key);
synchronized (this) {
useMap.computeIfAbsent(client, k -> new State()).increment();
}
return client;
}

private void returnClient(final RestHighLevelClient client) {
private void returnClient(final ElasticsearchClient client) {
synchronized (this) {
final State state = useMap.get(client);
state.decrement();
Expand All @@ -116,9 +116,9 @@ private void returnClient(final RestHighLevelClient client) {
}
}

private void close(final RestHighLevelClient client) {
private void close(final ElasticsearchClient client) {
try {
client.close();
client._transport().close();
} catch (final RuntimeException | IOException e) {
LOGGER.error(e::getMessage, e);
}
Expand Down
Loading

0 comments on commit 7686a7f

Please sign in to comment.