Improved function name and fixed incorrect MQTT will handling
This commit is contained in:
parent
186714d0b2
commit
c45809048a
4 changed files with 50 additions and 22 deletions
|
|
@ -339,6 +339,9 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
|
||||||
handlePublish((MqttPacketPublish) packet);
|
handlePublish((MqttPacketPublish) packet);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case MqttPacketHeader.PACKET_TYPE_PUBREC:
|
||||||
|
break;
|
||||||
|
|
||||||
case MqttPacketHeader.PACKET_TYPE_SUBSCRIBE:
|
case MqttPacketHeader.PACKET_TYPE_SUBSCRIBE:
|
||||||
handleSubscribe((MqttPacketSubscribe) packet);
|
handleSubscribe((MqttPacketSubscribe) packet);
|
||||||
break;
|
break;
|
||||||
|
|
@ -368,6 +371,8 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
|
||||||
private void handlePublish(MqttPacketPublish publishPacket) {
|
private void handlePublish(MqttPacketPublish publishPacket) {
|
||||||
if (publishPacket.getFlagQoS() != 0)
|
if (publishPacket.getFlagQoS() != 0)
|
||||||
throw new UnsupportedOperationException("QoS larger then 0 not supported.");
|
throw new UnsupportedOperationException("QoS larger then 0 not supported.");
|
||||||
|
if (publishPacket.getFlagRetain())
|
||||||
|
throw new UnsupportedOperationException("Publish retain is not supported.");
|
||||||
|
|
||||||
logger.finer("[" + clientId + "] Publishing to topic: " + publishPacket.topicName);
|
logger.finer("[" + clientId + "] Publishing to topic: " + publishPacket.topicName);
|
||||||
broker.publish(publishPacket.topicName, publishPacket.payload);
|
broker.publish(publishPacket.topicName, publishPacket.payload);
|
||||||
|
|
@ -405,13 +410,25 @@ public class MqttBroker extends ThreadedTCPNetworkServer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dataPublished(String topic, byte[] data) {
|
public void dataPublished(String topic, byte[] data) {
|
||||||
// Data has been published to a subscribed topic.
|
try {
|
||||||
|
// Prepare response
|
||||||
|
MqttPacketPublish publishPacket = new MqttPacketPublish();
|
||||||
|
publishPacket.setFlagQoS(MqttPacketPublish.PUBLISH_QOS_0);
|
||||||
|
publishPacket.topicName = topic;
|
||||||
|
publishPacket.payload = data;
|
||||||
|
sendPacket(publishPacket);
|
||||||
|
|
||||||
|
// TODO: QOS 1
|
||||||
|
// TODO: QOS 2
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.log(Level.WARNING, "Was unable to publish topic to client " + clientId, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendWillPacket() throws IOException {
|
private void sendWillPacket() throws IOException {
|
||||||
if (willPacket != null) {
|
if (willPacket != null) {
|
||||||
logger.fine("[" + clientId + "] Publishing will packet.");
|
logger.fine("[" + clientId + "] Publishing will packet.");
|
||||||
sendPacket(willPacket);
|
broker.publish(willPacket.topicName, willPacket.payload);
|
||||||
willPacket = null;
|
willPacket = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -39,10 +39,10 @@ public class MqttPacketHeader implements BinaryStruct {
|
||||||
public static final int PACKET_TYPE_CONN = 1;
|
public static final int PACKET_TYPE_CONN = 1;
|
||||||
public static final int PACKET_TYPE_CONNACK = 2;
|
public static final int PACKET_TYPE_CONNACK = 2;
|
||||||
public static final int PACKET_TYPE_PUBLISH = 3;
|
public static final int PACKET_TYPE_PUBLISH = 3;
|
||||||
public static final int PACKET_TYPE_PUBACK = 4;
|
public static final int PACKET_TYPE_PUBACK = 4; // QoS 1
|
||||||
public static final int PACKET_TYPE_PUBREC = 5;
|
public static final int PACKET_TYPE_PUBREC = 5; // QoS 2
|
||||||
public static final int PACKET_TYPE_PUBREL = 6;
|
public static final int PACKET_TYPE_PUBREL = 6; // QoS 2
|
||||||
public static final int PACKET_TYPE_PUBCOMP = 7;
|
public static final int PACKET_TYPE_PUBCOMP = 7; // QoS 2
|
||||||
public static final int PACKET_TYPE_SUBSCRIBE = 8;
|
public static final int PACKET_TYPE_SUBSCRIBE = 8;
|
||||||
public static final int PACKET_TYPE_SUBACK = 9;
|
public static final int PACKET_TYPE_SUBACK = 9;
|
||||||
public static final int PACKET_TYPE_UNSUBSCRIBE = 10;
|
public static final int PACKET_TYPE_UNSUBSCRIBE = 10;
|
||||||
|
|
@ -58,7 +58,7 @@ public class MqttPacketHeader implements BinaryStruct {
|
||||||
@BinaryField(index = 1, length = 4)
|
@BinaryField(index = 1, length = 4)
|
||||||
public byte type;
|
public byte type;
|
||||||
@BinaryField(index = 2, length = 4)
|
@BinaryField(index = 2, length = 4)
|
||||||
public byte flags;
|
public byte flags = 0;
|
||||||
|
|
||||||
@CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class)
|
@CustomBinaryField(index = 3, serializer = MqttVariableIntSerializer.class)
|
||||||
public int variableHeaderAndPayloadLength;
|
public int variableHeaderAndPayloadLength;
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,11 @@ import java.io.OutputStream;
|
||||||
*/
|
*/
|
||||||
public class MqttPacketPublish extends MqttPacketHeader {
|
public class MqttPacketPublish extends MqttPacketHeader {
|
||||||
|
|
||||||
|
public static final int PUBLISH_QOS_0 = 0b00;
|
||||||
|
public static final int PUBLISH_QOS_1 = 0b01;
|
||||||
|
public static final int PUBLISH_QOS_2 = 0b10;
|
||||||
|
public static final int PUBLISH_QOS_RESERVED = 0b11;
|
||||||
|
|
||||||
// Header
|
// Header
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
@ -49,9 +54,9 @@ public class MqttPacketPublish extends MqttPacketHeader {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static final byte FLAG_DUP_BITMASK = ByteUtil.getBitMask(3, 1);
|
private static final byte FLAG_DUP_BITMASK = 0b1000;
|
||||||
private static final byte FLAG_QOS_BITMASK = ByteUtil.getBitMask(1, 2);
|
private static final byte FLAG_QOS_BITMASK = 0b0110;
|
||||||
private static final byte FLAG_RETAIN_BITMASK = ByteUtil.getBitMask(0, 1);
|
private static final byte FLAG_RETAIN_BITMASK = 0b0001;
|
||||||
|
|
||||||
// ------------------------------------------
|
// ------------------------------------------
|
||||||
// Variable Header
|
// Variable Header
|
||||||
|
|
@ -98,14 +103,23 @@ public class MqttPacketPublish extends MqttPacketHeader {
|
||||||
public boolean getFlagDup() {
|
public boolean getFlagDup() {
|
||||||
return (flags & FLAG_DUP_BITMASK) != 0;
|
return (flags & FLAG_DUP_BITMASK) != 0;
|
||||||
}
|
}
|
||||||
|
public void setFlagDup(boolean isRedelivery) {
|
||||||
|
flags |= (byte) (FLAG_DUP_BITMASK & (isRedelivery ? 1 : 0));
|
||||||
|
}
|
||||||
|
|
||||||
public byte getFlagQoS() {
|
public byte getFlagQoS() {
|
||||||
return (byte) ((flags & FLAG_QOS_BITMASK) >> 1);
|
return (byte) ((flags & FLAG_QOS_BITMASK) >> 1);
|
||||||
}
|
}
|
||||||
|
public void setFlagQoS(int qos) {
|
||||||
|
flags |= (byte) (FLAG_QOS_BITMASK & qos);
|
||||||
|
}
|
||||||
|
|
||||||
public boolean getFlagRetain() {
|
public boolean getFlagRetain() {
|
||||||
return (flags & FLAG_RETAIN_BITMASK) != 0;
|
return (flags & FLAG_RETAIN_BITMASK) != 0;
|
||||||
}
|
}
|
||||||
|
public void setFlagRetain(boolean retain) {
|
||||||
|
flags |= (byte) (FLAG_RETAIN_BITMASK & (retain ? 1 : 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class MqttPacketPublishPayloadSerializer implements BinaryFieldSerializer<byte[]> {
|
private static class MqttPacketPublishPayloadSerializer implements BinaryFieldSerializer<byte[]> {
|
||||||
|
|
|
||||||
|
|
@ -135,25 +135,22 @@ public class PluginManager<T> implements Iterable<PluginData>{
|
||||||
/**
|
/**
|
||||||
* @return a List of enabled plugins.
|
* @return a List of enabled plugins.
|
||||||
*/
|
*/
|
||||||
public List<PluginData> toArray() {
|
public List<PluginData> getEnabledPlugins() {
|
||||||
return toList(iterator());
|
return toList(iterator());
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* @return a List of all plugins, independently on if they are enabled or disabled.
|
||||||
|
*/
|
||||||
|
public List<PluginData> getAllPlugins() {
|
||||||
|
return toList(iteratorAll());
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* @return a list of enabled plugins that has specified the provided interface in their definition.
|
* @return a list of enabled plugins that has specified the provided interface in their definition.
|
||||||
*/
|
*/
|
||||||
public <K> List<K> toArray(Class<K> intf) {
|
public <K> List<K> getEnabledPluginSingletons(Class<K> intf) {
|
||||||
return toList(getSingletonIterator(intf));
|
return toList(getSingletonIterator(intf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return a List of all plugins, independently on if they are enabled or disabled.
|
|
||||||
*/
|
|
||||||
public List<PluginData> toArrayAll() {
|
|
||||||
return toList(iteratorAll());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private <K> List<K> toList(Iterator<K> it) {
|
private <K> List<K> toList(Iterator<K> it) {
|
||||||
ArrayList<K> list = new ArrayList<>();
|
ArrayList<K> list = new ArrayList<>();
|
||||||
|
|
@ -166,7 +163,7 @@ public class PluginManager<T> implements Iterable<PluginData>{
|
||||||
* @return the PluginData representing the given plugin by name, returns null if
|
* @return the PluginData representing the given plugin by name, returns null if
|
||||||
* there is no plugin by that name.
|
* there is no plugin by that name.
|
||||||
*/
|
*/
|
||||||
public PluginData getPluginData(String pluginName) {
|
public PluginData getPlugin(String pluginName) {
|
||||||
return plugins.get(pluginName);
|
return plugins.get(pluginName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue