diff --git a/src/zutil/net/mqtt/MqttBroker.java b/src/zutil/net/mqtt/MqttBroker.java index 140d419..202d365 100755 --- a/src/zutil/net/mqtt/MqttBroker.java +++ b/src/zutil/net/mqtt/MqttBroker.java @@ -55,6 +55,9 @@ public class MqttBroker extends ThreadedTCPNetworkServer { } + /** + * Add the listener as a subscriber of the specific topic. + */ public synchronized void subscribe(String topic, MqttSubscriptionListener listener) { if (topic == null || topic.isEmpty() || listener == null) return; @@ -71,6 +74,26 @@ public class MqttBroker extends ThreadedTCPNetworkServer { } } + /** + * Publish data to the specific topic + */ + public void publish(String topic, byte[] data) { + if (!subscriptions.containsKey(topic)) { + logger.fine("Data was published to topic (" + topic + ") with no subscribers."); + return; + } + + logger.finer("Data has been published to topic (" + topic + ")"); + List topicSubscriptions = subscriptions.get(topic); + + for (MqttSubscriptionListener subscriber : topicSubscriptions) { + subscriber.dataPublished(topic, data); + } + } + + /** + * Unsubscribe the listener from all available MQTT topics. + */ public synchronized void unsubscribe(MqttSubscriptionListener listener) { if (listener == null) return; @@ -79,6 +102,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer { unsubscribe(topic, listener); } } + + /** + * Unsubscribe the listener from the specific MQTT topic. + */ public synchronized void unsubscribe(String topic, MqttSubscriptionListener listener) { if (topic == null || topic.isEmpty() || listener == null) return; @@ -150,8 +177,32 @@ public class MqttBroker extends ThreadedTCPNetworkServer { } } + private void handleConnect(MqttPacketHeader connectPacket) throws IOException { + // Unexpected packet? + if (!(connectPacket instanceof MqttPacketConnect)) + throw new IOException("Expected MqttPacketConnect but received " + connectPacket.getClass()); + MqttPacketConnect conn = (MqttPacketConnect) connectPacket; + + // Reply + MqttPacketConnectAck connectAck = new MqttPacketConnectAck(); + + // Incorrect protocol version? + if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) { + connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR; + sendPacket(connectAck); + return; + } else { + connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK; + } + + // TODO: authenticate + // TODO: clean session + sendPacket(connectAck); + } + protected void handlePacket(MqttPacketHeader packet) throws IOException { - // TODO: QOS + // TODO: QOS 1 + // TODO: QOS 2 switch (packet.type) { case MqttPacketHeader.PACKET_TYPE_PUBLISH: @@ -180,31 +231,12 @@ public class MqttBroker extends ThreadedTCPNetworkServer { } } - private void handleConnect(MqttPacketHeader connectPacket) throws IOException { - // Unexpected packet? - if (!(connectPacket instanceof MqttPacketConnect)) - throw new IOException("Expected MqttPacketConnect but received " + connectPacket.getClass()); - MqttPacketConnect conn = (MqttPacketConnect) connectPacket; - - // Reply - MqttPacketConnectAck connectAck = new MqttPacketConnectAck(); - - // Incorrect protocol version? - if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) { - connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR; - sendPacket(connectAck); - return; - } else { - connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK; - } - - // TODO: authenticate - // TODO: clean session - sendPacket(connectAck); - } private void handlePublish(MqttPacketPublish publishPacket) throws IOException { - // TODO: Publish + if (publishPacket.getFlagQoS() != 0) + throw new UnsupportedOperationException("QoS larger then 0 not supported."); + + broker.publish(publishPacket.topicName, publishPacket.payload); } private void handleSubscribe(MqttPacketSubscribe subscribePacket) throws IOException { @@ -236,8 +268,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer { @Override - public void dataPublished(String topic, String data) { - + public void dataPublished(String topic, byte[] data) { + // Data has been published to a subscribed topic. } public synchronized void sendPacket(MqttPacketHeader packet) throws IOException { diff --git a/src/zutil/net/mqtt/MqttSubscriptionListener.java b/src/zutil/net/mqtt/MqttSubscriptionListener.java index 392261a..1dc7db8 100644 --- a/src/zutil/net/mqtt/MqttSubscriptionListener.java +++ b/src/zutil/net/mqtt/MqttSubscriptionListener.java @@ -30,5 +30,5 @@ package zutil.net.mqtt; */ public interface MqttSubscriptionListener { - void dataPublished(String topic, String data); + void dataPublished(String topic, byte[] data); } diff --git a/src/zutil/net/mqtt/packet/MqttPacketPublish.java b/src/zutil/net/mqtt/packet/MqttPacketPublish.java index 7e34fe2..c20bc28 100755 --- a/src/zutil/net/mqtt/packet/MqttPacketPublish.java +++ b/src/zutil/net/mqtt/packet/MqttPacketPublish.java @@ -1,6 +1,8 @@ package zutil.net.mqtt.packet; +import zutil.ByteUtil; + /** * A PUBLISH Control Packet is sent from a Client to a Server * @@ -14,17 +16,14 @@ public class MqttPacketPublish extends MqttPacketHeader { type = MqttPacketHeader.PACKET_TYPE_PUBLISH; } -/* - @BinaryField(index = 2000, length = 1) - private int flagDup; - @BinaryField(index = 2001, length = 2) - private int flagQoS; - @BinaryField(index = 2002, length = 1) - private int flagRetain; -*/ + + private byte flagDupBitmask = ByteUtil.getBitMask(3, 1); + private byte flagQoSBitmask = ByteUtil.getBitMask(1, 2); + private byte flagRetainBitmask = ByteUtil.getBitMask(0, 1); + // Variable Header - @BinaryField(index = 2101, length = 16) + @BinaryField(index = 2001, length = 16) private int topicNameLength; /** * The Topic Name identifies the information channel to which controlHeader data is published. @@ -32,11 +31,28 @@ public class MqttPacketPublish extends MqttPacketHeader { @VariableLengthBinaryField(index = 2102, lengthField = "topicNameLength") public String topicName; - @BinaryField(index = 2102, length = 16) + @BinaryField(index = 2002, length = 16) public int packetId; // Payload // - Application data + @BinaryField(index = 3001, length = 100000) + public byte[] payload; + + + // Util methods + + public boolean getFlagDup() { + return (flags & flagDupBitmask) != 0; + } + + public byte getFlagQoS() { + return (byte) ((flags & flagQoSBitmask) >> 1); + } + + public boolean getFlagRetain() { + return (flags & flagRetainBitmask) != 0; + } } diff --git a/test/zutil/net/mqtt/MqttBrokerTest.java b/test/zutil/net/mqtt/MqttBrokerTest.java index 97703f3..771694b 100644 --- a/test/zutil/net/mqtt/MqttBrokerTest.java +++ b/test/zutil/net/mqtt/MqttBrokerTest.java @@ -91,6 +91,35 @@ public class MqttBrokerTest { assertEquals(1, broker.getSubscriberCount("topic2")); } + @Test + public void publish() throws IOException { + // Setup subscriber + final String[] recivedTopic = new String[1]; + final byte[] recievedPayload = new byte[1]; + MqttSubscriptionListener subscriber = new MqttSubscriptionListener() { + public void dataPublished(String topic, byte[] data) { + recivedTopic[0] = topic; + recievedPayload[0] = data[0]; + } + }; + + // Setup broker + MqttBroker broker = new MqttBroker(); + MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker); + broker.subscribe("test/topic", subscriber); + + // Setup publish + MqttPacketPublish publish = new MqttPacketPublish(); + publish.topicName = "test/topic"; + publish.payload = new byte[]{42}; + + thread.handlePacket(publish); + + // Check response + assertEquals("test/topic", recivedTopic[0]); + assertEquals((byte) 42, recievedPayload[0]); + } + @Test public void unsubscribeEmpty() throws IOException { MqttBroker broker = new MqttBroker();