diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index 846ee54..e96394a 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -339,6 +339,9 @@ public class MqttBroker extends ThreadedTCPNetworkServer { handlePublish((MqttPacketPublish) packet); break; + case MqttPacketHeader.PACKET_TYPE_PUBREC: + break; + case MqttPacketHeader.PACKET_TYPE_SUBSCRIBE: handleSubscribe((MqttPacketSubscribe) packet); break; @@ -368,6 +371,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer { private void handlePublish(MqttPacketPublish publishPacket) { if (publishPacket.getFlagQoS() != 0) throw new UnsupportedOperationException("QoS larger then 0 not supported."); + if (publishPacket.getFlagRetain()) + throw new UnsupportedOperationException("Publish retain is not supported."); logger.finer("[" + clientId + "] Publishing to topic: " + publishPacket.topicName); broker.publish(publishPacket.topicName, publishPacket.payload); @@ -405,13 +410,25 @@ public class MqttBroker extends ThreadedTCPNetworkServer { @Override public void dataPublished(String topic, byte[] data) { - // Data has been published to a subscribed topic. + try { + // Prepare response + MqttPacketPublish publishPacket = new MqttPacketPublish(); + publishPacket.setFlagQoS(MqttPacketPublish.PUBLISH_QOS_0); + publishPacket.topicName = topic; + publishPacket.payload = data; + sendPacket(publishPacket); + + // TODO: QOS 1 + // TODO: QOS 2 + } catch (IOException e) { + logger.log(Level.WARNING, "Was unable to publish topic to client " + clientId, e); + } } private void sendWillPacket() throws IOException { if (willPacket != null) { logger.fine("[" + clientId + "] Publishing will packet."); - sendPacket(willPacket); + broker.publish(willPacket.topicName, willPacket.payload); willPacket = null; } } diff --git a/src/zutil/net/mqtt/packet/MqttPacketHeader.java b/src/zutil/net/mqtt/packet/MqttPacketHeader.java index 6841d40..bc29f5a 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketHeader.java +++ b/src/zutil/net/mqtt/packet/MqttPacketHeader.java @@ -39,10 +39,10 @@ public class MqttPacketHeader implements BinaryStruct { public static final int PACKET_TYPE_CONN = 1; public static final int PACKET_TYPE_CONNACK = 2; public static final int PACKET_TYPE_PUBLISH = 3; - public static final int PACKET_TYPE_PUBACK = 4; - public static final int PACKET_TYPE_PUBREC = 5; - public static final int PACKET_TYPE_PUBREL = 6; - public static final int PACKET_TYPE_PUBCOMP = 7; + public static final int PACKET_TYPE_PUBACK = 4; // QoS 1 + public static final int PACKET_TYPE_PUBREC = 5; // QoS 2 + public static final int PACKET_TYPE_PUBREL = 6; // QoS 2 + public static final int PACKET_TYPE_PUBCOMP = 7; // QoS 2 public static final int PACKET_TYPE_SUBSCRIBE = 8; public static final int PACKET_TYPE_SUBACK = 9; public static final int PACKET_TYPE_UNSUBSCRIBE = 10; @@ -58,7 +58,7 @@ public class MqttPacketHeader implements BinaryStruct { @BinaryField(index = 1, length = 4) public byte type; @BinaryField(index = 2, length = 4) - public byte flags; + public byte flags = 0; @CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class) public int variableHeaderAndPayloadLength; diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublish.java b/src/zutil/net/mqtt/packet/MqttPacketPublish.java index 3510015..a515e7c 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublish.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublish.java @@ -42,6 +42,11 @@ import java.io.OutputStream; */ public class MqttPacketPublish extends MqttPacketHeader { + public static final int PUBLISH_QOS_0 = 0b00; + public static final int PUBLISH_QOS_1 = 0b01; + public static final int PUBLISH_QOS_2 = 0b10; + public static final int PUBLISH_QOS_RESERVED = 0b11; + // Header { @@ -49,9 +54,9 @@ public class MqttPacketPublish extends MqttPacketHeader { } - private static final byte FLAG_DUP_BITMASK = ByteUtil.getBitMask(3, 1); - private static final byte FLAG_QOS_BITMASK = ByteUtil.getBitMask(1, 2); - private static final byte FLAG_RETAIN_BITMASK = ByteUtil.getBitMask(0, 1); + private static final byte FLAG_DUP_BITMASK = 0b1000; + private static final byte FLAG_QOS_BITMASK = 0b0110; + private static final byte FLAG_RETAIN_BITMASK = 0b0001; // ------------------------------------------ // Variable Header @@ -98,14 +103,23 @@ public class MqttPacketPublish extends MqttPacketHeader { public boolean getFlagDup() { return (flags & FLAG_DUP_BITMASK) != 0; } + public void setFlagDup(boolean isRedelivery) { + flags |= (byte) (FLAG_DUP_BITMASK & (isRedelivery ? 1 : 0)); + } public byte getFlagQoS() { return (byte) ((flags & FLAG_QOS_BITMASK) >> 1); } + public void setFlagQoS(int qos) { + flags |= (byte) (FLAG_QOS_BITMASK & qos); + } public boolean getFlagRetain() { return (flags & FLAG_RETAIN_BITMASK) != 0; } + public void setFlagRetain(boolean retain) { + flags |= (byte) (FLAG_RETAIN_BITMASK & (retain ? 1 : 0)); + } private static class MqttPacketPublishPayloadSerializer implements BinaryFieldSerializer { diff --git a/src/zutil/plugin/PluginManager.java b/src/zutil/plugin/PluginManager.java index c9481d2..05bdc23 100755 --- a/src/zutil/plugin/PluginManager.java +++ b/src/zutil/plugin/PluginManager.java @@ -135,25 +135,22 @@ public class PluginManager implements Iterable{ /** * @return a List of enabled plugins. */ - public List toArray() { + public List getEnabledPlugins() { return toList(iterator()); } + /** + * @return a List of all plugins, independently on if they are enabled or disabled. + */ + public List getAllPlugins() { + return toList(iteratorAll()); + } /** * @return a list of enabled plugins that has specified the provided interface in their definition. */ - public List toArray(Class intf) { + public List getEnabledPluginSingletons(Class intf) { return toList(getSingletonIterator(intf)); } - /** - * @return a List of all plugins, independently on if they are enabled or disabled. - */ - public List toArrayAll() { - return toList(iteratorAll()); - } - - - private List toList(Iterator it) { ArrayList list = new ArrayList<>(); @@ -166,7 +163,7 @@ public class PluginManager implements Iterable{ * @return the PluginData representing the given plugin by name, returns null if * there is no plugin by that name. */ - public PluginData getPluginData(String pluginName) { + public PluginData getPlugin(String pluginName) { return plugins.get(pluginName); }