From 0218552ee06e7914427b935d21444758252cfe0a Mon Sep 17 00:00:00 2001 From: Arnaud Mengus <44212923+isontheline@users.noreply.github.com> Date: Tue, 28 May 2024 22:16:15 +0200 Subject: [PATCH] Init Subscribing events for Ingestor repository --- .../dragon/repository/AbstractRepository.java | 14 ++++++++++++ ...orService.java => IngestorJobService.java} | 22 ++++++++++++++++++- .../java/ai/dragon/service/JobService.java | 16 +++++++++++++- 3 files changed, 50 insertions(+), 2 deletions(-) rename backend/src/main/java/ai/dragon/service/{IngestorService.java => IngestorJobService.java} (50%) diff --git a/backend/src/main/java/ai/dragon/repository/AbstractRepository.java b/backend/src/main/java/ai/dragon/repository/AbstractRepository.java index de526b23..82bd1dda 100644 --- a/backend/src/main/java/ai/dragon/repository/AbstractRepository.java +++ b/backend/src/main/java/ai/dragon/repository/AbstractRepository.java @@ -6,7 +6,9 @@ import java.util.UUID; import org.dizitart.no2.Nitrite; +import org.dizitart.no2.collection.events.CollectionEventInfo; import org.dizitart.no2.collection.events.CollectionEventListener; +import org.dizitart.no2.collection.events.EventType; import org.dizitart.no2.filters.Filter; import org.dizitart.no2.filters.FluentFilter; import org.dizitart.no2.repository.Cursor; @@ -114,6 +116,18 @@ public void subscribe(CollectionEventListener listener) { repository.subscribe(listener); } + public void subscribe(EventType eventType, CollectionEventListener listener) { + CollectionEventListener filterListener = new CollectionEventListener() { + @Override + public void onEvent(CollectionEventInfo collectionEventInfo) { + if (collectionEventInfo.getEventType() == eventType) { + listener.onEvent(collectionEventInfo); + } + } + }; + subscribe(filterListener); + } + public void unsubscribe(CollectionEventListener listener) { Nitrite db = databaseService.getNitriteDB(); ObjectRepository repository = db.getRepository(getGenericSuperclass()); diff --git a/backend/src/main/java/ai/dragon/service/IngestorService.java b/backend/src/main/java/ai/dragon/service/IngestorJobService.java similarity index 50% rename from backend/src/main/java/ai/dragon/service/IngestorService.java rename to backend/src/main/java/ai/dragon/service/IngestorJobService.java index 084c7e0c..41611681 100644 --- a/backend/src/main/java/ai/dragon/service/IngestorService.java +++ b/backend/src/main/java/ai/dragon/service/IngestorJobService.java @@ -1,19 +1,39 @@ package ai.dragon.service; +import org.dizitart.no2.collection.events.CollectionEventInfo; +import org.dizitart.no2.collection.events.CollectionEventListener; +import org.dizitart.no2.collection.events.EventType; import org.jobrunr.jobs.annotations.Job; import org.jobrunr.jobs.annotations.Recurring; import org.jobrunr.jobs.context.JobContext; import org.jobrunr.jobs.context.JobRunrDashboardLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import ai.dragon.repository.IngestorRepository; + @Service -public class IngestorService { +public class IngestorJobService { private final Logger logger = new JobRunrDashboardLogger(LoggerFactory.getLogger(this.getClass())); public static final String INGESTOR_RECURRING_JOB_ID = "ingestor-recurring-job"; + @Autowired + private IngestorRepository ingestorRepository; + + public void onApplicationStartup() { + ingestorRepository.subscribe(new CollectionEventListener() { + @Override + public void onEvent(CollectionEventInfo collectionEventInfo) { + if(EventType.Insert.equals(collectionEventInfo.getEventType())) { + + } + } + }); + } + @Recurring(id = INGESTOR_RECURRING_JOB_ID, cron = "* * * * *") @Job(name = "Ingestor Recurring Job") public void executeSampleJob(JobContext jobContext) { diff --git a/backend/src/main/java/ai/dragon/service/JobService.java b/backend/src/main/java/ai/dragon/service/JobService.java index 4253627f..7f4e3462 100644 --- a/backend/src/main/java/ai/dragon/service/JobService.java +++ b/backend/src/main/java/ai/dragon/service/JobService.java @@ -15,8 +15,11 @@ public class JobService { @Autowired private StorageProvider storageProvider; + @Autowired + private IngestorJobService ingestorJobService; + public void onApplicationStartup() { - this.triggerRecurringJob(IngestorService.INGESTOR_RECURRING_JOB_ID); + ingestorJobService.onApplicationStartup(); } public void triggerRecurringJob(String id) { @@ -30,6 +33,17 @@ public void triggerRecurringJob(String id) { storageProvider.save(job); } + public void stopRecurringJob(String id) { + storageProvider.deleteRecurringJob(id); + } + + public void stopAllRecurringJobs() { + recurringJobResults() + .stream() + .map(RecurringJob::getId) + .forEach(storageProvider::deleteRecurringJob); + } + private RecurringJobsResult recurringJobResults() { if (recurringJobsResult == null || storageProvider.recurringJobsUpdated(recurringJobsResult.getLastModifiedHash())) {