Skip to content

Commit

Permalink
161 signal errors via MQTT and Plugin API (example: Empty pellet cont…
Browse files Browse the repository at this point in the history
…ainer) (#162)

* Add a new MQTT channel for notifications, a new extension for plugins to allow them react on rika errors
That way we can react to RIKA errors in a more convenient way
  • Loading branch information
sebastienvermeille authored Jan 20, 2024
1 parent 9d4baac commit 09b10a8
Show file tree
Hide file tree
Showing 21 changed files with 565 additions and 13 deletions.
5 changes: 3 additions & 2 deletions .code/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ services:
#optional properties
# - MQTT_URI_SCHEME=tcp://
# - MQTT_PORT=1883
# - MQTT_COMMAND_TOPIC_NAME=cmnd/rika2mqtt
# - MQTT_TELEMETRY_REPORT_TOPIC_NAME=tele/rika2mqtt
# - MQTT_COMMAND_TOPIC_NAME=rika2mqtt/commands
# - MQTT_TELEMETRY_REPORT_TOPIC_NAME=rika2mqtt/telemetry
# - MQTT_NOTIFICATION_TOPIC_NAME=rika2mqtt/notifications
# - PLUGINS=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar


Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ Default: `cmnd/rika2mqtt`
The MQTT topic used by RIKA2MQTT to publish RIKA status
Default: `tele/rika2mqtt`

## MQTT_ERROR_TOPIC_NAME (Optional)
The MQTT topic used by RIKA2MQTT to publish RIKA errors
Default: `tele/rika2mqtt-errors`

## MQTT_PORT (Optional)

The port of your MQTT instance
Expand Down
16 changes: 16 additions & 0 deletions bridge/src/main/java/dev/cookiecode/rika2mqtt/bridge/Bridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import dev.cookiecode.rika2mqtt.bridge.misc.EmailObfuscator;
import dev.cookiecode.rika2mqtt.plugins.internal.v1.Rika2MqttPluginService;
import dev.cookiecode.rika2mqtt.plugins.internal.v1.event.PolledStoveStatusEvent;
import dev.cookiecode.rika2mqtt.plugins.internal.v1.event.StoveErrorEvent;
import dev.cookiecode.rika2mqtt.plugins.internal.v1.mapper.StoveErrorMapper;
import dev.cookiecode.rika2mqtt.plugins.internal.v1.mapper.StoveStatusMapper;
import dev.cookiecode.rika2mqtt.rika.firenet.RikaFirenetService;
import dev.cookiecode.rika2mqtt.rika.firenet.exception.CouldNotAuthenticateToRikaFirenetException;
Expand Down Expand Up @@ -81,6 +83,7 @@ public class Bridge {
private final EmailObfuscator emailObfuscator;
private final Gson gson;
private final StoveStatusMapper stoveStatusMapper;
private final StoveErrorMapper stoveErrorMapper;

private final Rika2MqttPluginService pluginManager;

Expand Down Expand Up @@ -140,6 +143,19 @@ void publishToMqtt() {
PolledStoveStatusEvent.builder()
.stoveStatus(stoveStatusMapper.toApiStoveStatus(status))
.build());

status
.getError()
.ifPresent(
stoveError -> {
final var enrichedStoveError =
stoveErrorMapper.toApiStoveError(stoveId.id(), stoveError);
final var jsonError = gson.toJson(enrichedStoveError);
mqttService.publishNotification(jsonError);

applicationEventPublisher.publishEvent(
StoveErrorEvent.builder().stoveError(enrichedStoveError).build());
});
} catch (InvalidStoveIdException e) {
// TODO: could occurs if a stove is added later (after deployment of this rika2mqtt
// instance, might be valuable to perform a reload of stoves "periodically") -> should
Expand Down
1 change: 1 addition & 0 deletions bridge/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mqtt:
username: ${MQTT_USERNAME:""}
password: ${MQTT_PASSWORD:""}
telemetry-report-topic-name: ${MQTT_TELEMETRY_REPORT_TOPIC_NAME:tele/rika2mqtt}
notification-topic-name: ${MQTT_NOTIFICATION_TOPIC_NAME:tele/rika2mqtt-notifications}
command-topic-name: ${MQTT_COMMAND_TOPIC_NAME:cmnd/rika2mqtt}
bridge:
reportInterval: PT60S
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ void publishMqttMessageFromBridgeShouldEffectivelyPublishAMessageToMqtt() {
() -> mqttService.publish(message));
}

@Test
void publishErrorMqttMessageFromBridgeShouldEffectivelyPublishAnErrorMessageToMqtt() {
String message = "some error";

// Here, another mqtt client connect to the telemetry topic
// after using the bridge mqttService to publish to mqtt,
// the MQTT test client (outside the rika2mqtt bridge) should be able to receive that message
getMqttTestClient()
.assertThatMessageWasPublishedToMqttTopic(
message,
mqttConfigProperties.getNotificationTopicName(),
() -> mqttService.publishNotification(message));
}

private MqttTestClient getMqttTestClient() {
try {
var client = getRandomMqttClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,23 @@
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

import com.google.gson.Gson;
import dev.cookiecode.rika2mqtt.bridge.misc.EmailObfuscator;
import dev.cookiecode.rika2mqtt.plugins.internal.v1.Rika2MqttPluginService;
import dev.cookiecode.rika2mqtt.plugins.internal.v1.mapper.StoveErrorMapper;
import dev.cookiecode.rika2mqtt.plugins.internal.v1.mapper.StoveStatusMapper;
import dev.cookiecode.rika2mqtt.rika.firenet.RikaFirenetService;
import dev.cookiecode.rika2mqtt.rika.firenet.model.StoveError;
import dev.cookiecode.rika2mqtt.rika.firenet.model.StoveId;
import dev.cookiecode.rika2mqtt.rika.firenet.model.StoveStatus;
import dev.cookiecode.rika2mqtt.rika.mqtt.MqttService;
import dev.cookiecode.rika2mqtt.rika.mqtt.event.MqttCommandEvent;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -70,6 +71,7 @@ class BridgeTest {
@Mock EmailObfuscator emailObfuscator;
@Mock Gson gson;
@Mock StoveStatusMapper stoveStatusMapper;
@Mock StoveErrorMapper stoveErrorMapper;

@Mock Rika2MqttPluginService pluginManager;

Expand All @@ -82,10 +84,11 @@ void setUp() {
}

@Test
void initShouldInitStovesWithRetrieveStovesFromRikaFirenet() {
void initShouldInitStovesWithRetrieveStovesFromRikaFirenet() throws Exception {
// GIVEN
List<StoveId> stoveIds = List.of(StoveId.of(15L));
when(rikaFirenetService.getStoves()).thenReturn(stoveIds);
when(rikaFirenetService.getStatus(any())).thenReturn(mock(StoveStatus.class));

// WHEN
bridge.init();
Expand All @@ -96,9 +99,10 @@ void initShouldInitStovesWithRetrieveStovesFromRikaFirenet() {
}

@Test
void initShouldInvokePrintStartupMessages() {
void initShouldInvokePrintStartupMessages() throws Exception {
// GIVEN
when(rikaFirenetService.getStoves()).thenReturn(List.of(StoveId.of(15L)));
when(rikaFirenetService.getStatus(any())).thenReturn(mock(StoveStatus.class));

// WHEN
bridge.init();
Expand Down Expand Up @@ -168,10 +172,11 @@ void printStartupMessagesShouldPrintLogMessageWhenStovesAreLinkedToTheAccount(
}

@Test
void publishToMqttShouldInvokeMqttServicePublishForEachStove() {
void publishToMqttShouldInvokeMqttServicePublishForEachStove() throws Exception {
// GIVEN
final var stoves = List.of(StoveId.of(1L), StoveId.of(2L), StoveId.of(3L));
bridge.initStoves(stoves);
when(rikaFirenetService.getStatus(any())).thenReturn(mock(StoveStatus.class));

// WHEN
bridge.publishToMqtt();
Expand All @@ -188,6 +193,7 @@ void publishToMqttShouldInvokeRikaFirenetServiceGetStatusForEachStove() throws E
final var thirdStove = StoveId.of(3L);
final var stoves = List.of(firstStove, secondStove, thirdStove);
bridge.initStoves(stoves);
when(rikaFirenetService.getStatus(any())).thenReturn(mock(StoveStatus.class));

// WHEN
bridge.publishToMqtt();
Expand All @@ -198,6 +204,43 @@ void publishToMqttShouldInvokeRikaFirenetServiceGetStatusForEachStove() throws E
verify(rikaFirenetService).getStatus(thirdStove);
}

@Test
void publishToMqttShouldInvokeMqttServicePublishErrorForEachStoveHavingAnError()
throws Exception {
// GIVEN
final var stoves = List.of(StoveId.of(1L), StoveId.of(2L), StoveId.of(3L));
bridge.initStoves(stoves);
final var stoveStatus = mock(StoveStatus.class);
when(stoveStatus.getError())
.thenReturn(Optional.of(StoveError.builder().statusError(1).statusSubError(12).build()));

when(rikaFirenetService.getStatus(any())).thenReturn(stoveStatus);

// WHEN
bridge.publishToMqtt();

// THEN
verify(mqttService, times(stoves.size())).publishNotification(any());
}

@Test
void publishToMqttShouldNotInvokeMqttServicePublishErrorForEachStoveHavingNoError()
throws Exception {
// GIVEN
final var stoves = List.of(StoveId.of(1L), StoveId.of(2L), StoveId.of(3L));
bridge.initStoves(stoves);
final var stoveStatus = mock(StoveStatus.class);
when(stoveStatus.getError()).thenReturn(Optional.empty());

when(rikaFirenetService.getStatus(any())).thenReturn(stoveStatus);

// WHEN
bridge.publishToMqtt();

// THEN
verify(mqttService, never()).publishNotification(any());
}

@Test
void onReceiveMqttCommandShouldInvokeUpdateControls() throws Exception {

Expand Down
6 changes: 5 additions & 1 deletion docs/docs/getting-started/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ Default: `cmnd/rika2mqtt`
The MQTT topic used by RIKA2MQTT to publish RIKA status
Default: `tele/rika2mqtt`

## MQTT_NOTIFICATION_TOPIC_NAME (Optional)
The MQTT topic used by RIKA2MQTT to publish RIKA errors, warnings (i.e. Empty pellet container)
Default: `tele/rika2mqtt-notifications`

## MQTT_URI_SCHEME (Optional)
The uri scheme to be used with MQTT_HOST (i.e: `tcp://`, `ssl://`)
Default: `tcp://`
Default: `tcp://`
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@
public interface MqttService {

void publish(String message);

void publishNotification(String message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,18 @@ public class MqttServiceImpl implements MqttService {
@Qualifier("mqttConfiguration.MqttGateway")
private final MqttConfiguration.MqttGateway mqttGateway;

@Qualifier("mqttConfiguration.MqttNotificationGateway")
private final MqttConfiguration.MqttGateway mqttNotificationGateway;

@Override
public void publish(final String message) {
log.atInfo().log("Publish to mqtt:\n%s", message);
mqttGateway.sendToMqtt(message);
}

@Override
public void publishNotification(String message) {
log.atInfo().log("Publish error to mqtt:\n%s", message);
mqttNotificationGateway.sendToMqtt(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,7 @@ public class MqttConfigProperties {

private String telemetryReportTopicName = "tele/rika2mqtt";

private String notificationTopicName = "tele/rika2mqtt-notifications";

private String commandTopicName = "cmnd/rika2mqtt";
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ public MessageHandler mqttOutbound() {
return messageHandler;
}

@Bean
@ServiceActivator(inputChannel = "mqttOutboundNotificationChannel", autoStartup = "true")
public MessageHandler mqttOutboundNotification() {
var messageHandler =
new MqttPahoMessageHandler(mqttConfigProperties.getClientName(), mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttConfigProperties.getNotificationTopicName());
return messageHandler;
}

/**
* @implNote this is using a workaround found <a
* href="https://stackoverflow.com/a/41241824">here</a> the doc simply mention to do: `return
Expand All @@ -102,12 +112,25 @@ public MessageChannel mqttOutboundChannel() {
return dc;
}

@Bean
public MessageChannel mqttOutboundNotificationChannel() {
var dc = new DirectChannel();
dc.subscribe(mqttOutboundNotification());
return dc;
}

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

void sendToMqtt(String data);
}

@MessagingGateway(defaultRequestChannel = "mqttOutboundNotificationChannel")
public interface MqttNotificationGateway extends MqttGateway {

void sendToMqtt(String data);
}

@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* The MIT License
* Copyright © 2022 Sebastien Vermeille
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package dev.cookiecode.rika2mqtt.plugins.api.v1;

import dev.cookiecode.rika2mqtt.plugins.api.Beta;
import dev.cookiecode.rika2mqtt.plugins.api.v1.model.StoveError;
import org.pf4j.ExtensionPoint;

/**
* @author Sebastien Vermeille
*/
@Beta
public interface StoveErrorExtension extends ExtensionPoint {

/**
* When an error is displayed on the RIKA Stove screen, it is also triggered here
*
* @param stoveError the error triggered
*/
void onStoveError(StoveError stoveError);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* The MIT License
* Copyright © 2022 Sebastien Vermeille
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package dev.cookiecode.rika2mqtt.plugins.api.v1.model;

import dev.cookiecode.rika2mqtt.plugins.api.Beta;
import lombok.Data;

/**
* @author Sebastien Vermeille
*/
@Data
@Beta
public class StoveError {

private StoveId stoveId;
private String errorCode;
}
Loading

0 comments on commit 09b10a8

Please sign in to comment.