Skip to content

Commit

Permalink
Init Subscribing events for Ingestor repository
Browse files Browse the repository at this point in the history
  • Loading branch information
isontheline committed May 28, 2024
1 parent dd50479 commit 0218552
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
14 changes: 14 additions & 0 deletions backend/src/main/java/ai/dragon/repository/AbstractRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import java.util.UUID;

import org.dizitart.no2.Nitrite;
import org.dizitart.no2.collection.events.CollectionEventInfo;
import org.dizitart.no2.collection.events.CollectionEventListener;
import org.dizitart.no2.collection.events.EventType;
import org.dizitart.no2.filters.Filter;
import org.dizitart.no2.filters.FluentFilter;
import org.dizitart.no2.repository.Cursor;
Expand Down Expand Up @@ -114,6 +116,18 @@ public void subscribe(CollectionEventListener listener) {
repository.subscribe(listener);
}

public void subscribe(EventType eventType, CollectionEventListener listener) {
CollectionEventListener filterListener = new CollectionEventListener() {
@Override
public void onEvent(CollectionEventInfo<?> collectionEventInfo) {
if (collectionEventInfo.getEventType() == eventType) {
listener.onEvent(collectionEventInfo);
}
}
};
subscribe(filterListener);
}

public void unsubscribe(CollectionEventListener listener) {
Nitrite db = databaseService.getNitriteDB();
ObjectRepository<T> repository = db.getRepository(getGenericSuperclass());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,39 @@
package ai.dragon.service;

import org.dizitart.no2.collection.events.CollectionEventInfo;
import org.dizitart.no2.collection.events.CollectionEventListener;
import org.dizitart.no2.collection.events.EventType;
import org.jobrunr.jobs.annotations.Job;
import org.jobrunr.jobs.annotations.Recurring;
import org.jobrunr.jobs.context.JobContext;
import org.jobrunr.jobs.context.JobRunrDashboardLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import ai.dragon.repository.IngestorRepository;

@Service
public class IngestorService {
public class IngestorJobService {
private final Logger logger = new JobRunrDashboardLogger(LoggerFactory.getLogger(this.getClass()));

public static final String INGESTOR_RECURRING_JOB_ID = "ingestor-recurring-job";

@Autowired
private IngestorRepository ingestorRepository;

public void onApplicationStartup() {
ingestorRepository.subscribe(new CollectionEventListener() {
@Override
public void onEvent(CollectionEventInfo<?> collectionEventInfo) {
if(EventType.Insert.equals(collectionEventInfo.getEventType())) {

}
}
});
}

@Recurring(id = INGESTOR_RECURRING_JOB_ID, cron = "* * * * *")
@Job(name = "Ingestor Recurring Job")
public void executeSampleJob(JobContext jobContext) {
Expand Down
16 changes: 15 additions & 1 deletion backend/src/main/java/ai/dragon/service/JobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ public class JobService {
@Autowired
private StorageProvider storageProvider;

@Autowired
private IngestorJobService ingestorJobService;

public void onApplicationStartup() {
this.triggerRecurringJob(IngestorService.INGESTOR_RECURRING_JOB_ID);
ingestorJobService.onApplicationStartup();
}

public void triggerRecurringJob(String id) {
Expand All @@ -30,6 +33,17 @@ public void triggerRecurringJob(String id) {
storageProvider.save(job);
}

public void stopRecurringJob(String id) {
storageProvider.deleteRecurringJob(id);
}

public void stopAllRecurringJobs() {
recurringJobResults()
.stream()
.map(RecurringJob::getId)
.forEach(storageProvider::deleteRecurringJob);
}

private RecurringJobsResult recurringJobResults() {
if (recurringJobsResult == null
|| storageProvider.recurringJobsUpdated(recurringJobsResult.getLastModifiedHash())) {
Expand Down

0 comments on commit 0218552

Please sign in to comment.