Fixe some buggs in MQTT, it works now

This commit is contained in:
Ziver Koc 2024-09-09 23:35:26 +02:00
parent b7ee6b16dc
commit 34e6843d91
5 changed files with 60 additions and 107 deletions

View file

@ -49,7 +49,7 @@ public class HalMqttController implements HalAutostartController, MqttSubscripti
private MqttBroker mqttBroker; private MqttBroker mqttBroker;
private List<HalMqttDetector> detectors = Collections.emptyList(); private List<HalMqttDetector> detectors = Collections.emptyList();
private HashMap<String, List<HalMqttDeviceConfig>> topics = new HashMap<>(); private Map<String, List<HalMqttDeviceConfig>> topics = new HashMap<>();
private List<HalDeviceReportListener> deviceListeners = new CopyOnWriteArrayList<>(); private List<HalDeviceReportListener> deviceListeners = new CopyOnWriteArrayList<>();
// -------------------------- // --------------------------
@ -95,10 +95,13 @@ public class HalMqttController implements HalAutostartController, MqttSubscripti
// -------------------------- // --------------------------
@Override @Override
public void dataPublished(String topic, byte[] data) { public void dataPublished(String topicName, byte[] data) {
logger.finest("MQTT data published(topic: " + topic + "): " + new String(data, StandardCharsets.UTF_8)); if (data == null)
data = new byte[0];
logger.finest("MQTT data published(topic: " + topicName + "): " + new String(data, StandardCharsets.UTF_8));
List<HalMqttDeviceConfig> registeredDevices = topics.get(topic); topicName = topicName.trim();
List<HalMqttDeviceConfig> registeredDevices = topics.get(topicName);
// Handle existing devices // Handle existing devices
@ -114,16 +117,16 @@ public class HalMqttController implements HalAutostartController, MqttSubscripti
} }
} }
// Handle new devices // Handle detection of new devices
for (HalMqttDetector detector : detectors) { for (HalMqttDetector detector : detectors) {
List<HalMqttDeviceConfig> detectedDevices = detector.parseTopic(topic, data); List<HalMqttDeviceConfig> detectedDevices = detector.parseTopic(topicName, data);
// Check if we already know the device // Check if we already know the device
if (!ObjectUtil.isEmpty(detectedDevices)) { if (!ObjectUtil.isEmpty(detectedDevices)) {
for (HalMqttDeviceConfig detectedDeviceConfig : detectedDevices) { for (HalMqttDeviceConfig detectedDeviceConfig : detectedDevices) {
// Only handle unknown devices // Only handle unknown devices
if (!registeredDevices.contains(detectedDeviceConfig)) { if (registeredDevices == null || !registeredDevices.contains(detectedDeviceConfig)) {
HalDeviceData deviceData = detectedDeviceConfig.getDeviceData(data); HalDeviceData deviceData = detectedDeviceConfig.getDeviceData(data);
if (deviceListeners != null) { if (deviceListeners != null) {
@ -150,9 +153,9 @@ public class HalMqttController implements HalAutostartController, MqttSubscripti
if (deviceConfig instanceof HalMqttDeviceConfig) { if (deviceConfig instanceof HalMqttDeviceConfig) {
HalMqttDeviceConfig mqttEvent = (HalMqttDeviceConfig) deviceConfig; HalMqttDeviceConfig mqttEvent = (HalMqttDeviceConfig) deviceConfig;
if (!topics.containsKey(mqttEvent.getTopic())) if (!topics.containsKey(mqttEvent.getTopicName()))
topics.put(mqttEvent.getTopic(), new ArrayList<>()); topics.put(mqttEvent.getTopicName(), new ArrayList<>());
topics.get(mqttEvent.getTopic()).add(mqttEvent); topics.get(mqttEvent.getTopicName()).add(mqttEvent);
} else { } else {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Device config is not an instance of " + HalMqttDeviceConfig.class + ": " + deviceConfig.getClass()); "Device config is not an instance of " + HalMqttDeviceConfig.class + ": " + deviceConfig.getClass());
@ -163,8 +166,8 @@ public class HalMqttController implements HalAutostartController, MqttSubscripti
public void deregister(HalDeviceConfig deviceConfig) { public void deregister(HalDeviceConfig deviceConfig) {
if (deviceConfig instanceof HalMqttDeviceConfig) { if (deviceConfig instanceof HalMqttDeviceConfig) {
HalMqttDeviceConfig mqttEvent = (HalMqttDeviceConfig) deviceConfig; HalMqttDeviceConfig mqttEvent = (HalMqttDeviceConfig) deviceConfig;
if (topics.containsKey(mqttEvent.getTopic())) if (topics.containsKey(mqttEvent.getTopicName()))
topics.get(mqttEvent.getTopic()).remove(deviceConfig); topics.get(mqttEvent.getTopicName()).remove(deviceConfig);
} }
} }
@ -172,7 +175,7 @@ public class HalMqttController implements HalAutostartController, MqttSubscripti
public void send(HalEventConfig eventConfig, HalEventData eventData) { public void send(HalEventConfig eventConfig, HalEventData eventData) {
if (eventConfig instanceof HalMqttDeviceConfig) { if (eventConfig instanceof HalMqttDeviceConfig) {
HalMqttDeviceConfig mqttEvent = (HalMqttDeviceConfig) eventConfig; HalMqttDeviceConfig mqttEvent = (HalMqttDeviceConfig) eventConfig;
mqttBroker.publish(mqttEvent.getTopic(), Double.toString(eventData.getData()).getBytes()); mqttBroker.publish(mqttEvent.getTopicName(), Double.toString(eventData.getData()).getBytes());
} else } else
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Device config is not an instance of " + HalMqttDeviceConfig.class + ": " + eventConfig.getClass()); "Device config is not an instance of " + HalMqttDeviceConfig.class + ": " + eventConfig.getClass());

View file

@ -51,7 +51,7 @@ public class GenericMqttDetector implements HalMqttDetector {
if (config != null) { if (config != null) {
detectedDeviceConfigs.add(config); detectedDeviceConfigs.add(config);
return null; return detectedDeviceConfigs;
} }
} }

View file

@ -58,8 +58,8 @@ import java.util.Objects;
public abstract class HalMqttDeviceConfig implements HalSensorConfig { public abstract class HalMqttDeviceConfig implements HalSensorConfig {
@Configurator.Configurable(value = "MQTT Topic") @Configurator.Configurable(value = "MQTT Topic Name")
private String topic; private String topicName;
@Configurator.Configurable(value = "JSON Path", description = "If the value of the topic is a JSON then this parameter can be used to specify the path to the e.g. temperature value." + @Configurator.Configurable(value = "JSON Path", description = "If the value of the topic is a JSON then this parameter can be used to specify the path to the e.g. temperature value." +
"<br>THe parameter uses the JSON-Path syntax where it always starts with $ and object fields can be accessed by .<key> and a array element by [index]") "<br>THe parameter uses the JSON-Path syntax where it always starts with $ and object fields can be accessed by .<key> and a array element by [index]")
private String jsonPath; private String jsonPath;
@ -68,27 +68,27 @@ public abstract class HalMqttDeviceConfig implements HalSensorConfig {
public HalMqttDeviceConfig() {} public HalMqttDeviceConfig() {}
/** /**
* @param topic is the topic associated to this device * @param topicName is the topic associated to this device
*/ */
public HalMqttDeviceConfig(String topic) { public HalMqttDeviceConfig(String topicName) {
this.topic = topic; this.topicName = topicName;
} }
/** /**
* @param topic is the topic associated to this device. * @param topicName is the topic associated to this device.
* @param jsonPath indicates that the payload is of JSON format and the data should be extracted from this path. * @param jsonPath indicates that the payload is of JSON format and the data should be extracted from this path.
*/ */
public HalMqttDeviceConfig(String topic, String jsonPath) { public HalMqttDeviceConfig(String topicName, String jsonPath) {
this.topic = topic; this.topicName = topicName;
this.jsonPath = jsonPath; this.jsonPath = jsonPath;
} }
public String getTopic() { public String getTopicName() {
return topic; return topicName;
} }
public void setTopic(String topic) { public void setTopicName(String topicName) {
this.topic = topic; this.topicName = topicName;
} }
public String getJsonPath() { public String getJsonPath() {
@ -124,19 +124,19 @@ public abstract class HalMqttDeviceConfig implements HalSensorConfig {
HalMqttDeviceConfig that = (HalMqttDeviceConfig) o; HalMqttDeviceConfig that = (HalMqttDeviceConfig) o;
if (!Objects.equals(topic, that.topic)) return false; if (!Objects.equals(topicName, that.topicName)) return false;
return Objects.equals(jsonPath, that.jsonPath); return Objects.equals(jsonPath, that.jsonPath);
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = topic != null ? topic.hashCode() : 0; int result = topicName != null ? topicName.hashCode() : 0;
result = 31 * result + (jsonPath != null ? jsonPath.hashCode() : 0); result = 31 * result + (jsonPath != null ? jsonPath.hashCode() : 0);
return result; return result;
} }
@Override @Override
public String toString() { public String toString() {
return "Topic: " + topic + ", JSON Path: " + jsonPath; return "Topic: " + topicName + ", JSON Path: " + jsonPath;
} }
} }

View file

@ -1,6 +1,7 @@
package se.hal.plugin.mqtt.detector; package se.hal.plugin.mqtt.detector;
import org.junit.Test; import org.junit.Test;
import se.hal.plugin.mqtt.device.HalMqttDeviceConfig;
import se.hal.plugin.mqtt.device.HalMqttHumidityDeviceConfig; import se.hal.plugin.mqtt.device.HalMqttHumidityDeviceConfig;
import se.hal.plugin.mqtt.device.HalMqttParticularMatterDeviceConfig; import se.hal.plugin.mqtt.device.HalMqttParticularMatterDeviceConfig;
import se.hal.plugin.mqtt.device.HalMqttTemperatureDeviceConfig; import se.hal.plugin.mqtt.device.HalMqttTemperatureDeviceConfig;
@ -11,6 +12,7 @@ import se.hal.test.MockHalDeviceReportListener;
import zutil.converter.Converter; import zutil.converter.Converter;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -19,123 +21,73 @@ public class GenericMqttDetectorTest {
@Test @Test
public void ignoredTopics() { public void ignoredTopics() {
MockHalDeviceReportListener listener = new MockHalDeviceReportListener();
GenericMqttDetector detector = new GenericMqttDetector(); GenericMqttDetector detector = new GenericMqttDetector();
detector.addListener(listener);
assertEquals(0, detector.parseTopic("", new byte[]{}).size());
listener.reset(); assertEquals(0, detector.parseTopic("invalid/topic", new byte[]{}).size());
detector.parseTopic("", new byte[]{});
assertEquals(0, listener.getNumberOfReports());
listener.reset();
detector.parseTopic("invalid/topic", new byte[]{});
assertEquals(0, listener.getNumberOfReports());
} }
@Test @Test
public void parseTemperature() { public void parseTemperature() {
MockHalDeviceReportListener listener = new MockHalDeviceReportListener();
GenericMqttDetector detector = new GenericMqttDetector(); GenericMqttDetector detector = new GenericMqttDetector();
detector.addListener(listener);
List<HalMqttDeviceConfig> devices = detector.parseTopic(
listener.reset();
detector.parseTopic(
"zigbee2mqtt/Kitchen air quality/temperature", "zigbee2mqtt/Kitchen air quality/temperature",
Converter.toBytes(26)); Converter.toBytes(26));
assertEquals(1, listener.getNumberOfReports()); assertEquals(1, devices.size());
assertEquals( assertEquals(
new HalMqttTemperatureDeviceConfig("zigbee2mqtt/Kitchen air quality/temperature"), new HalMqttTemperatureDeviceConfig("zigbee2mqtt/Kitchen air quality/temperature"),
listener.getReport(0).config); devices.get(0));
listener.getReport(0).data.setTimestamp(0);
assertEquals(
new TemperatureSensorData(26, 0),
listener.getReport(0).data);
devices = detector.parseTopic(
listener.reset();
detector.parseTopic(
"zigbee2mqtt/Kitchen air quality", "zigbee2mqtt/Kitchen air quality",
"{\"temperature\": 26}".getBytes(StandardCharsets.UTF_8)); "{\"temperature\": 26}".getBytes(StandardCharsets.UTF_8));
assertEquals(1, listener.getNumberOfReports()); assertEquals(1, devices.size());
assertEquals( assertEquals(
new HalMqttTemperatureDeviceConfig("zigbee2mqtt/Kitchen air quality", "$.temperature"), new HalMqttTemperatureDeviceConfig("zigbee2mqtt/Kitchen air quality", "$.temperature"),
listener.getReport(0).config); devices.get(0));
listener.getReport(0).data.setTimestamp(0);
assertEquals(
new TemperatureSensorData(26, 0),
listener.getReport(0).data);
} }
@Test @Test
public void parseHumidity() { public void parseHumidity() {
MockHalDeviceReportListener listener = new MockHalDeviceReportListener();
GenericMqttDetector detector = new GenericMqttDetector(); GenericMqttDetector detector = new GenericMqttDetector();
detector.addListener(listener);
List<HalMqttDeviceConfig> devices = detector.parseTopic(
listener.reset();
detector.parseTopic(
"zigbee2mqtt/Kitchen air quality/humidity", "zigbee2mqtt/Kitchen air quality/humidity",
Converter.toBytes(51)); Converter.toBytes(51));
assertEquals(1, listener.getNumberOfReports()); assertEquals(1, devices.size());
assertEquals( assertEquals(
new HalMqttHumidityDeviceConfig("zigbee2mqtt/Kitchen air quality/humidity"), new HalMqttHumidityDeviceConfig("zigbee2mqtt/Kitchen air quality/humidity"),
listener.getReport(0).config); devices.get(0));
listener.getReport(0).data.setTimestamp(0);
assertEquals(
new HumiditySensorData(51, 0),
listener.getReport(0).data);
devices = detector.parseTopic(
listener.reset();
detector.parseTopic(
"zigbee2mqtt/Kitchen air quality", "zigbee2mqtt/Kitchen air quality",
"{\"humidity\": 51}".getBytes(StandardCharsets.UTF_8)); "{\"humidity\": 51}".getBytes(StandardCharsets.UTF_8));
assertEquals(1, listener.getNumberOfReports()); assertEquals(1, devices.size());
assertEquals( assertEquals(
new HalMqttHumidityDeviceConfig("zigbee2mqtt/Kitchen air quality", "$.humidity"), new HalMqttHumidityDeviceConfig("zigbee2mqtt/Kitchen air quality", "$.humidity"),
listener.getReport(0).config); devices.get(0));
listener.getReport(0).data.setTimestamp(0);
assertEquals(
new HumiditySensorData(51, 0),
listener.getReport(0).data);
} }
@Test @Test
public void parseParticularMatter() { public void parseParticularMatter() {
MockHalDeviceReportListener listener = new MockHalDeviceReportListener();
GenericMqttDetector detector = new GenericMqttDetector(); GenericMqttDetector detector = new GenericMqttDetector();
detector.addListener(listener);
List<HalMqttDeviceConfig> devices = detector.parseTopic(
listener.reset();
detector.parseTopic(
"zigbee2mqtt/Kitchen air quality/pm25", "zigbee2mqtt/Kitchen air quality/pm25",
Converter.toBytes(1)); Converter.toBytes(1));
assertEquals(1, listener.getNumberOfReports()); assertEquals(1, devices.size());
assertEquals( assertEquals(
new HalMqttParticularMatterDeviceConfig("zigbee2mqtt/Kitchen air quality/pm25"), new HalMqttParticularMatterDeviceConfig("zigbee2mqtt/Kitchen air quality/pm25"),
listener.getReport(0).config); devices.get(0));
listener.getReport(0).data.setTimestamp(0);
assertEquals(
new ParticulateMatterSensorData(1, 0),
listener.getReport(0).data);
devices = detector.parseTopic(
listener.reset();
detector.parseTopic(
"zigbee2mqtt/Kitchen air quality", "zigbee2mqtt/Kitchen air quality",
"{\"pm25\": 1}".getBytes(StandardCharsets.UTF_8)); "{\"pm25\": 1}".getBytes(StandardCharsets.UTF_8));
assertEquals(1, listener.getNumberOfReports()); assertEquals(1, devices.size());
assertEquals( assertEquals(
new HalMqttParticularMatterDeviceConfig("zigbee2mqtt/Kitchen air quality", "$.pm25"), new HalMqttParticularMatterDeviceConfig("zigbee2mqtt/Kitchen air quality", "$.pm25"),
listener.getReport(0).config); devices.get(0));
listener.getReport(0).data.setTimestamp(0);
assertEquals(
new ParticulateMatterSensorData(1, 0),
listener.getReport(0).data);
} }
} }

View file

@ -1,10 +1,12 @@
package se.hal.plugin.mqtt.detector; package se.hal.plugin.mqtt.detector;
import org.junit.Test; import org.junit.Test;
import se.hal.plugin.mqtt.device.HalMqttDeviceConfig;
import se.hal.plugin.mqtt.device.HalMqttParticularMatterDeviceConfig; import se.hal.plugin.mqtt.device.HalMqttParticularMatterDeviceConfig;
import se.hal.test.MockHalDeviceReportListener; import se.hal.test.MockHalDeviceReportListener;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -13,17 +15,13 @@ public class Zigbee2mqttDetectorTest {
@Test @Test
public void ignoredTopics() { public void ignoredTopics() {
MockHalDeviceReportListener listener = new MockHalDeviceReportListener();
Zigbee2mqttDetector detector = new Zigbee2mqttDetector(); Zigbee2mqttDetector detector = new Zigbee2mqttDetector();
detector.addListener(listener);
listener.reset(); List<HalMqttDeviceConfig> devices = detector.parseTopic("", new byte[]{});
detector.parseTopic("", new byte[]{}); assertEquals(0, devices.size());
assertEquals(0, listener.getNumberOfReports());
listener.reset(); devices = detector.parseTopic("invalid/topic", new byte[]{});
detector.parseTopic("invalid/topic", new byte[]{}); assertEquals(0, devices.size());
assertEquals(0, listener.getNumberOfReports());
} }
/* /*
@Test @Test