diff --git a/src/zutil/net/nio/NioClient.java b/src/zutil/net/nio/NioClient.java old mode 100644 new mode 100755 index 6ad6bf8..7f9e240 --- a/src/zutil/net/nio/NioClient.java +++ b/src/zutil/net/nio/NioClient.java @@ -25,32 +25,28 @@ package zutil.net.nio; import zutil.net.nio.message.Message; -import zutil.net.nio.message.type.ResponseRequestMessage; +import zutil.net.nio.message.RequestResponseMessage; import zutil.net.nio.response.ResponseEvent; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; public class NioClient extends NioNetwork{ - private SocketChannel serverSocket; + private InetSocketAddress remoteAddress; /** * Creates a NioClient that connects to a server * - * @param hostAddress The server address - * @param port The port to listen on + * @param remoteAddress the server address + * @param remotePort the port to listen on */ - public NioClient(InetAddress serverAddress, int port) throws IOException { - super(InetAddress.getLocalHost(), port, NetworkType.CLIENT); - serverSocket = initiateConnection(new InetSocketAddress(serverAddress, port)); - Thread thread = new Thread(this); - thread.setDaemon(false); - thread.start(); + public NioClient(InetAddress remoteAddress, int remotePort) throws IOException { + this.remoteAddress = new InetSocketAddress(remoteAddress, remotePort); + connect(this.remoteAddress); } protected Selector initSelector() throws IOException { @@ -61,21 +57,19 @@ public class NioClient extends NioNetwork{ /** * Sends a Message to the default server * - * @param data The data to be sent - * @throws IOException Something got wrong + * @param data the data to be sent */ public void send(Message data) throws IOException { - send(serverSocket, data); + send(remoteAddress, data); } /** * This method is for the Client to send a message to the server * - * @param handler The response handler - * @param data The data to send - * @throws IOException + * @param handler the response handler + * @param data the data to send */ - public void send(ResponseEvent handler, ResponseRequestMessage data) throws IOException { - send(serverSocket, handler, data); + public void send(ResponseEvent handler, RequestResponseMessage data) throws IOException { + send(remoteAddress, handler, data); } } diff --git a/src/zutil/net/nio/NioNetwork.java b/src/zutil/net/nio/NioNetwork.java index 5e36f08..7bf686d 100755 --- a/src/zutil/net/nio/NioNetwork.java +++ b/src/zutil/net/nio/NioNetwork.java @@ -24,13 +24,10 @@ package zutil.net.nio; -import zutil.Encrypter; import zutil.converter.Converter; -import zutil.io.DynamicByteArrayStream; -import zutil.io.MultiPrintStream; import zutil.log.LogUtil; -import zutil.net.nio.message.type.ResponseRequestMessage; -import zutil.net.nio.message.type.SystemMessage; +import zutil.net.nio.message.RequestResponseMessage; +import zutil.net.nio.message.SystemMessage; import zutil.net.nio.response.ResponseEvent; import zutil.net.nio.server.ChangeRequest; import zutil.net.nio.server.ClientData; @@ -38,8 +35,8 @@ import zutil.net.nio.worker.SystemWorker; import zutil.net.nio.worker.Worker; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -51,136 +48,130 @@ import java.util.logging.Logger; public abstract class NioNetwork implements Runnable { private static Logger logger = LogUtil.getLogger(); - public static enum NetworkType {SERVER, CLIENT}; - - private NetworkType type; - - // The host:port combination to listen on - protected InetAddress address; - protected int port; + protected SocketAddress localAddress; // The channel on which we'll accept connections protected ServerSocketChannel serverChannel; - // The selector we'll be monitoring + // The selector we will be monitoring private Selector selector; // The buffer into which we'll read data when it's available private ByteBuffer readBuffer = ByteBuffer.allocate(8192); protected Worker worker; protected SystemWorker systemWorker; - // This map contains all the clients that are conncted + // This map contains all the clients that are connected protected Map clients = new HashMap(); // A list of PendingChange instances private List pendingChanges = new LinkedList(); // Maps a SocketChannel to a list of ByteBuffer instances private Map> pendingWriteData = new HashMap>(); - private Map pendingReadData = new HashMap(); - // The encrypter - private Encrypter encrypter; + + /** - * Create a nio network class - * - * @param hostAddress The host address - * @param port The port - * @param type The type of network host - * @throws IOException + * Create a client based Network object */ - public NioNetwork(InetAddress address, int port, NetworkType type) throws IOException { - this.port = port; - this.address = address; - this.type = type; - this.selector = initSelector(); - this.systemWorker = new SystemWorker(this); + public NioNetwork() throws IOException { + this(null); } + /** + * Create a server based Network object + * + * @param localAddress the address the server will listen on + */ + public NioNetwork(SocketAddress localAddress) throws IOException { + this.localAddress = localAddress; + // init selector + this.selector = initSelector(); + this.systemWorker = new SystemWorker(this); + // init traffic thread + new Thread(this).start(); + } + protected abstract Selector initSelector() throws IOException; + + /** - * Sets the Worker for the network messages + * Sets the default worker for non System messages. * - * @param worker The worker that handles the incoming messages + * @param worker the worker that should handle incoming messages */ public void setDefaultWorker(Worker worker){ this.worker = worker; } - /** - * Sets the encrypter to use in the network - * - * @param enc The encrypter to use or null fo no encryption - */ - public void setEncrypter(Encrypter enc){ - encrypter = enc; - MultiPrintStream.out.println("Network Encryption "+ - (encrypter != null ? "Enabled("+encrypter.getAlgorithm()+")" : "Disabled")+"!!"); - } - public void send(SocketChannel socket, Object data) throws IOException{ - send(socket, Converter.toBytes(data)); - } + /** + * Connect to a remote Server. + */ + protected void connect(SocketAddress address) throws IOException { + logger.fine("Connecting to: "+address); + // Create a non-blocking socket channel + SocketChannel socketChannel = SocketChannel.open(); + socketChannel.socket().setReuseAddress(true); + socketChannel.configureBlocking(false); + // Establish the Connection + socketChannel.connect(address); - public void send(InetSocketAddress address, Object data) throws IOException{ + // Queue a channel registration + synchronized(this.pendingChanges) { + pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT)); + } + selector.wakeup(); + } + + + public void send(SocketAddress address, Object data) throws IOException{ send(address, Converter.toBytes(data)); } - public void send(InetSocketAddress address, byte[] data){ - send(getSocketChannel(address), data); - } + public void send(SocketAddress address, ResponseEvent handler, RequestResponseMessage data) throws IOException { + // Register the response handler + systemWorker.addResponseHandler(handler, data); - public void send(SocketChannel socket, ResponseEvent handler, ResponseRequestMessage data) throws IOException { - // Register the response handler - systemWorker.addResponseHandler(handler, data); + send(address, Converter.toBytes(data)); + } - queueSend(socket,Converter.toBytes(data)); - } + /** + * Queues a message to be sent + * + * @param address the target address where the message should be sent + * @param data the data to send + */ + public void send(SocketAddress address, byte[] data){ + logger.finest("Sending Queue..."); + SocketChannel socket = getSocketChannel(address); - /** - * This method sends data true the given socket - * - * @param socket The socket - * @param data The data to send - */ - public void send(SocketChannel socket, byte[] data) { - queueSend(socket,data); - } - - /** - * Queues the message to be sent and wakeups the selector - * - * @param socket The socet to send the message thrue - * @param data The data to send - */ - protected void queueSend(SocketChannel socket, byte[] data){ - logger.finest("Sending Queue..."); // And queue the data we want written synchronized (pendingWriteData) { List queue = pendingWriteData.get(socket); if (queue == null) { - queue = new ArrayList(); + queue = new ArrayList<>(); pendingWriteData.put(socket, queue); } - //encrypts - if(encrypter != null) - queue.add(ByteBuffer.wrap(encrypter.encrypt(data))); - else queue.add(ByteBuffer.wrap(data)); + queue.add(ByteBuffer.wrap(data)); } // Changing the key state to write synchronized (pendingChanges) { // Indicate we want the interest ops set changed pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE)); } - logger.finest("selector.wakeup();"); // Finally, wake up our selecting thread so it can make the required changes selector.wakeup(); } + + + + public void run() { - logger.fine("NioNetwork Started!!!"); + logger.fine("NioNetwork Started."); while (true) { try { - // Process any pending changes + // Handle any pending changes synchronized (pendingChanges) { Iterator changes = pendingChanges.iterator(); while (changes.hasNext()) { @@ -200,16 +191,16 @@ public abstract class NioNetwork implements Runnable { pendingChanges.clear(); } - // Wait for an event one of the registered channels + // Wait for an event from one of the channels selector.select(); logger.finest("selector is awake"); // Iterate over the set of keys for which events are available - Iterator selectedKeys = selector.selectedKeys().iterator(); - while (selectedKeys.hasNext()) { - SelectionKey key = (SelectionKey) selectedKeys.next(); - selectedKeys.remove(); - logger.finest("KeyOP: "+key.interestOps()+" isAcceptable: "+SelectionKey.OP_ACCEPT+" isConnectable: "+SelectionKey.OP_CONNECT+" isWritable: "+SelectionKey.OP_WRITE+" isReadable: "+SelectionKey.OP_READ); + Iterator selectedKeys = selector.selectedKeys().iterator(); + while (selectedKeys.hasNext()) { + SelectionKey key = selectedKeys.next(); + selectedKeys.remove(); + logger.finest("KeyOP: "+key.interestOps()+" isAcceptable: "+SelectionKey.OP_ACCEPT+" isConnectible: "+SelectionKey.OP_CONNECT+" isWritable: "+SelectionKey.OP_WRITE+" isReadable: "+SelectionKey.OP_READ); if (key.isValid()) { // Check what event is available and deal with it @@ -218,8 +209,8 @@ public abstract class NioNetwork implements Runnable { accept(key); } else if (key.isConnectable()) { - logger.finest("Finnishing Connection!!"); - finishConnection(key); + logger.finest("Establishing Connection!!"); + establishConnection(key); } else if (key.isWritable()) { logger.finest("Writing"); @@ -238,7 +229,7 @@ public abstract class NioNetwork implements Runnable { } /** - * Server + * Handle an accept event from a remote host. Channel can only be a server socket. */ private void accept(SelectionKey key) throws IOException { // For an accept to be pending the channel must be a server socket channel. @@ -254,15 +245,67 @@ public abstract class NioNetwork implements Runnable { socketChannel.register(selector, SelectionKey.OP_READ); // adds the client to the clients list - InetSocketAddress remoteAdr = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress(); - if(!clients.containsValue(remoteAdr)){ - clients.put(remoteAdr, new ClientData(socketChannel)); - logger.fine("New Connection("+remoteAdr+")!!! Count: "+clients.size()); - } + registerSocketChannel(socketChannel); + logger.fine("New Connection("+socketChannel.getRemoteAddress()+")!!! Count: "+clients.size()); } + /** + * Finnish an ongoing remote connection establishment procedure + */ + private void establishConnection(SelectionKey key){ + SocketChannel socketChannel = (SocketChannel) key.channel(); + + try { + // Finalize/Finish the connection. + socketChannel.finishConnect(); + + // Register an interest in writing on this channel + key.interestOps(SelectionKey.OP_WRITE); + + registerSocketChannel(socketChannel); + logger.fine("Connection established("+socketChannel.getRemoteAddress()+")"); + } catch (IOException e) { + // Cancel the channel's registration with our selector + e.printStackTrace(); + key.cancel(); + } + } + + + /** + * Writes data pending message into a specific socket defined by the key + */ + private void write(SelectionKey key) throws IOException { + SocketChannel socketChannel = (SocketChannel) key.channel(); + + synchronized (pendingWriteData) { + List queue = pendingWriteData.get(socketChannel); + if(queue == null){ + queue = new ArrayList<>(); + pendingWriteData.put(socketChannel, queue); + } + + // Write until there's no more data ... + while (!queue.isEmpty()) { + ByteBuffer buf = queue.get(0); + socketChannel.write(buf); + if (buf.remaining() > 0) { + logger.finest("Write Buffer Full!"); + break; + } + queue.remove(0); + } + + if (queue.isEmpty()) { + // All data written, change selector interest + logger.finest("No more Data to write!"); + key.interestOps(SelectionKey.OP_READ); + } + } + } + /** - * Client and Server + * Handle a read event from a socket specified by the key. */ private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); @@ -281,12 +324,9 @@ public abstract class NioNetwork implements Runnable { key.cancel(); socketChannel.close(); clients.remove(remoteAdr); - pendingReadData.remove(socketChannel); pendingWriteData.remove(socketChannel); - logger.fine("Connection Forced Close("+remoteAdr+")!!! Connection Count: "+clients.size()); - if(type == NetworkType.CLIENT) - throw new IOException("Server Closed The Connection!!!"); - return; + logger.fine("Connection forcibly closed("+remoteAdr+")! Remaining connections: "+clients.size()); + throw new IOException("Remote forcibly closed the connection"); } if (numRead == -1) { @@ -295,169 +335,80 @@ public abstract class NioNetwork implements Runnable { key.channel().close(); key.cancel(); clients.remove(remoteAdr); - pendingReadData.remove(socketChannel); pendingWriteData.remove(socketChannel); - logger.fine("Connection Close("+remoteAdr+")!!! Connection Count: "+clients.size()); - if(type == NetworkType.CLIENT) - throw new IOException("Server Closed The Connection!!!"); - return; + logger.fine("Connection Closed("+remoteAdr+")! Remaining connections: "+clients.size()); + throw new IOException("Remote closed the connection"); } - // Make a correctly sized copy of the data before handing it - // to the client + // Make a correctly sized copy of the data before handing it to the client byte[] rspByteData = new byte[numRead]; System.arraycopy(readBuffer.array(), 0, rspByteData, 0, numRead); - if(encrypter != null)// Encryption - rspByteData = encrypter.decrypt(rspByteData); - - // Message Count 1m: 36750 - // Message Count 1s: 612 - if(!pendingReadData.containsKey(socketChannel)){ - pendingReadData.put(socketChannel, new DynamicByteArrayStream()); - } - DynamicByteArrayStream dynBuf = pendingReadData.get(socketChannel); - dynBuf.append(rspByteData); - - - Object rspData = null; + try{ - //rspData = Converter.toObject(rspByteData); - rspData = Converter.toObject(dynBuf); - handleRecivedMessage(socketChannel, rspData); - dynBuf.clear(); + Object rspData = Converter.toObject(rspByteData); + handleReceivedMessage(socketChannel, rspData); }catch(Exception e){ e.printStackTrace(); - dynBuf.reset(); } } - /** - * Client and Server - */ - private void write(SelectionKey key) throws IOException { - SocketChannel socketChannel = (SocketChannel) key.channel(); - synchronized (pendingWriteData) { - List queue = pendingWriteData.get(socketChannel); - if(queue == null){ - queue = new ArrayList(); - pendingWriteData.put(socketChannel, queue); - } + private void handleReceivedMessage(SocketChannel socketChannel, Object rspData){ + logger.finer("Handling incoming message..."); - // Write until there's not more data ... - while (!queue.isEmpty()) { - ByteBuffer buf = queue.get(0); - socketChannel.write(buf); - if (buf.remaining() > 0) { - // ... or the socket's buffer fills up - logger.finest("Write Buffer Full!!"); - break; - } - queue.remove(0); - } - - if (queue.isEmpty()) { - // We wrote away all data, so we're no longer interested - // in writing on this socket. Switch back to waiting for - // data. - logger.finest("No more Data to write!!"); - key.interestOps(SelectionKey.OP_READ); - } - } - } - - private void handleRecivedMessage(SocketChannel socketChannel, Object rspData){ - logger.finer("Handling incomming message..."); - - if(rspData instanceof SystemMessage){ - if(systemWorker != null){ - logger.finest("System Message!!!"); - systemWorker.processData(this, socketChannel, rspData); - } - else{ - logger.finer("Unhandled System Message!!!"); - } - } - else{ - // Hand the data off to our worker thread - if(worker != null){ - logger.finest("Worker Message!!!"); - worker.processData(this, socketChannel, rspData); - } - else{ - logger.fine("Unhandled Worker Message!!!"); - } - } - } - - /** - * Initializes a socket to a server - */ - protected SocketChannel initiateConnection(InetSocketAddress address) throws IOException { - // Create a non-blocking socket channel - SocketChannel socketChannel = SocketChannel.open(); - socketChannel.socket().setReuseAddress(true); - socketChannel.configureBlocking(false); - logger.fine("Connecting to: "+address); - - // Kick off connection establishment - socketChannel.connect(address); - - // Queue a channel registration since the caller is not the - // selecting thread. As part of the registration we'll register - // an interest in connection events. These are raised when a channel - // is ready to complete connection establishment. - synchronized(this.pendingChanges) { - pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT)); - } - - return socketChannel; - } - - protected SocketChannel getSocketChannel(InetSocketAddress address){ - return clients.get(address).getSocketChannel(); - } - - /** - * Client - */ - private void finishConnection(SelectionKey key){ - SocketChannel socketChannel = (SocketChannel) key.channel(); - - // Finish the connection. If the connection operation failed - // this will raise an IOException. try { - socketChannel.finishConnect(); - } catch (IOException e) { - // Cancel the channel's registration with our selector - e.printStackTrace(); - key.cancel(); - return; - } - - // Register an interest in writing on this channel - key.interestOps(SelectionKey.OP_WRITE); + if (rspData instanceof SystemMessage) { + if (systemWorker != null) { + logger.finest("Handling system message"); + systemWorker.processData(this, socketChannel.getRemoteAddress(), rspData); + } else { + logger.finer("Unhandled system message!"); + } + } else { + // Hand the data off to our worker thread + if (worker != null) { + logger.finest("Handling generic worker message"); + worker.processData(this, socketChannel.getRemoteAddress(), rspData); + } else { + logger.fine("Unhandled message!"); + } + } + }catch (IOException e){ + e.printStackTrace(); + } } - /** - * Client - * @throws IOException - */ - protected void closeConnection(SocketChannel socketChannel) throws IOException{ + + private ClientData registerSocketChannel(SocketChannel socket){ + InetSocketAddress remoteAdr = (InetSocketAddress) socket.socket().getRemoteSocketAddress(); + if (!clients.containsKey(remoteAdr)) { + ClientData clientData = new ClientData(socket); + clients.put(remoteAdr, clientData); + } + return clients.get(remoteAdr); + } + private SocketChannel getSocketChannel(SocketAddress address){ + return clients.get(address).getSocketChannel(); + } + + + + + /** + * Close a ongoing connection + */ + protected void closeConnection(InetSocketAddress address) throws IOException{ + closeConnection(getSocketChannel(address)); + } + + private void closeConnection(SocketChannel socketChannel) throws IOException{ socketChannel.close(); socketChannel.keyFor(selector).cancel(); } - /** - * Client - * @throws IOException - */ - protected void closeConnection(InetSocketAddress address) throws IOException{ - closeConnection(getSocketChannel(address)); - } - /* - public void close() throws IOException{ + + /*public void close() throws IOException{ if(serverChannel != null){ serverChannel.close(); serverChannel.keyFor(selector).cancel(); @@ -465,7 +416,4 @@ public abstract class NioNetwork implements Runnable { selector.close(); }*/ - public NetworkType getType(){ - return type; - } } diff --git a/src/zutil/net/nio/NioServer.java b/src/zutil/net/nio/NioServer.java old mode 100644 new mode 100755 index 14f1192..93ca788 --- a/src/zutil/net/nio/NioServer.java +++ b/src/zutil/net/nio/NioServer.java @@ -34,11 +34,12 @@ import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; public class NioServer extends NioNetwork{ + /** * Creates a NioServer object which listens on localhost * - * @param port The port to listen to + * @param port the port to listen to */ public NioServer(int port) throws IOException { this(null, port); @@ -47,12 +48,11 @@ public class NioServer extends NioNetwork{ /** * Creates a NioServer object which listens to a specific address * - * @param address The address to listen to - * @param port The port to listen to + * @param address the address to listen to + * @param port the port to listen to */ public NioServer(InetAddress address, int port) throws IOException { - super(address, port, NetworkType.SERVER); - new Thread(this).start(); + super(new InetSocketAddress(address, port)); } protected Selector initSelector() throws IOException { @@ -65,8 +65,7 @@ public class NioServer extends NioNetwork{ serverChannel.configureBlocking(false); // Bind the server socket to the specified address and port - InetSocketAddress isa = new InetSocketAddress(address, port); - serverChannel.socket().bind(isa); + serverChannel.socket().bind(localAddress); // Register the server socket channel, indicating an interest in // accepting new connections @@ -74,17 +73,16 @@ public class NioServer extends NioNetwork{ return socketSelector; } - + /** * Broadcasts the message to all the connected clients * - * @param data The data to broadcast + * @param data the data to broadcast */ public void broadcast(byte[] data){ synchronized(clients){ - Iterator it = clients.keySet().iterator(); - while(it.hasNext()){ - send(it.next(), data); + for(InetSocketAddress target : clients.keySet()){ + send(target, data); } } } diff --git a/src/zutil/net/nio/message/type/EchoMessage.java b/src/zutil/net/nio/message/EchoMessage.java old mode 100644 new mode 100755 similarity index 78% rename from src/zutil/net/nio/message/type/EchoMessage.java rename to src/zutil/net/nio/message/EchoMessage.java index 6fcc22c..53e17af --- a/src/zutil/net/nio/message/type/EchoMessage.java +++ b/src/zutil/net/nio/message/EchoMessage.java @@ -1,58 +1,51 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Ziver Koc - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package zutil.net.nio.message.type; - -import zutil.net.nio.message.Message; - -/** - * The reciver will echo out this message to the sender - * - * @author Ziver - */ -public abstract class EchoMessage extends Message implements SystemMessage{ - private static final long serialVersionUID = 1L; - - private boolean echo; - - public EchoMessage(){ - echo = true; - } - - /** - * This method returns if the message should be echoed - * @return If the message should be echoed - */ - public boolean echo() { - return echo; - } - - /** - * Called by the reciver to disable looping of the message - * - */ - public void recived() { - echo = false; - } -} +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Ziver Koc + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package zutil.net.nio.message; + +/** + * The reciver will echo out this message to the sender + * + * @author Ziver + */ +public abstract class EchoMessage extends Message implements SystemMessage{ + private static final long serialVersionUID = 1L; + + private boolean echo = false; + + + /** + * @return true if this message is an echo of an original message + */ + public boolean echo() { + return echo; + } + + /** + * Called by the receiver to mark this message as an echo copy + */ + public void received() { + echo = true; + } +} diff --git a/src/zutil/net/nio/message/KeepAliveMessage.java b/src/zutil/net/nio/message/KeepAliveMessage.java old mode 100644 new mode 100755 index c265c06..84aa34d --- a/src/zutil/net/nio/message/KeepAliveMessage.java +++ b/src/zutil/net/nio/message/KeepAliveMessage.java @@ -24,8 +24,6 @@ package zutil.net.nio.message; -import zutil.net.nio.message.type.SystemMessage; - /** * Tells the destination that the * source is still online diff --git a/src/zutil/net/nio/message/type/ResponseRequestMessage.java b/src/zutil/net/nio/message/RequestResponseMessage.java old mode 100644 new mode 100755 similarity index 79% rename from src/zutil/net/nio/message/type/ResponseRequestMessage.java rename to src/zutil/net/nio/message/RequestResponseMessage.java index 7429215..9f255db --- a/src/zutil/net/nio/message/type/ResponseRequestMessage.java +++ b/src/zutil/net/nio/message/RequestResponseMessage.java @@ -1,42 +1,41 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Ziver Koc - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package zutil.net.nio.message.type; - -/** - * This interface means that the sender - * wants a reply from the destination - * - * @author Ziver - * - */ -public interface ResponseRequestMessage { - - /** - * The id of the response to identify the response event - * @return Response id - */ - public double getResponseId(); - -} +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Ziver Koc + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package zutil.net.nio.message; + +/** + * This interface defines a request response flow where a request + * message is sent to a server and a registered handler will handle the message. + * + * @author Ziver + * + */ +public interface RequestResponseMessage { + + /** + * @return a unique id for this message + */ + long getResponseId(); + +} diff --git a/src/zutil/net/nio/message/StringMessage.java b/src/zutil/net/nio/message/StringResponseMessage.java old mode 100644 new mode 100755 similarity index 79% rename from src/zutil/net/nio/message/StringMessage.java rename to src/zutil/net/nio/message/StringResponseMessage.java index f0d8d7e..1333593 --- a/src/zutil/net/nio/message/StringMessage.java +++ b/src/zutil/net/nio/message/StringResponseMessage.java @@ -1,58 +1,54 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Ziver Koc - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package zutil.net.nio.message; - -import zutil.net.nio.message.type.EchoMessage; -import zutil.net.nio.message.type.ResponseRequestMessage; - - - -public class StringMessage extends EchoMessage implements ResponseRequestMessage{ - private static final long serialVersionUID = 1L; - private double responseId; - - private String msg; - - public StringMessage(String msg){ - this.msg = msg; - responseId = Math.random(); - } - - public String getString(){ - return msg; - } - - public void setString(String msg){ - this.msg = msg; - } - - public String toString(){ - return getString(); - } - - public double getResponseId() { - return responseId; - } -} +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Ziver Koc + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package zutil.net.nio.message; + + +public class StringResponseMessage extends EchoMessage implements RequestResponseMessage { + private static final long serialVersionUID = 1L; + + private long responseId; + private String msg; + + public StringResponseMessage(String msg){ + this.msg = msg; + responseId = (long)(Math.random()*Long.MAX_VALUE); + } + + public String getString(){ + return msg; + } + + public void setString(String msg){ + this.msg = msg; + } + + public String toString(){ + return getString(); + } + + public long getResponseId() { + return responseId; + } +} diff --git a/src/zutil/net/nio/message/SyncMessage.java b/src/zutil/net/nio/message/SyncMessage.java old mode 100644 new mode 100755 index 950ae78..c9c9195 --- a/src/zutil/net/nio/message/SyncMessage.java +++ b/src/zutil/net/nio/message/SyncMessage.java @@ -24,8 +24,6 @@ package zutil.net.nio.message; -import zutil.net.nio.message.type.SystemMessage; - public class SyncMessage extends Message implements SystemMessage{ private static final long serialVersionUID = 1L; public static enum MessageType { REQUEST_ID, NEW, REMOVE, SYNC }; diff --git a/src/zutil/net/nio/message/type/SystemMessage.java b/src/zutil/net/nio/message/SystemMessage.java old mode 100644 new mode 100755 similarity index 94% rename from src/zutil/net/nio/message/type/SystemMessage.java rename to src/zutil/net/nio/message/SystemMessage.java index 5fc5508..5d50598 --- a/src/zutil/net/nio/message/type/SystemMessage.java +++ b/src/zutil/net/nio/message/SystemMessage.java @@ -1,36 +1,36 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Ziver Koc - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package zutil.net.nio.message.type; - -/** - * A message that implements this will be - * handeld internaly by the network engine - * - * @author Ziver - * - */ -public interface SystemMessage { - -} +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Ziver Koc + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package zutil.net.nio.message; + +/** + * A message that implements this will be + * handeld internaly by the network engine + * + * @author Ziver + * + */ +public interface SystemMessage { + +} diff --git a/src/zutil/net/nio/server/ChangeRequest.java b/src/zutil/net/nio/server/ChangeRequest.java old mode 100644 new mode 100755 index a752b93..ee59c61 --- a/src/zutil/net/nio/server/ChangeRequest.java +++ b/src/zutil/net/nio/server/ChangeRequest.java @@ -26,6 +26,7 @@ package zutil.net.nio.server; import java.nio.channels.SocketChannel; + public class ChangeRequest { public static final int REGISTER = 1; public static final int CHANGEOPS = 2; @@ -33,7 +34,8 @@ public class ChangeRequest { public SocketChannel socket; public int type; public int ops; - + + public ChangeRequest(SocketChannel socket, int type, int ops) { this.socket = socket; this.type = type; diff --git a/src/zutil/net/nio/server/ClientData.java b/src/zutil/net/nio/server/ClientData.java old mode 100644 new mode 100755 index 9fc4b13..e590acb --- a/src/zutil/net/nio/server/ClientData.java +++ b/src/zutil/net/nio/server/ClientData.java @@ -30,11 +30,13 @@ import java.nio.channels.SocketChannel; public class ClientData { private SocketChannel socketChannel; private long lastMessageReceived; - + + public ClientData(SocketChannel socketChannel){ this.socketChannel = socketChannel; } - + + public SocketChannel getSocketChannel(){ return socketChannel; } diff --git a/src/zutil/net/nio/service/NetworkService.java b/src/zutil/net/nio/service/NetworkService.java deleted file mode 100644 index f06fa1e..0000000 --- a/src/zutil/net/nio/service/NetworkService.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Ziver Koc - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package zutil.net.nio.service; - -import zutil.net.nio.NioNetwork; -import zutil.net.nio.message.Message; - -import java.nio.channels.SocketChannel; - -public abstract class NetworkService { - protected static NetworkService instance; - protected NioNetwork nio; - - public NetworkService(NioNetwork nio){ - instance = this; - this.nio = nio; - } - - public abstract void handleMessage(Message message, SocketChannel socket); - - /** - * @return A instance of this class - */ - public static NetworkService getInstance(){ - return instance; - } -} diff --git a/src/zutil/net/nio/worker/EchoWorker.java b/src/zutil/net/nio/worker/EchoWorker.java old mode 100644 new mode 100755 index 94dbe52..d2f4ae1 --- a/src/zutil/net/nio/worker/EchoWorker.java +++ b/src/zutil/net/nio/worker/EchoWorker.java @@ -24,18 +24,16 @@ package zutil.net.nio.worker; -import zutil.io.MultiPrintStream; - import java.io.IOException; + public class EchoWorker extends ThreadedEventWorker { @Override - public void messageEvent(WorkerDataEvent dataEvent) { + public void messageEvent(WorkerEventData dataEvent) { try { // Return to sender - MultiPrintStream.out.println("Recived Msg: "+dataEvent.data); - dataEvent.network.send(dataEvent.socket, dataEvent.data); + dataEvent.network.send(dataEvent.remoteAddress, dataEvent.data); } catch (IOException e) { e.printStackTrace(); } diff --git a/src/zutil/net/nio/worker/SystemWorker.java b/src/zutil/net/nio/worker/SystemWorker.java old mode 100644 new mode 100755 index 43590c0..b5b1979 --- a/src/zutil/net/nio/worker/SystemWorker.java +++ b/src/zutil/net/nio/worker/SystemWorker.java @@ -26,15 +26,10 @@ package zutil.net.nio.worker; import zutil.log.LogUtil; import zutil.net.nio.NioNetwork; -import zutil.net.nio.message.ChatMessage; import zutil.net.nio.message.Message; -import zutil.net.nio.message.SyncMessage; -import zutil.net.nio.message.type.EchoMessage; -import zutil.net.nio.message.type.ResponseRequestMessage; +import zutil.net.nio.message.EchoMessage; +import zutil.net.nio.message.RequestResponseMessage; import zutil.net.nio.response.ResponseEvent; -import zutil.net.nio.service.NetworkService; -import zutil.net.nio.service.chat.ChatService; -import zutil.net.nio.service.sync.SyncService; import java.util.HashMap; import java.util.Map; @@ -43,41 +38,49 @@ import java.util.logging.Logger; public class SystemWorker extends ThreadedEventWorker { private static Logger logger = LogUtil.getLogger(); + private NioNetwork nio; - // Maps a SocketChannel to a RspHandler - private Map rspEvents = new HashMap(); - // Difren services listening on specific messages - private Map, NetworkService> services = new HashMap, NetworkService>(); + // Maps a responseId to a RspHandler + private Map rspEvents = new HashMap<>(); + // Different services listening on specific messages + private Map, ThreadedEventWorker> services = new HashMap<>(); + + + /** * Creates a new SystemWorker - * @param nio The Network */ public SystemWorker(NioNetwork nio){ this.nio = nio; } + + @Override - public void messageEvent(WorkerDataEvent event) { + public void messageEvent(WorkerEventData event) { try { logger.finer("System Message: "+event.data.getClass().getName()); if(event.data instanceof Message){ if(event.data instanceof EchoMessage && ((EchoMessage)event.data).echo()){ - // Echos back the recived message - ((EchoMessage)event.data).recived(); + // Echos back the received message + ((EchoMessage)event.data).received(); logger.finer("Echoing Message: "+event.data); - nio.send(event.socket, event.data); + nio.send(event.remoteAddress, event.data); } - else if(event.data instanceof ResponseRequestMessage && - rspEvents.get(((ResponseRequestMessage)event.data).getResponseId()) != null){ - // Handle the response - handleResponse(((ResponseRequestMessage)event.data).getResponseId(), event.data); + else if(event.data instanceof RequestResponseMessage && + rspEvents.get(((RequestResponseMessage)event.data).getResponseId()) != null){ + long responseId = ((RequestResponseMessage)event.data).getResponseId(); + // Look up the handler for this channel + ResponseEvent handler = rspEvents.get(responseId); + // And pass the response to it + handler.handleResponse(event.data); + rspEvents.remove(responseId); logger.finer("Response Request Message: "+event.data); } else{ - //Services - if(services.containsKey(event.data.getClass()) || - !services.containsKey(event.data.getClass()) && defaultServices(event.data)){ - services.get(event.data.getClass()).handleMessage((Message)event.data, event.socket); + // Check mapped workers + if(services.containsKey(event.data.getClass())){ + services.get(event.data.getClass()).messageEvent(event); } } } @@ -87,67 +90,29 @@ public class SystemWorker extends ThreadedEventWorker { } /** - * Registers a Service to a specific message + * Maps a Worker to a specific message * - * @param c The Message class - * @param ns The service + * @param messageClass the received message class + * @param worker the worker that should handle the specified message type */ - public void registerService(Class c, NetworkService ns){ - services.put(c, ns); + public void registerWorker(Class messageClass, ThreadedEventWorker worker){ + services.put(messageClass, worker); } /** - * Unregisters a service - * - * @param c The class + * Un-maps a message class to a worker + * + * @param messageClass the received message class */ - public void unregisterService(Class c){ - services.remove(c); + public void unregisterWorker(Class messageClass){ + services.remove(messageClass); } /** - * Connects a ResponseHandler to a specific message - * @param handler The Handler - * @param data The Message + * Connects a ResponseHandler to a specific message object */ - public void addResponseHandler(ResponseEvent handler, ResponseRequestMessage data){ + public void addResponseHandler(ResponseEvent handler, RequestResponseMessage data){ rspEvents.put(data.getResponseId(), handler); } - /** - * Client And Server ResponseEvent - */ - private void handleResponse(double responseId, Object rspData){ - // Look up the handler for this channel - ResponseEvent handler = rspEvents.get(responseId); - // And pass the response to it - handler.handleResponse(rspData); - - rspEvents.remove(responseId); - } - - /** - * Registers the default services in the engin e - * if the message needs one of them - * - * @param o The message - */ - private boolean defaultServices(Object o){ - if(o instanceof SyncMessage){ - if(SyncService.getInstance() == null) - registerService(o.getClass(), new SyncService(nio)); - else - registerService(o.getClass(), SyncService.getInstance()); - return true; - } - else if(o instanceof ChatMessage){ - if(ChatService.getInstance() == null) - registerService(o.getClass(), new ChatService(nio)); - else - registerService(o.getClass(), ChatService.getInstance()); - return true; - } - return false; - } - } diff --git a/src/zutil/net/nio/worker/ThreadedEventWorker.java b/src/zutil/net/nio/worker/ThreadedEventWorker.java old mode 100644 new mode 100755 index b93f1c4..3603ecc --- a/src/zutil/net/nio/worker/ThreadedEventWorker.java +++ b/src/zutil/net/nio/worker/ThreadedEventWorker.java @@ -24,7 +24,7 @@ package zutil.net.nio.worker; -public abstract class ThreadedEventWorker extends Worker{ +public abstract class ThreadedEventWorker extends Worker implements Runnable{ private Thread thread; public ThreadedEventWorker(){ @@ -32,27 +32,17 @@ public abstract class ThreadedEventWorker extends Worker{ thread.start(); } - public void update() { - WorkerDataEvent dataEvent; - + public void run() { while(true) { try{ // Wait for data to become available - synchronized(getEventQueue()) { - while(getEventQueue().isEmpty()) { - try { - getEventQueue().wait(); - } catch (InterruptedException e) {} - } - dataEvent = (WorkerDataEvent) getEventQueue().remove(0); - } - messageEvent(dataEvent); + messageEvent(pollEvent()); } catch (Exception e) { e.printStackTrace(); } } } - public abstract void messageEvent(WorkerDataEvent e); + public abstract void messageEvent(WorkerEventData e); } diff --git a/src/zutil/net/nio/worker/Worker.java b/src/zutil/net/nio/worker/Worker.java old mode 100644 new mode 100755 index 81cd1d4..81aada1 --- a/src/zutil/net/nio/worker/Worker.java +++ b/src/zutil/net/nio/worker/Worker.java @@ -26,30 +26,24 @@ package zutil.net.nio.worker; import zutil.net.nio.NioNetwork; -import java.nio.channels.SocketChannel; +import java.net.SocketAddress; import java.util.LinkedList; import java.util.List; -public abstract class Worker implements Runnable { - private LinkedList queue = new LinkedList(); +public abstract class Worker { + private LinkedList queue = new LinkedList(); - public void processData(NioNetwork server, SocketChannel socket, Object data) { + public void processData(NioNetwork server, SocketAddress remote, Object data) { synchronized(queue) { - queue.add(new WorkerDataEvent(server, socket, data)); + queue.add(new WorkerEventData(server, remote, data)); queue.notify(); } } + /** - * @return The event queue - */ - protected List getEventQueue(){ - return queue; - } - - /** - * @return If there is a event in the queue + * @return true if there is a event in the queue */ protected boolean hasEvent(){ return !queue.isEmpty(); @@ -57,20 +51,17 @@ public abstract class Worker implements Runnable { /** * Polls a event from the list or waits until there is a event - * @return The next event + * + * @return the next event */ - protected WorkerDataEvent pollEvent(){ - while(queue.isEmpty()) { - try { - this.wait(); - } catch (InterruptedException e) {} + protected WorkerEventData pollEvent(){ + synchronized(queue) { + while (queue.isEmpty()) { + try { + queue.wait(); + } catch (InterruptedException e) {} + } } return queue.poll(); } - - public void run(){ - update(); - } - - public abstract void update(); } diff --git a/src/zutil/net/nio/worker/WorkerDataEvent.java b/src/zutil/net/nio/worker/WorkerEventData.java old mode 100644 new mode 100755 similarity index 85% rename from src/zutil/net/nio/worker/WorkerDataEvent.java rename to src/zutil/net/nio/worker/WorkerEventData.java index 38f1f7f..67f83d5 --- a/src/zutil/net/nio/worker/WorkerDataEvent.java +++ b/src/zutil/net/nio/worker/WorkerEventData.java @@ -1,42 +1,43 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Ziver Koc - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package zutil.net.nio.worker; - -import zutil.net.nio.NioNetwork; - -import java.nio.channels.SocketChannel; - - -public class WorkerDataEvent { - public NioNetwork network; - public SocketChannel socket; - public Object data; - - public WorkerDataEvent(NioNetwork server, SocketChannel socket, Object data) { - this.network = server; - this.socket = socket; - this.data = data; - } -} +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Ziver Koc + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package zutil.net.nio.worker; + +import zutil.net.nio.NioNetwork; + +import java.net.SocketAddress; +import java.nio.channels.SocketChannel; + + +public class WorkerEventData { + public NioNetwork network; + public SocketAddress remoteAddress; + public Object data; + + public WorkerEventData(NioNetwork server, SocketAddress remoteAddress, Object data) { + this.network = server; + this.remoteAddress = remoteAddress; + this.data = data; + } +} diff --git a/src/zutil/net/nio/service/chat/ChatListener.java b/src/zutil/net/nio/worker/chat/ChatListener.java old mode 100644 new mode 100755 similarity index 94% rename from src/zutil/net/nio/service/chat/ChatListener.java rename to src/zutil/net/nio/worker/chat/ChatListener.java index 2c6cd09..3d99d9c --- a/src/zutil/net/nio/service/chat/ChatListener.java +++ b/src/zutil/net/nio/worker/chat/ChatListener.java @@ -1,34 +1,34 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Ziver Koc - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package zutil.net.nio.service.chat; - -/** - * Tis is a listener class for new chat messages - * @author Ziver - * - */ -public interface ChatListener { - public void messageAction(String msg, String room); -} +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Ziver Koc + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package zutil.net.nio.worker.chat; + +/** + * Tis is a listener class for new chat messages + * @author Ziver + * + */ +public interface ChatListener { + public void messageAction(String msg, String room); +} diff --git a/src/zutil/net/nio/service/chat/ChatService.java b/src/zutil/net/nio/worker/chat/ChatService.java old mode 100644 new mode 100755 similarity index 59% rename from src/zutil/net/nio/service/chat/ChatService.java rename to src/zutil/net/nio/worker/chat/ChatService.java index 55f7e1c..23e1bb6 --- a/src/zutil/net/nio/service/chat/ChatService.java +++ b/src/zutil/net/nio/worker/chat/ChatService.java @@ -1,157 +1,149 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Ziver Koc - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package zutil.net.nio.service.chat; - -import zutil.log.LogUtil; -import zutil.net.nio.NioNetwork; -import zutil.net.nio.message.ChatMessage; -import zutil.net.nio.message.Message; -import zutil.net.nio.service.NetworkService; - -import java.nio.channels.SocketChannel; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.logging.Logger; - -/** - * A simple chat service with users and rooms - * - * @author Ziver - */ -public class ChatService extends NetworkService{ - private static Logger logger = LogUtil.getLogger(); - private HashMap> rooms; - private ChatListener listener; - - public ChatService(NioNetwork nio){ - super(nio); - rooms = new HashMap>(); - } - - @Override - public void handleMessage(Message message, SocketChannel socket) { - try { - // New message - if(message instanceof ChatMessage){ - ChatMessage chatmessage = (ChatMessage)message; - //is this a new message - if(chatmessage.type == ChatMessage.ChatMessageType.MESSAGE){ - // Is this the server - if(nio.getType() == NioNetwork.NetworkType.SERVER){ - if(rooms.containsKey(chatmessage.room)){ - LinkedList tmpList = rooms.get(chatmessage.room); - - // Broadcast the message - for(SocketChannel s : tmpList){ - if(s.isConnected()){ - nio.send(s, chatmessage); - } - else{ - unRegisterUser(chatmessage.room, s); - } - } - } - } - logger.finer("New Chat Message: "+chatmessage.msg); - listener.messageAction(chatmessage.msg, chatmessage.room); - } - // register to a room - else if(chatmessage.type == ChatMessage.ChatMessageType.REGISTER){ - registerUser(chatmessage.room, socket); - } - // unregister to a room - else if(chatmessage.type == ChatMessage.ChatMessageType.UNREGISTER){ - unRegisterUser(chatmessage.room, socket); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - - } - - /** - * Registers a user to the main room - * - * @param socket The socket to the user - */ - public void registerUser(SocketChannel socket){ - registerUser("", socket); - } - - /** - * Registers the given user to a specific room - * - * @param room The room - * @param socket The socket to the user - */ - public void registerUser(String room, SocketChannel socket){ - addRoom(room); - logger.fine("New Chat User: "+socket); - rooms.get(room).add(socket); - } - - /** - * Unregisters a user from a room and removes the room if its empty - * - * @param room The room - * @param socket The socket to the user - */ - public void unRegisterUser(String room, SocketChannel socket){ - if(rooms.containsKey(room)){ - logger.fine("Remove Chat User: "+socket); - rooms.get(room).remove(socket); - removeRoom(room); - } - } - - /** - * Adds a room into the list - * - * @param room The name of the room - */ - public void addRoom(String room){ - if(!rooms.containsKey(room)){ - logger.fine("New Chat Room: "+room); - rooms.put(room, new LinkedList()); - } - } - - /** - * Removes the given room if its empty - * - * @param room The room - */ - public void removeRoom(String room){ - if(rooms.get(room).isEmpty()){ - logger.fine("Remove Chat Room: "+room); - rooms.remove(room); - } - } - - public static ChatService getInstance(){ - return (ChatService)instance; - } -} +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Ziver Koc + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package zutil.net.nio.worker.chat; + +import zutil.log.LogUtil; +import zutil.net.nio.NioNetwork; +import zutil.net.nio.message.ChatMessage; +import zutil.net.nio.message.Message; +import zutil.net.nio.worker.ThreadedEventWorker; +import zutil.net.nio.worker.WorkerEventData; + +import java.net.SocketAddress; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.logging.Logger; + +/** + * A simple chat service with users and rooms + * + * @author Ziver + */ +public class ChatService extends ThreadedEventWorker{ + private static Logger logger = LogUtil.getLogger(); + + private HashMap> rooms = new HashMap<>(); + private ChatListener listener; + + + + + + @Override + public void messageEvent(WorkerEventData event) { + try { + // New message + if(event.data instanceof ChatMessage){ + ChatMessage chatmessage = (ChatMessage)event.data; + //is this a new message + if(chatmessage.type == ChatMessage.ChatMessageType.MESSAGE){ + // Is this the server + if(rooms.containsKey(chatmessage.room)){ + LinkedList tmpList = rooms.get(chatmessage.room); + + // Broadcast the message + for(SocketAddress remote : tmpList){ + event.network.send(remote, chatmessage); // TODO: should not be done for clients + } + } + logger.finer("New Chat Message: "+chatmessage.msg); + listener.messageAction(chatmessage.msg, chatmessage.room); + } + // register to a room + else if(chatmessage.type == ChatMessage.ChatMessageType.REGISTER){ + registerUser(chatmessage.room, event.remoteAddress); + } + // unregister to a room + else if(chatmessage.type == ChatMessage.ChatMessageType.UNREGISTER){ + unRegisterUser(chatmessage.room, event.remoteAddress); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + + } + + /** + * Registers a user to the main room + * + * @param remoteAddress the address of the remote user + */ + public void registerUser(SocketAddress remoteAddress){ + registerUser("", remoteAddress); + } + + /** + * Registers the given user to a specific room + * + * @param room the room name + * @param remoteAddress the address of the remote user + */ + public void registerUser(String room, SocketAddress remoteAddress){ + addRoom(room); + logger.fine("New Chat User: "+remoteAddress); + rooms.get(room).add(remoteAddress); + } + + /** + * Unregisters a user from a room and removes the room if its empty + * + * @param room the room name + * @param remoteAddress the address of the remote user + */ + public void unRegisterUser(String room, SocketAddress remoteAddress){ + if(rooms.containsKey(room)){ + logger.fine("Remove Chat User: "+remoteAddress); + rooms.get(room).remove(remoteAddress); + removeRoom(room); + } + } + + /** + * Adds a room into the list + * + * @param room The name of the room + */ + private void addRoom(String room){ + if(!rooms.containsKey(room)){ + logger.fine("New Chat Room: "+room); + rooms.put(room, new LinkedList<>()); + } + } + + /** + * Removes the given room if its empty + * + * @param room The room + */ + private void removeRoom(String room){ + if(rooms.get(room).isEmpty()){ + logger.fine("Remove Chat Room: "+room); + rooms.remove(room); + } + } + +} diff --git a/src/zutil/net/nio/worker/grid/GridClient.java b/src/zutil/net/nio/worker/grid/GridClient.java old mode 100644 new mode 100755 index efb7810..b3b5172 --- a/src/zutil/net/nio/worker/grid/GridClient.java +++ b/src/zutil/net/nio/worker/grid/GridClient.java @@ -28,7 +28,7 @@ import zutil.io.MultiPrintStream; import zutil.net.nio.NioClient; import zutil.net.nio.message.GridMessage; import zutil.net.nio.worker.ThreadedEventWorker; -import zutil.net.nio.worker.WorkerDataEvent; +import zutil.net.nio.worker.WorkerEventData; import java.io.IOException; import java.util.LinkedList; @@ -51,8 +51,8 @@ public class GridClient extends ThreadedEventWorker { * Creates a new GridClient object and registers itself at the server * and sets itself as a worker in NioClient * - * @param thread the Thread interface to run for the jobs - * @param network the NioClient to use to communicate to the server + * @param thread the Thread interface to run for the jobs + * @param network the NioClient to use to communicate to the server */ public GridClient(GridThread thread, NioClient network){ jobQueue = new LinkedList(); @@ -64,7 +64,6 @@ public class GridClient extends ThreadedEventWorker { /** * Starts up the client and a couple of GridThreads. * And registers itself as a worker in NioClient - * @throws IOException */ public void initiate() throws IOException{ network.setDefaultWorker(this); @@ -77,7 +76,7 @@ public class GridClient extends ThreadedEventWorker { } @Override - public void messageEvent(WorkerDataEvent e) { + public void messageEvent(WorkerEventData e) { // ignores other messages than GridMessage if(e.data instanceof GridMessage){ GridMessage msg = (GridMessage)e.data; @@ -96,10 +95,9 @@ public class GridClient extends ThreadedEventWorker { /** * Register whit the server that the job is done * - * @param jobID is the job id - * @param correct if the answer was right - * @param result the result of the computation - * @throws IOException + * @param jobID is the job id + * @param correct if the answer was right + * @param result the result of the computation */ public static void jobDone(int jobID, boolean correct, Object result) throws IOException{ if(correct) @@ -112,17 +110,16 @@ public class GridClient extends ThreadedEventWorker { * Registers with the server that there was an * error when computing this job * - * @param jobID is the job id + * @param jobId is the job id */ - public static void jobError(int jobID){ + public static void jobError(int jobId){ try{ - network.send(new GridMessage(GridMessage.COMP_SUCCESSFUL, jobID)); + network.send(new GridMessage(GridMessage.COMP_SUCCESSFUL, jobId)); }catch(Exception e){e.printStackTrace();} } /** * @return a new job to compute - * @throws IOException */ public static synchronized GridJob getNextJob() throws IOException{ if(jobQueue.isEmpty()){ diff --git a/src/zutil/net/nio/worker/grid/GridJob.java b/src/zutil/net/nio/worker/grid/GridJob.java old mode 100644 new mode 100755 index f77a971..df21443 --- a/src/zutil/net/nio/worker/grid/GridJob.java +++ b/src/zutil/net/nio/worker/grid/GridJob.java @@ -33,13 +33,15 @@ public class GridJob{ public int jobID; public Object job; public long timestamp; - + + public GridJob(int jobID, Object job){ this.jobID = jobID; this.job = job; renewTimeStamp(); } - + + public void renewTimeStamp(){ timestamp = System.currentTimeMillis(); } diff --git a/src/zutil/net/nio/worker/grid/GridJobGenerator.java b/src/zutil/net/nio/worker/grid/GridJobGenerator.java old mode 100644 new mode 100755 index 0f69084..e1b9808 --- a/src/zutil/net/nio/worker/grid/GridJobGenerator.java +++ b/src/zutil/net/nio/worker/grid/GridJobGenerator.java @@ -34,9 +34,9 @@ public interface GridJobGenerator { /** * @return static and final values that do not change for every job */ - public Object initValues(); + Object initValues(); /** * @return a new generated job */ - public T generateJob(); + T generateJob(); } diff --git a/src/zutil/net/nio/worker/grid/GridResultHandler.java b/src/zutil/net/nio/worker/grid/GridResultHandler.java old mode 100644 new mode 100755 index 61d8b8b..d83cd78 --- a/src/zutil/net/nio/worker/grid/GridResultHandler.java +++ b/src/zutil/net/nio/worker/grid/GridResultHandler.java @@ -31,5 +31,5 @@ package zutil.net.nio.worker.grid; */ public interface GridResultHandler { - public void resultEvent(int jobID, boolean correct, T result); + void resultEvent(int jobID, boolean correct, T result); } diff --git a/src/zutil/net/nio/worker/grid/GridServerWorker.java b/src/zutil/net/nio/worker/grid/GridServerWorker.java old mode 100644 new mode 100755 index 0efefea..7fa6b63 --- a/src/zutil/net/nio/worker/grid/GridServerWorker.java +++ b/src/zutil/net/nio/worker/grid/GridServerWorker.java @@ -26,7 +26,7 @@ package zutil.net.nio.worker.grid; import zutil.net.nio.message.GridMessage; import zutil.net.nio.worker.ThreadedEventWorker; -import zutil.net.nio.worker.WorkerDataEvent; +import zutil.net.nio.worker.WorkerEventData; import java.io.IOException; import java.util.HashMap; @@ -41,41 +41,42 @@ import java.util.Queue; @SuppressWarnings({ "unchecked", "rawtypes" }) public class GridServerWorker extends ThreadedEventWorker{ // Job timeout after 30 min - public static int JOB_TIMEOUT = 1000*60*30; - + public int jobTimeout = 1000*60*30; private HashMap jobs; // contains all the ongoing jobs - private Queue reSendjobQueue; // Contains all the jobs that will be recalculated + private Queue resendJobQueue; // Contains all the jobs that will be recalculated private GridJobGenerator jobGenerator; // The job generator private GridResultHandler resHandler; private int nextJobID; + public GridServerWorker(GridResultHandler resHandler, GridJobGenerator jobGenerator){ this.resHandler = resHandler; this.jobGenerator = jobGenerator; nextJobID = 0; - jobs = new HashMap(); - reSendjobQueue = new LinkedList(); + jobs = new HashMap<>(); + resendJobQueue = new LinkedList<>(); GridMaintainer maintainer = new GridMaintainer(); maintainer.start(); } + @Override - public void messageEvent(WorkerDataEvent e) { + public void messageEvent(WorkerEventData e) { try { // ignores other messages than GridMessage if(e.data instanceof GridMessage){ GridMessage msg = (GridMessage)e.data; - GridJob job = null; + GridJob job; switch(msg.messageType()){ case GridMessage.REGISTER: - e.network.send(e.socket, new GridMessage(GridMessage.INIT_DATA, 0, jobGenerator.initValues())); + e.network.send(e.remoteAddress, new GridMessage(GridMessage.INIT_DATA, 0, jobGenerator.initValues())); break; // Sending new data to compute to the client case GridMessage.NEW_DATA: - if(!reSendjobQueue.isEmpty()){ // checks first if there is a job for recalculation - job = reSendjobQueue.poll(); + if(!resendJobQueue.isEmpty()){ // checks first if there is a job for recalculation + job = resendJobQueue.poll(); job.renewTimeStamp(); } else{ // generates new job @@ -85,7 +86,7 @@ public class GridServerWorker extends ThreadedEventWorker{ nextJobID++; } GridMessage newMsg = new GridMessage(GridMessage.COMP_DATA, job.jobID, job.job); - e.network.send(e.socket, newMsg); + e.network.send(e.remoteAddress, newMsg); break; // Received computation results @@ -97,7 +98,7 @@ public class GridServerWorker extends ThreadedEventWorker{ break; case GridMessage.COMP_ERROR: // marks the job for recalculation job = jobs.get(msg.getJobQueueID()); - reSendjobQueue.add(job); + resendJobQueue.add(job); break; } } @@ -108,23 +109,24 @@ public class GridServerWorker extends ThreadedEventWorker{ /** * Changes the job timeout value - * @param min is the timeout in minutes + * + * @param timeout is the timeout in minutes */ - public static void setJobTimeOut(int min){ - JOB_TIMEOUT = 1000*60*min; + public void setJobTimeout(int timeout){ + jobTimeout = 1000*60*timeout; } class GridMaintainer extends Thread{ /** * Runs some behind the scenes stuff - * like job timeout. + * like job garbage collection. */ public void run(){ while(true){ long time = System.currentTimeMillis(); for(int jobID : jobs.keySet()){ - if(time-jobs.get(jobID).timestamp > JOB_TIMEOUT){ - reSendjobQueue.add(jobs.get(jobID)); + if(time-jobs.get(jobID).timestamp > jobTimeout){ + resendJobQueue.add(jobs.get(jobID)); } } try{Thread.sleep(1000*60*1);}catch(Exception e){}; diff --git a/src/zutil/net/nio/worker/grid/GridThread.java b/src/zutil/net/nio/worker/grid/GridThread.java old mode 100644 new mode 100755 index 240a887..28d6103 --- a/src/zutil/net/nio/worker/grid/GridThread.java +++ b/src/zutil/net/nio/worker/grid/GridThread.java @@ -32,8 +32,7 @@ package zutil.net.nio.worker.grid; */ public abstract class GridThread implements Runnable{ /** - * The initial static and final data will be sent to this - * method. + * The initial static and final data will be sent to this method. * * @param data is the static and or final data */ @@ -55,8 +54,7 @@ public abstract class GridThread implements Runnable{ } /** - * Compute the given data and return - * @param data + * Compute the given data and return */ public abstract void compute(GridJob data) throws Exception; } diff --git a/src/zutil/net/nio/service/sync/ObjectSync.java b/src/zutil/net/nio/worker/sync/ObjectSync.java old mode 100644 new mode 100755 similarity index 93% rename from src/zutil/net/nio/service/sync/ObjectSync.java rename to src/zutil/net/nio/worker/sync/ObjectSync.java index 5b5c2d6..32aaf66 --- a/src/zutil/net/nio/service/sync/ObjectSync.java +++ b/src/zutil/net/nio/worker/sync/ObjectSync.java @@ -1,52 +1,51 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Ziver Koc - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package zutil.net.nio.service.sync; - -import zutil.net.nio.message.SyncMessage; - -public abstract class ObjectSync { - public String id; - - public ObjectSync(String id){ - this.id = id; - } - - /** - * Sends sync message if the object has bean changed - */ - public abstract void sendSync(); - - /** - * Applies the SyncMessage to the object - * @param message - * @param object - */ - public abstract void syncObject(SyncMessage message); - - /** - * Called when the object is removed from the sync list - */ - public abstract void remove(); -} +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Ziver Koc + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package zutil.net.nio.worker.sync; + +import zutil.net.nio.message.SyncMessage; + +public abstract class ObjectSync { + public String id; + + public ObjectSync(String id){ + this.id = id; + } + + /** + * Sends sync message if the object has bean changed + */ + public abstract void sendSync(); + + /** + * Applies the SyncMessage to the object + * @param message + */ + public abstract void syncObject(SyncMessage message); + + /** + * Called when the object is removed from the sync list + */ + public abstract void remove(); +} diff --git a/src/zutil/net/nio/service/sync/SyncService.java b/src/zutil/net/nio/worker/sync/SyncService.java old mode 100644 new mode 100755 similarity index 75% rename from src/zutil/net/nio/service/sync/SyncService.java rename to src/zutil/net/nio/worker/sync/SyncService.java index ff5eec4..ca55e7f --- a/src/zutil/net/nio/service/sync/SyncService.java +++ b/src/zutil/net/nio/worker/sync/SyncService.java @@ -1,84 +1,79 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Ziver Koc - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package zutil.net.nio.service.sync; - -import zutil.log.LogUtil; -import zutil.net.nio.NioNetwork; -import zutil.net.nio.message.Message; -import zutil.net.nio.message.SyncMessage; -import zutil.net.nio.service.NetworkService; - -import java.nio.channels.SocketChannel; -import java.util.HashMap; -import java.util.logging.Logger; - -public class SyncService extends NetworkService{ - private static Logger logger = LogUtil.getLogger(); - // list of objects to sync - private HashMap sync; - - public SyncService(NioNetwork nio){ - super(nio); - sync = new HashMap(); - } - - /** - * Adds a SyncObject to the sync list - * @param os The object to sync - */ - public void addSyncObject(ObjectSync os){ - sync.put(os.id, os); - logger.fine("New Sync object: "+os); - } - - public void handleMessage(Message message, SocketChannel socket){ - if(message instanceof SyncMessage){ - SyncMessage syncMessage = (SyncMessage)message; - if(syncMessage.type == SyncMessage.MessageType.SYNC){ - ObjectSync obj = sync.get(syncMessage.id); - if(obj != null){ - logger.finer("Syncing Message..."); - obj.syncObject(syncMessage); - } - } - else if(syncMessage.type == SyncMessage.MessageType.REMOVE){ - sync.remove(syncMessage.id).remove(); - } - } - } - - /** - * Syncs all the objects whit the server - */ - public void sync(){ - for(String id : sync.keySet()){ - sync.get(id).sendSync(); - } - } - - public static SyncService getInstance(){ - return (SyncService)instance; - } -} +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Ziver Koc + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package zutil.net.nio.worker.sync; + +import zutil.log.LogUtil; +import zutil.net.nio.NioNetwork; +import zutil.net.nio.message.Message; +import zutil.net.nio.message.SyncMessage; +import zutil.net.nio.worker.ThreadedEventWorker; +import zutil.net.nio.worker.WorkerEventData; + +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.logging.Logger; + +public class SyncService extends ThreadedEventWorker{ + private static Logger logger = LogUtil.getLogger(); + + // list of objects to sync + private HashMap sync = new HashMap<>(); + + + + /** + * Adds a object to be synced + */ + public void addSyncObject(ObjectSync os){ + sync.put(os.id, os); + logger.fine("New Sync object: "+os); + } + + @Override + public void messageEvent(WorkerEventData event){ + if(event.data instanceof SyncMessage){ + SyncMessage syncMessage = (SyncMessage)event.data; + if(syncMessage.type == SyncMessage.MessageType.SYNC){ + ObjectSync obj = sync.get(syncMessage.id); + if(obj != null){ + logger.finer("Syncing Message..."); + obj.syncObject(syncMessage); + } + } + else if(syncMessage.type == SyncMessage.MessageType.REMOVE){ + sync.remove(syncMessage.id).remove(); + } + } + } + + /** + * Syncs all the objects whit the server + */ + public void sync(){ + for(String id : sync.keySet()){ + sync.get(id).sendSync(); + } + } +} diff --git a/test/zutil/net/nio/NetworkClientTest.java b/test/zutil/net/nio/NetworkClientTest.java index e99d145..28b5adf 100755 --- a/test/zutil/net/nio/NetworkClientTest.java +++ b/test/zutil/net/nio/NetworkClientTest.java @@ -24,27 +24,33 @@ package zutil.net.nio; -import zutil.net.nio.message.StringMessage; +import zutil.log.CompactLogFormatter; +import zutil.log.LogUtil; +import zutil.net.nio.message.StringResponseMessage; import zutil.net.nio.response.PrintRsp; import java.io.IOException; import java.net.InetAddress; import java.security.NoSuchAlgorithmException; +import java.util.logging.Level; @SuppressWarnings("unused") public class NetworkClientTest { - public static void main(String[] args) throws NoSuchAlgorithmException { + public static void main(String[] args) throws NoSuchAlgorithmException, InterruptedException { try { - int count = 0; + LogUtil.setGlobalLevel(Level.ALL); + LogUtil.setGlobalFormatter(new CompactLogFormatter()); + + int count = 0; long time = System.currentTimeMillis()+1000*60; NioClient client = new NioClient(InetAddress.getByName("localhost"), 6056); - //client.setEncrypter(new Encrypter("lol", Encrypter.PASSPHRASE_DES_ALGO)); + Thread.sleep(1000); while(time > System.currentTimeMillis()){ PrintRsp handler = new PrintRsp(); - client.send(handler, new StringMessage("StringMessage: "+count)); + client.send(handler, new StringResponseMessage("StringResponseMessage: "+count)); handler.waitForResponse(); - //try {Thread.sleep(100);} catch (InterruptedException e) {} + //Thread.sleep(100); //System.out.println("sending.."); count++; } diff --git a/test/zutil/net/nio/NetworkServerTest.java b/test/zutil/net/nio/NetworkServerTest.java index b987acd..2b5fb1f 100755 --- a/test/zutil/net/nio/NetworkServerTest.java +++ b/test/zutil/net/nio/NetworkServerTest.java @@ -24,16 +24,26 @@ package zutil.net.nio; +import zutil.log.CompactLogFormatter; +import zutil.log.LogUtil; + import java.io.IOException; import java.security.NoSuchAlgorithmException; +import java.util.logging.Level; @SuppressWarnings("unused") public class NetworkServerTest { - public static void main(String[] args) throws NoSuchAlgorithmException { + public static void main(String[] args) throws NoSuchAlgorithmException, InterruptedException { try { + LogUtil.setGlobalLevel(Level.ALL); + LogUtil.setGlobalFormatter(new CompactLogFormatter()); + NioServer server = new NioServer(6056); - //server.setEncrypter(new Encrypter("lol", Encrypter.PASSPHRASE_DES_ALGO)); + + while(true){ + Thread.sleep(1000); + } } catch (IOException e) { e.printStackTrace(); }