Implemented publish logic for Mqtt

This commit is contained in:
Ziver Koc 2019-01-20 16:07:51 +01:00
parent 9ffe4c6652
commit 6e3edc2619
4 changed files with 114 additions and 37 deletions

View file

@ -55,6 +55,9 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
}
/**
* Add the listener as a subscriber of the specific topic.
*/
public synchronized void subscribe(String topic, MqttSubscriptionListener listener) {
if (topic == null || topic.isEmpty() || listener == null)
return;
@ -71,6 +74,26 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
}
}
/**
* Publish data to the specific topic
*/
public void publish(String topic, byte[] data) {
if (!subscriptions.containsKey(topic)) {
logger.fine("Data was published to topic (" + topic + ") with no subscribers.");
return;
}
logger.finer("Data has been published to topic (" + topic + ")");
List<MqttSubscriptionListener> topicSubscriptions = subscriptions.get(topic);
for (MqttSubscriptionListener subscriber : topicSubscriptions) {
subscriber.dataPublished(topic, data);
}
}
/**
* Unsubscribe the listener from all available MQTT topics.
*/
public synchronized void unsubscribe(MqttSubscriptionListener listener) {
if (listener == null)
return;
@ -79,6 +102,10 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
unsubscribe(topic, listener);
}
}
/**
* Unsubscribe the listener from the specific MQTT topic.
*/
public synchronized void unsubscribe(String topic, MqttSubscriptionListener listener) {
if (topic == null || topic.isEmpty() || listener == null)
return;
@ -150,8 +177,32 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
}
}
private void handleConnect(MqttPacketHeader connectPacket) throws IOException {
// Unexpected packet?
if (!(connectPacket instanceof MqttPacketConnect))
throw new IOException("Expected MqttPacketConnect but received " + connectPacket.getClass());
MqttPacketConnect conn = (MqttPacketConnect) connectPacket;
// Reply
MqttPacketConnectAck connectAck = new MqttPacketConnectAck();
// Incorrect protocol version?
if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) {
connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR;
sendPacket(connectAck);
return;
} else {
connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK;
}
// TODO: authenticate
// TODO: clean session
sendPacket(connectAck);
}
protected void handlePacket(MqttPacketHeader packet) throws IOException {
// TODO: QOS
// TODO: QOS 1
// TODO: QOS 2
switch (packet.type) {
case MqttPacketHeader.PACKET_TYPE_PUBLISH:
@ -180,31 +231,12 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
}
}
private void handleConnect(MqttPacketHeader connectPacket) throws IOException {
// Unexpected packet?
if (!(connectPacket instanceof MqttPacketConnect))
throw new IOException("Expected MqttPacketConnect but received " + connectPacket.getClass());
MqttPacketConnect conn = (MqttPacketConnect) connectPacket;
// Reply
MqttPacketConnectAck connectAck = new MqttPacketConnectAck();
// Incorrect protocol version?
if (conn.protocolLevel != MQTT_PROTOCOL_VERSION) {
connectAck.returnCode = MqttPacketConnectAck.RETCODE_PROT_VER_ERROR;
sendPacket(connectAck);
return;
} else {
connectAck.returnCode = MqttPacketConnectAck.RETCODE_OK;
}
// TODO: authenticate
// TODO: clean session
sendPacket(connectAck);
}
private void handlePublish(MqttPacketPublish publishPacket) throws IOException {
// TODO: Publish
if (publishPacket.getFlagQoS() != 0)
throw new UnsupportedOperationException("QoS larger then 0 not supported.");
broker.publish(publishPacket.topicName, publishPacket.payload);
}
private void handleSubscribe(MqttPacketSubscribe subscribePacket) throws IOException {
@ -236,8 +268,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
@Override
public void dataPublished(String topic, String data) {
public void dataPublished(String topic, byte[] data) {
// Data has been published to a subscribed topic.
}
public synchronized void sendPacket(MqttPacketHeader packet) throws IOException {

View file

@ -30,5 +30,5 @@ package zutil.net.mqtt;
*/
public interface MqttSubscriptionListener {
void dataPublished(String topic, String data);
void dataPublished(String topic, byte[] data);
}

View file

@ -1,6 +1,8 @@
package zutil.net.mqtt.packet;
import zutil.ByteUtil;
/**
* A PUBLISH Control Packet is sent from a Client to a Server
*
@ -14,17 +16,14 @@ public class MqttPacketPublish extends MqttPacketHeader {
type = MqttPacketHeader.PACKET_TYPE_PUBLISH;
}
/*
@BinaryField(index = 2000, length = 1)
private int flagDup;
@BinaryField(index = 2001, length = 2)
private int flagQoS;
@BinaryField(index = 2002, length = 1)
private int flagRetain;
*/
private byte flagDupBitmask = ByteUtil.getBitMask(3, 1);
private byte flagQoSBitmask = ByteUtil.getBitMask(1, 2);
private byte flagRetainBitmask = ByteUtil.getBitMask(0, 1);
// Variable Header
@BinaryField(index = 2101, length = 16)
@BinaryField(index = 2001, length = 16)
private int topicNameLength;
/**
* The Topic Name identifies the information channel to which controlHeader data is published.
@ -32,11 +31,28 @@ public class MqttPacketPublish extends MqttPacketHeader {
@VariableLengthBinaryField(index = 2102, lengthField = "topicNameLength")
public String topicName;
@BinaryField(index = 2102, length = 16)
@BinaryField(index = 2002, length = 16)
public int packetId;
// Payload
// - Application data
@BinaryField(index = 3001, length = 100000)
public byte[] payload;
// Util methods
public boolean getFlagDup() {
return (flags & flagDupBitmask) != 0;
}
public byte getFlagQoS() {
return (byte) ((flags & flagQoSBitmask) >> 1);
}
public boolean getFlagRetain() {
return (flags & flagRetainBitmask) != 0;
}
}

View file

@ -91,6 +91,35 @@ public class MqttBrokerTest {
assertEquals(1, broker.getSubscriberCount("topic2"));
}
@Test
public void publish() throws IOException {
// Setup subscriber
final String[] recivedTopic = new String[1];
final byte[] recievedPayload = new byte[1];
MqttSubscriptionListener subscriber = new MqttSubscriptionListener() {
public void dataPublished(String topic, byte[] data) {
recivedTopic[0] = topic;
recievedPayload[0] = data[0];
}
};
// Setup broker
MqttBroker broker = new MqttBroker();
MqttConnectionThreadMock thread = new MqttConnectionThreadMock(broker);
broker.subscribe("test/topic", subscriber);
// Setup publish
MqttPacketPublish publish = new MqttPacketPublish();
publish.topicName = "test/topic";
publish.payload = new byte[]{42};
thread.handlePacket(publish);
// Check response
assertEquals("test/topic", recivedTopic[0]);
assertEquals((byte) 42, recievedPayload[0]);
}
@Test
public void unsubscribeEmpty() throws IOException {
MqttBroker broker = new MqttBroker();