Added partial subscribe and unsubscribe logic

This commit is contained in:
Ziver Koc 2018-11-08 22:09:54 +01:00
parent a1caa8b0e1
commit f175830fae
6 changed files with 164 additions and 21 deletions

View file

@ -2,6 +2,9 @@ package zutil.net.mqtt;
import zutil.log.LogUtil;
import zutil.net.mqtt.packet.*;
import zutil.net.mqtt.packet.MqttPacketSubscribe.MqttSubscribePayload;
import zutil.net.mqtt.packet.MqttPacketSubscribeAck.MqttSubscribeAckPayload;
import zutil.net.mqtt.packet.MqttPacketUnsubscribe.MqttUnsubscribePayload;
import zutil.net.threaded.ThreadedTCPNetworkServer;
import zutil.net.threaded.ThreadedTCPNetworkServerThread;
import zutil.parser.binary.BinaryStructInputStream;
@ -9,6 +12,7 @@ import zutil.parser.binary.BinaryStructOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -24,25 +28,70 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
public static final int MQTT_PORT_TLS = 8883;
public static final int MQTT_PROTOCOL_VERSION = 0x04; // MQTT 3.1.1
private Map<String, List<MqttSubscriptionListener>> subscriptions;
public MqttBroker() {
super(MQTT_PORT);
subscriptions = new HashMap<>();
}
@Override
protected ThreadedTCPNetworkServerThread getThreadInstance(Socket s) throws IOException {
return new MqttConnectionThread(s);
}
protected static class MqttConnectionThread implements ThreadedTCPNetworkServerThread {
public synchronized void subscribe(String topic, MqttSubscriptionListener listener) {
if (topic == null || topic.isEmpty() || listener == null)
return;
if (!subscriptions.containsKey(topic)) {
logger.fine("Creating new topic: " + topic);
subscriptions.put(topic, new ArrayList<>());
}
List topicSubscriptions = subscriptions.get(topic);
if (topicSubscriptions.contains(listener)) {
logger.finer("New subscriber on topic (" + topic + "), subscriber count: " + topicSubscriptions.size());
topicSubscriptions.add(listener);
}
}
public synchronized void unsubscribe(MqttSubscriptionListener listener) {
if (listener == null)
return;
for (String topic : subscriptions.keySet()){
unsubscribe(topic, listener);
}
}
public synchronized void unsubscribe(String topic, MqttSubscriptionListener listener) {
if (topic == null || topic.isEmpty() || listener == null)
return;
if (!subscriptions.containsKey(topic))
return;
List topicSubscriptions = subscriptions.get(topic);
if (topicSubscriptions.remove(listener)){
logger.finer("Subscriber unsubscribed from topic (" + topic + "), subscriber count: " + topicSubscriptions.size());
if (topicSubscriptions.isEmpty()) {
logger.fine("Removing empty topic: " + topic);
subscriptions.remove(topic);
}
}
}
protected static class MqttConnectionThread implements ThreadedTCPNetworkServerThread, MqttSubscriptionListener {
private Socket socket;
private BinaryStructInputStream in;
private BinaryStructOutputStream out;
private boolean shutdown = false;
protected MqttConnectionThread() {} // Test constructor
public MqttConnectionThread(Socket s) throws IOException {
@ -85,10 +134,7 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
if (packet == null)
return;
MqttPacketHeader packetRsp = handlePacket(packet);
if (packetRsp != null)
sendPacket(packetRsp);
handlePacket(packet);
}
socket.close();
@ -103,36 +149,69 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
}
}
public MqttPacketHeader handlePacket(MqttPacketHeader packet) throws IOException {
public void handlePacket(MqttPacketHeader packet) throws IOException {
// TODO: QOS
switch (packet.type) {
// TODO: Publish
case MqttPacketHeader.PACKET_TYPE_PUBLISH:
break;
// TODO: Subscribe
case MqttPacketHeader.PACKET_TYPE_SUBSCRIBE:
MqttPacketSubscribe subscribePacket = (MqttPacketSubscribe) packet;
MqttPacketSubscribeAck subscribeAckPacket = new MqttPacketSubscribeAck();
subscribeAckPacket.packetId = subscribePacket.packetId;
for (MqttSubscribePayload payload : subscribePacket.payload) {
// TODO: subscribe(payload.topicFilter, this)
MqttSubscribeAckPayload ackPayload = new MqttSubscribeAckPayload();
ackPayload.returnCode = MqttSubscribeAckPayload.RETCODE_SUCESS_MAX_QOS_0;
subscribeAckPacket.payload.add(ackPayload);
}
sendPacket(subscribeAckPacket);
break;
// TODO: Unsubscribe
case MqttPacketHeader.PACKET_TYPE_UNSUBSCRIBE:
MqttPacketUnsubscribe unsubscribePacket = (MqttPacketUnsubscribe) packet;
for (MqttUnsubscribePayload payload : unsubscribePacket.payload) {
// TODO: unsubscribe(payload.topicFilter, this)
}
MqttPacketUnsubscribeAck unsubscribeAckPacket = new MqttPacketUnsubscribeAck();
unsubscribeAckPacket.packetId = unsubscribePacket.packetId;
sendPacket(unsubscribeAckPacket);
break;
// Ping
case MqttPacketHeader.PACKET_TYPE_PINGREQ:
return new MqttPacketPingResp();
sendPacket(new MqttPacketPingResp());
break;
// Close connection
default:
logger.warning("Received unknown packet type: " + packet.type);
case MqttPacketHeader.PACKET_TYPE_DISCONNECT:
shutdown = true;
break;
}
return null;
}
public void sendPacket(MqttPacketHeader packet) throws IOException {
@Override
public void dataPublished(String topic, String data) {
}
public synchronized void sendPacket(MqttPacketHeader packet) throws IOException {
MqttPacket.write(out, packet);
}
public boolean isShutdown() {
return shutdown;
}
}
}

View file

@ -0,0 +1,34 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2018 Ziver Koc
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package zutil.net.mqtt;
/**
* Interface defining methods that will be
* called when data is published to a topic
*/
public interface MqttSubscriptionListener {
public void dataPublished(String topic, String data);
}

View file

@ -2,6 +2,7 @@ package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import java.util.LinkedList;
import java.util.List;
/**
@ -24,7 +25,7 @@ public class MqttPacketSubscribe extends MqttPacketHeader{
// Payload
public List<MqttSubscribePayload> payload;
public List<MqttSubscribePayload> payload = new LinkedList<>();

View file

@ -2,6 +2,7 @@ package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import java.util.LinkedList;
import java.util.List;
/**
@ -25,7 +26,7 @@ public class MqttPacketSubscribeAck extends MqttPacketHeader{
// Payload
public List<MqttSubscribeAckPayload> payload;
public List<MqttSubscribeAckPayload> payload = new LinkedList<>();

View file

@ -2,6 +2,7 @@ package zutil.net.mqtt.packet;
import zutil.parser.binary.BinaryStruct;
import java.util.LinkedList;
import java.util.List;
/**
@ -14,7 +15,7 @@ public class MqttPacketUnsubscribe extends MqttPacketHeader{
// Header
{
type = MqttPacketHeader.PACKET_TYPE_UNSUBACK;
type = MqttPacketHeader.PACKET_TYPE_UNSUBSCRIBE;
}
// Variable Header
@ -24,7 +25,7 @@ public class MqttPacketUnsubscribe extends MqttPacketHeader{
// Payload
public List<MqttUnsubscribePayload> payload;
public List<MqttUnsubscribePayload> payload = new LinkedList<>();

View file

@ -2,10 +2,7 @@ package zutil.net.mqtt;
import org.junit.Test;
import zutil.net.mqtt.MqttBroker.MqttConnectionThread;
import zutil.net.mqtt.packet.MqttPacketDisconnect;
import zutil.net.mqtt.packet.MqttPacketHeader;
import zutil.net.mqtt.packet.MqttPacketPingReq;
import zutil.net.mqtt.packet.MqttPacketPingResp;
import zutil.net.mqtt.packet.*;
import java.io.IOException;
import java.util.LinkedList;
@ -24,12 +21,40 @@ public class MqttBrokerTest {
}
}
@Test
public void subscribeEmpty() throws IOException {
MqttConnectionMockThread thread = new MqttConnectionMockThread();
MqttPacketSubscribe subscribePacket = new MqttPacketSubscribe();
subscribePacket.packetId = (int)(Math.random()*1000);
thread.handlePacket(subscribePacket);
MqttPacketHeader responsePacket = thread.sentPackets.poll();
assertEquals(MqttPacketSubscribeAck.class, responsePacket.getClass());
assertEquals(subscribePacket.packetId, ((MqttPacketSubscribeAck)responsePacket).packetId);
}
@Test
public void unsubscribe() throws IOException {
MqttConnectionMockThread thread = new MqttConnectionMockThread();
MqttPacketUnsubscribe unsubscribePacket = new MqttPacketUnsubscribe();
unsubscribePacket.packetId = (int)(Math.random()*1000);
thread.handlePacket(unsubscribePacket);
MqttPacketHeader responsePacket = thread.sentPackets.poll();
assertEquals(MqttPacketUnsubscribeAck.class, responsePacket.getClass());
assertEquals(unsubscribePacket.packetId, ((MqttPacketUnsubscribeAck)responsePacket).packetId);
}
@Test
public void ping() throws IOException {
MqttConnectionMockThread thread = new MqttConnectionMockThread();
MqttPacketPingReq pingPacket = new MqttPacketPingReq();
assertEquals(MqttPacketPingResp.class, thread.handlePacket(pingPacket).getClass());
thread.handlePacket(pingPacket);
assertEquals(MqttPacketPingResp.class, thread.sentPackets.poll().getClass());
}
@Test
@ -37,7 +62,9 @@ public class MqttBrokerTest {
MqttConnectionMockThread thread = new MqttConnectionMockThread();
MqttPacketDisconnect disconnectPacket = new MqttPacketDisconnect();
assertEquals(null, thread.handlePacket(disconnectPacket));
thread.handlePacket(disconnectPacket);
assertEquals(null, thread.sentPackets.poll());
assertTrue(thread.isShutdown());
}
}