Commit
Merge branch 'main' into remove-awscli-dependency
akanshi-elastic authored Jun 21, 2023
2 parents 5c0d8b4 + d7172db commit 261b3dd
Showing 38 changed files with 1,274 additions and 812 deletions.
4 changes: 2 additions & 2 deletions .buildkite/nightly_steps.yml
@@ -43,9 +43,9 @@ steps:
       SKIP_AARCH64: "true"
     artifact_paths:
       - "perf8-report-*/**/*"
-  - label: "🔨 Sharepoint"
+  - label: "🔨 Sharepoint Server"
     command:
-      - ".buildkite/run_nigthly.sh sharepoint extra_small"
+      - ".buildkite/run_nigthly.sh sharepoint_server extra_small"
     artifact_paths:
       - "perf8-report-*/**/*"
   - label: "🔨 Microsoft SQL"
9 changes: 5 additions & 4 deletions config.yml
@@ -27,15 +27,16 @@ service:
   job_cleanup_interval: 300
   log_level: INFO
 
-connector_id: 'changeme'
-service_type: 'changeme'
-
 extraction_service:
   enabled: false
   host: http://localhost:8090
   text_extraction:
     use_file_pointers: false
     method: 'tika'
 
+connector_id: 'changeme'
+service_type: 'changeme'
+
 sources:
   mongodb: connectors.sources.mongo:MongoDataSource
   s3: connectors.sources.s3:S3DataSource
@@ -46,7 +47,7 @@ sources:
   azure_blob_storage: connectors.sources.azure_blob_storage:AzureBlobStorageDataSource
   postgresql: connectors.sources.postgresql:PostgreSQLDataSource
   oracle: connectors.sources.oracle:OracleDataSource
-  sharepoint: connectors.sources.sharepoint:SharepointDataSource
+  sharepoint_server: connectors.sources.sharepoint_server:SharepointServerDataSource
   mssql: connectors.sources.mssql:MSSQLDataSource
   jira: connectors.sources.jira:JiraDataSource
   confluence: connectors.sources.confluence:ConfluenceDataSource
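
For reference, a minimal sketch of how the rearranged config.yml reads back once parsed (PyYAML here; the literal mirrors the diff above, and the assertions are illustrative only): the extraction_service settings are nested, while connector_id and service_type remain top-level keys, just relocated below the new block.

# Sketch only: the YAML literal mirrors the post-change layout from the diff.
import yaml

raw = """
extraction_service:
  enabled: false
  host: http://localhost:8090
  text_extraction:
    use_file_pointers: false
    method: 'tika'

connector_id: 'changeme'
service_type: 'changeme'
"""

config = yaml.safe_load(raw)
assert config["extraction_service"]["host"] == "http://localhost:8090"
assert config["extraction_service"]["text_extraction"]["method"] == "tika"
assert config["connector_id"] == "changeme"  # still top-level, just moved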
3 changes: 2 additions & 1 deletion connectors/es/sink.py
@@ -320,9 +320,10 @@ async def _deferred_index(self, lazy_download, doc_id, doc, operation):
             self.total_downloads += 1
             data.pop("_id", None)
             data.pop(TIMESTAMP_FIELD, None)
-            data.pop("_tempfile_suffix", None)
             doc.update(data)
 
+        doc.pop("_original_filename", None)
+
         await self.queue.put(
             {
                 "_op_type": operation,
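
A tiny illustrative sketch of the dict semantics the added line relies on (the doc contents below are invented; only the _original_filename field name comes from the diff): dict.pop(key, None) removes the key when present and is a safe no-op otherwise, so the transient field is stripped from every doc before it is queued, whether or not a download populated it.

# Illustrative values; only the field name comes from the diff.
doc = {"_id": "1", "title": "report.pdf", "_original_filename": "tmpa1b2.pdf"}

doc.pop("_original_filename", None)  # stripped before the doc is queued
doc.pop("_original_filename", None)  # already gone: no KeyError, returns None

print(doc)  # {'_id': '1', 'title': 'report.pdf'}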
35 changes: 35 additions & 0 deletions connectors/preflight_check.py
@@ -4,6 +4,8 @@
 # you may not use this file except in compliance with the Elastic License 2.0.
 #
 
+import aiohttp
+
 from connectors.es import ESClient
 from connectors.logger import logger
 from connectors.protocol import CONNECTORS_INDEX, JOBS_INDEX
@@ -15,6 +17,7 @@ def __init__(self, config):
         self.config = config
         self.elastic_config = config["elasticsearch"]
         self.service_config = config["service"]
+        self.extraction_config = config["extraction_service"]
         self.es_client = ESClient(self.elastic_config)
         self.preflight_max_attempts = int(
             self.service_config.get("preflight_max_attempts", 10)
@@ -41,6 +44,8 @@ async def run(self):
             logger.critical(f"{self.elastic_config['host']} seem down. Bye!")
             return False
 
+        await self._check_local_extraction_setup()
+
         valid_configuration = self._validate_configuration()
         available_system_indices = await self._check_system_indices_with_retries()
         return valid_configuration and available_system_indices
@@ -49,6 +54,36 @@
         if self.es_client is not None:
             await self.es_client.close()
 
+    async def _check_local_extraction_setup(self):
+        if not self.extraction_config.get("enabled", False):
+            return
+
+        timeout = aiohttp.ClientTimeout(total=5)
+        session = aiohttp.ClientSession(timeout=timeout)
+
+        try:
+            async with session.get(
+                f"{self.extraction_config['host']}/ping/"
+            ) as response:
+                if response.status != 200:
+                    logger.warning(
+                        f"Data extraction service was found at {self.extraction_config['host']} but health-check returned `{response.status}'."
+                    )
+                else:
+                    logger.info(
+                        f"Data extraction service found at {self.extraction_config['host']}."
+                    )
+        except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError) as e:
+            logger.critical(
+                f"Expected to find a running instance of data extraction service at {self.extraction_config['host']} but failed. {e}."
+            )
+        except Exception as e:
+            logger.critical(
+                f"Unexpected error occurred while attempting to connect to data extraction service at {self.extraction_config['host']}. {e}."
+            )
+        finally:
+            await session.close()
+
     async def _check_system_indices_with_retries(self):
         attempts = 0
         while self.running:
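
The new check issues a GET against {extraction_service.host}/ping/ and treats any 200 as healthy, logging a warning (not a failure) otherwise. A minimal local stub that would satisfy it, assuming the default host from config.yml (illustrative only, not part of this commit):

# Illustrative stub: serves the /ping/ endpoint the preflight check probes.
# Port 8090 matches the default extraction_service.host in config.yml.
from aiohttp import web

async def ping(request):
    # Any 200 response makes _check_local_extraction_setup log success.
    return web.Response(text="ok")

app = web.Application()
app.add_routes([web.get("/ping/", ping)])

if __name__ == "__main__":
    web.run_app(app, port=8090)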
4 changes: 3 additions & 1 deletion connectors/services/job_cleanup.py
@@ -21,7 +21,9 @@ class JobCleanUpService(BaseService):
     def __init__(self, config):
         super().__init__(config)
         self.idling = int(self.service_config.get("job_cleanup_interval", 60 * 5))
-        self.native_service_types = self.config.get("native_service_types", [])
+        self.native_service_types = self.config.get("native_service_types")
+        if self.native_service_types is None:
+            self.native_service_types = []
         if "connector_id" in self.config:
             self.connector_ids = [self.config.get("connector_id")]
         else:
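
The likely motivation for unrolling the one-liner (a sketch of standard Python/YAML behavior, not stated in the commit itself): dict.get(key, default) only falls back when the key is absent, but a YAML key left empty loads as an explicit None, so the old form could still yield None. The same fix is applied in job_execution.py and job_scheduling.py below, where a None would crash the ', '.join(...) debug log.

import yaml

config = yaml.safe_load("native_service_types:\n")  # key present, value empty
print(config)  # {'native_service_types': None}

# Old pattern: the default is ignored because the key exists.
print(config.get("native_service_types", []))  # None, not []

# New pattern: normalize an explicit null to an empty list.
native_service_types = config.get("native_service_types")
if native_service_types is None:
    native_service_types = []
print(", ".join(native_service_types))  # "" rather than a TypeError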
4 changes: 3 additions & 1 deletion connectors/services/job_execution.py
@@ -145,7 +145,9 @@ async def _run(self):
         self.connector_index = ConnectorIndex(self.es_config)
         self.sync_job_index = SyncJobIndex(self.es_config)
 
-        native_service_types = self.config.get("native_service_types", [])
+        native_service_types = self.config.get("native_service_types")
+        if native_service_types is None:
+            native_service_types = []
         logger.debug(f"Native support for {', '.join(native_service_types)}")
 
         # TODO: we can support multiple connectors but Ruby can't so let's use a
4 changes: 3 additions & 1 deletion connectors/services/job_scheduling.py
@@ -125,7 +125,9 @@ async def _run(self):
         self.connector_index = ConnectorIndex(self.es_config)
         self.sync_job_index = SyncJobIndex(self.es_config)
 
-        native_service_types = self.config.get("native_service_types", [])
+        native_service_types = self.config.get("native_service_types")
+        if native_service_types is None:
+            native_service_types = []
         logger.debug(f"Native support for {', '.join(native_service_types)}")
 
         # TODO: we can support multiple connectors but Ruby can't so let's use a