Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export metrics about the import process #703

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The application imports the data from Zeebe using the [Hazelcast exporter](https

### Upgrading from a prior version

See the [upgrade instructions](./UPGRADE.md).
See the [upgrade instructions](UPGRADE.md).

### Docker

Expand Down Expand Up @@ -297,6 +297,15 @@ Refer to the [docker-compose file](docker/docker-compose.yml) for a sample confi
Please be aware that when connecting to a Redis cluster you must activate
the `useClusterClient` option.

## Metrics
The monitor exports a couple of metrics via the usual `/actuator/prometheus` endpoint.

In addition to the default metrics that are available via Spring Boot, there are some metrics exported specific
- for the import process (e.g. number of imported process instances)
- Hazelcast's ringbuffer.

All metrics are prefixed with `zeebemonitor_importer`.

## Code of Conduct

This project adheres to the Contributor Covenant [Code of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import com.hazelcast.core.HazelcastInstance;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.hazelcast.connect.java.ZeebeHazelcast;
import io.zeebe.monitor.entity.HazelcastConfig;
import io.zeebe.monitor.repository.HazelcastConfigRepository;
import io.zeebe.monitor.zeebe.protobuf.importers.ErrorProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.IncidentProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.JobProtobufImporter;
Expand All @@ -30,21 +28,9 @@ public class HazelcastImportService {
@Autowired private TimerProtobufImporter timerImporter;
@Autowired private ErrorProtobufImporter errorImporter;

@Autowired private HazelcastConfigRepository hazelcastConfigRepository;
@Autowired private HazelcastStateService hazelcastStateService;

public ZeebeHazelcast importFrom(final HazelcastInstance hazelcast) {

final var hazelcastConfig =
hazelcastConfigRepository
.findById("cfg")
.orElseGet(
() -> {
final var config = new HazelcastConfig();
config.setId("cfg");
config.setSequence(-1);
return config;
});

final var builder =
ZeebeHazelcast.newBuilder(hazelcast)
.addProcessListener(
Expand Down Expand Up @@ -79,13 +65,11 @@ record ->
messageSubscriptionImporter::importMessageStartEventSubscription))
.addErrorListener(errorImporter::importError)
.postProcessListener(
sequence -> {
hazelcastConfig.setSequence(sequence);
hazelcastConfigRepository.save(hazelcastConfig);
});
hazelcastStateService::saveSequenceNumber);

if (hazelcastConfig.getSequence() >= 0) {
builder.readFrom(hazelcastConfig.getSequence());
final var lastSequence = hazelcastStateService.getLastSequenceNumber();
if (lastSequence >= 0) {
builder.readFrom(lastSequence);
} else {
builder.readFromHead();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.zeebe.monitor.zeebe.hazelcast;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.monitor.entity.HazelcastConfig;
import io.zeebe.monitor.repository.HazelcastConfigRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

/**
* The HazelcastStateService manages the current pointer of the Hazelcast import process.
* <p>
* That pointer is required to read the next-relevant message from the RingBuffer
* <p>
* Usually, that RingBuffer is read 1 by 1, but sometimes, the RingBuffer may overrun by the export process,
* and in that case, the Import process will set the sequence to the current position of the RingBuffer.
*/
@Component
public class HazelcastStateService {

private final HazelcastConfigRepository hazelcastConfigRepository;
private final Counter sequenceCounter;

@Autowired
public HazelcastStateService(HazelcastConfigRepository hazelcastConfigRepository, MeterRegistry meterRegistry) {
this.hazelcastConfigRepository = hazelcastConfigRepository;

sequenceCounter = Counter.builder("zeebemonitor_importer_ringbuffer_sequences_read").
description("number of items read from Hazelcast's ringbuffer (sequence counter)").
register(meterRegistry);
}

public long getLastSequenceNumber() {
return getHazelcastConfig().getSequence();
}

@Transactional
public void saveSequenceNumber(long sequence) {
HazelcastConfig config = getHazelcastConfig();

long prev = config.getSequence();

config.setSequence(sequence);

hazelcastConfigRepository.save(config);

sequenceCounter.increment(sequence - prev);
}

private HazelcastConfig getHazelcastConfig() {
return hazelcastConfigRepository
.findById("cfg")
.orElseGet(
() -> {
final var config = new HazelcastConfig();
config.setId("cfg");
config.setSequence(-1);
return config;
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.ErrorEntity;
import io.zeebe.monitor.repository.ErrorRepository;
Expand All @@ -9,7 +11,15 @@
@Component
public class ErrorProtobufImporter {

@Autowired private ErrorRepository errorRepository;
private final ErrorRepository errorRepository;
private final Counter counter;

@Autowired
public ErrorProtobufImporter(ErrorRepository errorRepository, MeterRegistry meterRegistry) {
this.errorRepository = errorRepository;

this.counter = Counter.builder("zeebemonitor_importer_error").description("number of processed errors").register(meterRegistry);
}

public void importError(final Schema.ErrorRecord record) {

Expand All @@ -32,5 +42,7 @@ public void importError(final Schema.ErrorRecord record) {
});

errorRepository.save(entity);

counter.increment();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.IncidentEntity;
import io.zeebe.monitor.repository.IncidentRepository;
Expand All @@ -10,7 +12,17 @@
@Component
public class IncidentProtobufImporter {

@Autowired private IncidentRepository incidentRepository;
private final IncidentRepository incidentRepository;
private final Counter createdCounter;
private final Counter resolvedCounter;

@Autowired
public IncidentProtobufImporter(IncidentRepository incidentRepository, MeterRegistry meterRegistry) {
this.incidentRepository = incidentRepository;

createdCounter = Counter.builder("zeebemonitor_importer_incident").tag("action", "created").description("number of processed incidents").register(meterRegistry);
resolvedCounter = Counter.builder("zeebemonitor_importer_incident").tag("action", "resolved").description("number of processed incidents").register(meterRegistry);
}

public void importIncident(final Schema.IncidentRecord record) {

Expand Down Expand Up @@ -39,9 +51,13 @@ public void importIncident(final Schema.IncidentRecord record) {
entity.setCreated(timestamp);
incidentRepository.save(entity);

createdCounter.increment();

} else if (intent == IncidentIntent.RESOLVED) {
entity.setResolved(timestamp);
incidentRepository.save(entity);

resolvedCounter.increment();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.JobEntity;
import io.zeebe.monitor.repository.JobRepository;
Expand All @@ -10,7 +12,15 @@
@Component
public class JobProtobufImporter {

@Autowired private JobRepository jobRepository;
private final JobRepository jobRepository;
private final Counter counter;

@Autowired
public JobProtobufImporter(JobRepository jobRepository, MeterRegistry meterRegistry) {
this.jobRepository = jobRepository;

this.counter = Counter.builder("zeebemonitor_importer_job").description("number of processed jobs").register(meterRegistry);
}

public void importJob(final Schema.JobRecord record) {

Expand All @@ -36,5 +46,7 @@ public void importJob(final Schema.JobRecord record) {
entity.setWorker(record.getWorker());
entity.setRetries(record.getRetries());
jobRepository.save(entity);

counter.increment();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.MessageEntity;
import io.zeebe.monitor.repository.MessageRepository;
Expand All @@ -10,7 +12,15 @@
@Component
public class MessageProtobufImporter {

@Autowired private MessageRepository messageRepository;
private final MessageRepository messageRepository;
private final Counter counter;

@Autowired
public MessageProtobufImporter(MessageRepository messageRepository, MeterRegistry meterRegistry) {
this.messageRepository = messageRepository;

this.counter = Counter.builder("zeebemonitor_importer_message").description("number of processed messages").register(meterRegistry);
}

public void importMessage(final Schema.MessageRecord record) {

Expand All @@ -35,5 +45,7 @@ public void importMessage(final Schema.MessageRecord record) {
entity.setState(intent.name().toLowerCase());
entity.setTimestamp(timestamp);
messageRepository.save(entity);

counter.increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.MessageSubscriptionEntity;
import io.zeebe.monitor.repository.MessageSubscriptionRepository;
Expand All @@ -12,7 +14,18 @@
@Component
public class MessageSubscriptionProtobufImporter {

@Autowired private MessageSubscriptionRepository messageSubscriptionRepository;
private final MessageSubscriptionRepository messageSubscriptionRepository;
private final Counter subsCounter;
private final Counter eventCounter;

@Autowired
public MessageSubscriptionProtobufImporter(MessageSubscriptionRepository messageSubscriptionRepository, MeterRegistry meterRegistry) {
this.messageSubscriptionRepository = messageSubscriptionRepository;

this.subsCounter =
Counter.builder("zeebemonitor_importer_message_subscription").description("number of processed message subscriptions").register(meterRegistry);
this.eventCounter = Counter.builder("zeebemonitor_importer_message_start_event_subscription").description("number of processed message start events").register(meterRegistry);
}

public void importMessageSubscription(final Schema.MessageSubscriptionRecord record) {

Expand All @@ -39,6 +52,9 @@ public void importMessageSubscription(final Schema.MessageSubscriptionRecord rec
entity.setState(intent.name().toLowerCase());
entity.setTimestamp(timestamp);
messageSubscriptionRepository.save(entity);


subsCounter.increment();
}

public void importMessageStartEventSubscription(
Expand Down Expand Up @@ -66,6 +82,8 @@ public void importMessageStartEventSubscription(
entity.setState(intent.name().toLowerCase());
entity.setTimestamp(timestamp);
messageSubscriptionRepository.save(entity);

eventCounter.increment();
}

private String generateId() {
Expand Down
Loading
Loading