Refactoring of NioNetwork classes, there are still some bugs

This commit is contained in:
Ziver Koc 2016-12-07 19:16:37 +01:00
parent f9949ac9ef
commit 13082f0db0
29 changed files with 887 additions and 1062 deletions

32
src/zutil/net/nio/NioClient.java Normal file → Executable file
View file

@ -25,32 +25,28 @@
package zutil.net.nio; package zutil.net.nio;
import zutil.net.nio.message.Message; import zutil.net.nio.message.Message;
import zutil.net.nio.message.type.ResponseRequestMessage; import zutil.net.nio.message.RequestResponseMessage;
import zutil.net.nio.response.ResponseEvent; import zutil.net.nio.response.ResponseEvent;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
public class NioClient extends NioNetwork{ public class NioClient extends NioNetwork{
private SocketChannel serverSocket; private InetSocketAddress remoteAddress;
/** /**
* Creates a NioClient that connects to a server * Creates a NioClient that connects to a server
* *
* @param hostAddress The server address * @param remoteAddress the server address
* @param port The port to listen on * @param remotePort the port to listen on
*/ */
public NioClient(InetAddress serverAddress, int port) throws IOException { public NioClient(InetAddress remoteAddress, int remotePort) throws IOException {
super(InetAddress.getLocalHost(), port, NetworkType.CLIENT); this.remoteAddress = new InetSocketAddress(remoteAddress, remotePort);
serverSocket = initiateConnection(new InetSocketAddress(serverAddress, port)); connect(this.remoteAddress);
Thread thread = new Thread(this);
thread.setDaemon(false);
thread.start();
} }
protected Selector initSelector() throws IOException { protected Selector initSelector() throws IOException {
@ -61,21 +57,19 @@ public class NioClient extends NioNetwork{
/** /**
* Sends a Message to the default server * Sends a Message to the default server
* *
* @param data The data to be sent * @param data the data to be sent
* @throws IOException Something got wrong
*/ */
public void send(Message data) throws IOException { public void send(Message data) throws IOException {
send(serverSocket, data); send(remoteAddress, data);
} }
/** /**
* This method is for the Client to send a message to the server * This method is for the Client to send a message to the server
* *
* @param handler The response handler * @param handler the response handler
* @param data The data to send * @param data the data to send
* @throws IOException
*/ */
public void send(ResponseEvent handler, ResponseRequestMessage data) throws IOException { public void send(ResponseEvent handler, RequestResponseMessage data) throws IOException {
send(serverSocket, handler, data); send(remoteAddress, handler, data);
} }
} }

View file

@ -24,13 +24,10 @@
package zutil.net.nio; package zutil.net.nio;
import zutil.Encrypter;
import zutil.converter.Converter; import zutil.converter.Converter;
import zutil.io.DynamicByteArrayStream;
import zutil.io.MultiPrintStream;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.nio.message.type.ResponseRequestMessage; import zutil.net.nio.message.RequestResponseMessage;
import zutil.net.nio.message.type.SystemMessage; import zutil.net.nio.message.SystemMessage;
import zutil.net.nio.response.ResponseEvent; import zutil.net.nio.response.ResponseEvent;
import zutil.net.nio.server.ChangeRequest; import zutil.net.nio.server.ChangeRequest;
import zutil.net.nio.server.ClientData; import zutil.net.nio.server.ClientData;
@ -38,8 +35,8 @@ import zutil.net.nio.worker.SystemWorker;
import zutil.net.nio.worker.Worker; import zutil.net.nio.worker.Worker;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
@ -51,136 +48,130 @@ import java.util.logging.Logger;
public abstract class NioNetwork implements Runnable { public abstract class NioNetwork implements Runnable {
private static Logger logger = LogUtil.getLogger(); private static Logger logger = LogUtil.getLogger();
public static enum NetworkType {SERVER, CLIENT};
private NetworkType type;
// The host:port combination to listen on
protected InetAddress address;
protected int port;
protected SocketAddress localAddress;
// The channel on which we'll accept connections // The channel on which we'll accept connections
protected ServerSocketChannel serverChannel; protected ServerSocketChannel serverChannel;
// The selector we'll be monitoring // The selector we will be monitoring
private Selector selector; private Selector selector;
// The buffer into which we'll read data when it's available // The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(8192); private ByteBuffer readBuffer = ByteBuffer.allocate(8192);
protected Worker worker; protected Worker worker;
protected SystemWorker systemWorker; protected SystemWorker systemWorker;
// This map contains all the clients that are conncted // This map contains all the clients that are connected
protected Map<InetSocketAddress, ClientData> clients = new HashMap<InetSocketAddress, ClientData>(); protected Map<InetSocketAddress, ClientData> clients = new HashMap<InetSocketAddress, ClientData>();
// A list of PendingChange instances // A list of PendingChange instances
private List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>(); private List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>();
// Maps a SocketChannel to a list of ByteBuffer instances // Maps a SocketChannel to a list of ByteBuffer instances
private Map<SocketChannel, List<ByteBuffer>> pendingWriteData = new HashMap<SocketChannel, List<ByteBuffer>>(); private Map<SocketChannel, List<ByteBuffer>> pendingWriteData = new HashMap<SocketChannel, List<ByteBuffer>>();
private Map<SocketChannel, DynamicByteArrayStream> pendingReadData = new HashMap<SocketChannel, DynamicByteArrayStream>();
// The encrypter
private Encrypter encrypter;
/** /**
* Create a nio network class * Create a client based Network object
*
* @param hostAddress The host address
* @param port The port
* @param type The type of network host
* @throws IOException
*/ */
public NioNetwork(InetAddress address, int port, NetworkType type) throws IOException { public NioNetwork() throws IOException {
this.port = port; this(null);
this.address = address;
this.type = type;
this.selector = initSelector();
this.systemWorker = new SystemWorker(this);
} }
/**
* Create a server based Network object
*
* @param localAddress the address the server will listen on
*/
public NioNetwork(SocketAddress localAddress) throws IOException {
this.localAddress = localAddress;
// init selector
this.selector = initSelector();
this.systemWorker = new SystemWorker(this);
// init traffic thread
new Thread(this).start();
}
protected abstract Selector initSelector() throws IOException; protected abstract Selector initSelector() throws IOException;
/** /**
* Sets the Worker for the network messages * Sets the default worker for non System messages.
* *
* @param worker The worker that handles the incoming messages * @param worker the worker that should handle incoming messages
*/ */
public void setDefaultWorker(Worker worker){ public void setDefaultWorker(Worker worker){
this.worker = worker; this.worker = worker;
} }
/**
* Sets the encrypter to use in the network
*
* @param enc The encrypter to use or null fo no encryption
*/
public void setEncrypter(Encrypter enc){
encrypter = enc;
MultiPrintStream.out.println("Network Encryption "+
(encrypter != null ? "Enabled("+encrypter.getAlgorithm()+")" : "Disabled")+"!!");
}
public void send(SocketChannel socket, Object data) throws IOException{ /**
send(socket, Converter.toBytes(data)); * Connect to a remote Server.
} */
protected void connect(SocketAddress address) throws IOException {
logger.fine("Connecting to: "+address);
// Create a non-blocking socket channel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.socket().setReuseAddress(true);
socketChannel.configureBlocking(false);
// Establish the Connection
socketChannel.connect(address);
public void send(InetSocketAddress address, Object data) throws IOException{ // Queue a channel registration
synchronized(this.pendingChanges) {
pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
}
selector.wakeup();
}
public void send(SocketAddress address, Object data) throws IOException{
send(address, Converter.toBytes(data)); send(address, Converter.toBytes(data));
} }
public void send(InetSocketAddress address, byte[] data){ public void send(SocketAddress address, ResponseEvent handler, RequestResponseMessage data) throws IOException {
send(getSocketChannel(address), data); // Register the response handler
} systemWorker.addResponseHandler(handler, data);
public void send(SocketChannel socket, ResponseEvent handler, ResponseRequestMessage data) throws IOException { send(address, Converter.toBytes(data));
// Register the response handler }
systemWorker.addResponseHandler(handler, data);
queueSend(socket,Converter.toBytes(data)); /**
} * Queues a message to be sent
*
* @param address the target address where the message should be sent
* @param data the data to send
*/
public void send(SocketAddress address, byte[] data){
logger.finest("Sending Queue...");
SocketChannel socket = getSocketChannel(address);
/**
* This method sends data true the given socket
*
* @param socket The socket
* @param data The data to send
*/
public void send(SocketChannel socket, byte[] data) {
queueSend(socket,data);
}
/**
* Queues the message to be sent and wakeups the selector
*
* @param socket The socet to send the message thrue
* @param data The data to send
*/
protected void queueSend(SocketChannel socket, byte[] data){
logger.finest("Sending Queue...");
// And queue the data we want written // And queue the data we want written
synchronized (pendingWriteData) { synchronized (pendingWriteData) {
List<ByteBuffer> queue = pendingWriteData.get(socket); List<ByteBuffer> queue = pendingWriteData.get(socket);
if (queue == null) { if (queue == null) {
queue = new ArrayList<ByteBuffer>(); queue = new ArrayList<>();
pendingWriteData.put(socket, queue); pendingWriteData.put(socket, queue);
} }
//encrypts queue.add(ByteBuffer.wrap(data));
if(encrypter != null)
queue.add(ByteBuffer.wrap(encrypter.encrypt(data)));
else queue.add(ByteBuffer.wrap(data));
} }
// Changing the key state to write // Changing the key state to write
synchronized (pendingChanges) { synchronized (pendingChanges) {
// Indicate we want the interest ops set changed // Indicate we want the interest ops set changed
pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE)); pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
} }
logger.finest("selector.wakeup();");
// Finally, wake up our selecting thread so it can make the required changes // Finally, wake up our selecting thread so it can make the required changes
selector.wakeup(); selector.wakeup();
} }
public void run() { public void run() {
logger.fine("NioNetwork Started!!!"); logger.fine("NioNetwork Started.");
while (true) { while (true) {
try { try {
// Process any pending changes // Handle any pending changes
synchronized (pendingChanges) { synchronized (pendingChanges) {
Iterator<ChangeRequest> changes = pendingChanges.iterator(); Iterator<ChangeRequest> changes = pendingChanges.iterator();
while (changes.hasNext()) { while (changes.hasNext()) {
@ -200,16 +191,16 @@ public abstract class NioNetwork implements Runnable {
pendingChanges.clear(); pendingChanges.clear();
} }
// Wait for an event one of the registered channels // Wait for an event from one of the channels
selector.select(); selector.select();
logger.finest("selector is awake"); logger.finest("selector is awake");
// Iterate over the set of keys for which events are available // Iterate over the set of keys for which events are available
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) { while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next(); SelectionKey key = selectedKeys.next();
selectedKeys.remove(); selectedKeys.remove();
logger.finest("KeyOP: "+key.interestOps()+" isAcceptable: "+SelectionKey.OP_ACCEPT+" isConnectable: "+SelectionKey.OP_CONNECT+" isWritable: "+SelectionKey.OP_WRITE+" isReadable: "+SelectionKey.OP_READ); logger.finest("KeyOP: "+key.interestOps()+" isAcceptable: "+SelectionKey.OP_ACCEPT+" isConnectible: "+SelectionKey.OP_CONNECT+" isWritable: "+SelectionKey.OP_WRITE+" isReadable: "+SelectionKey.OP_READ);
if (key.isValid()) { if (key.isValid()) {
// Check what event is available and deal with it // Check what event is available and deal with it
@ -218,8 +209,8 @@ public abstract class NioNetwork implements Runnable {
accept(key); accept(key);
} }
else if (key.isConnectable()) { else if (key.isConnectable()) {
logger.finest("Finnishing Connection!!"); logger.finest("Establishing Connection!!");
finishConnection(key); establishConnection(key);
} }
else if (key.isWritable()) { else if (key.isWritable()) {
logger.finest("Writing"); logger.finest("Writing");
@ -238,7 +229,7 @@ public abstract class NioNetwork implements Runnable {
} }
/** /**
* Server * Handle an accept event from a remote host. Channel can only be a server socket.
*/ */
private void accept(SelectionKey key) throws IOException { private void accept(SelectionKey key) throws IOException {
// For an accept to be pending the channel must be a server socket channel. // For an accept to be pending the channel must be a server socket channel.
@ -254,15 +245,67 @@ public abstract class NioNetwork implements Runnable {
socketChannel.register(selector, SelectionKey.OP_READ); socketChannel.register(selector, SelectionKey.OP_READ);
// adds the client to the clients list // adds the client to the clients list
InetSocketAddress remoteAdr = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress(); registerSocketChannel(socketChannel);
if(!clients.containsValue(remoteAdr)){ logger.fine("New Connection("+socketChannel.getRemoteAddress()+")!!! Count: "+clients.size());
clients.put(remoteAdr, new ClientData(socketChannel));
logger.fine("New Connection("+remoteAdr+")!!! Count: "+clients.size());
}
} }
/**
* Finnish an ongoing remote connection establishment procedure
*/
private void establishConnection(SelectionKey key){
SocketChannel socketChannel = (SocketChannel) key.channel();
try {
// Finalize/Finish the connection.
socketChannel.finishConnect();
// Register an interest in writing on this channel
key.interestOps(SelectionKey.OP_WRITE);
registerSocketChannel(socketChannel);
logger.fine("Connection established("+socketChannel.getRemoteAddress()+")");
} catch (IOException e) {
// Cancel the channel's registration with our selector
e.printStackTrace();
key.cancel();
}
}
/**
* Writes data pending message into a specific socket defined by the key
*/
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized (pendingWriteData) {
List<ByteBuffer> queue = pendingWriteData.get(socketChannel);
if(queue == null){
queue = new ArrayList<>();
pendingWriteData.put(socketChannel, queue);
}
// Write until there's no more data ...
while (!queue.isEmpty()) {
ByteBuffer buf = queue.get(0);
socketChannel.write(buf);
if (buf.remaining() > 0) {
logger.finest("Write Buffer Full!");
break;
}
queue.remove(0);
}
if (queue.isEmpty()) {
// All data written, change selector interest
logger.finest("No more Data to write!");
key.interestOps(SelectionKey.OP_READ);
}
}
}
/** /**
* Client and Server * Handle a read event from a socket specified by the key.
*/ */
private void read(SelectionKey key) throws IOException { private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel(); SocketChannel socketChannel = (SocketChannel) key.channel();
@ -281,12 +324,9 @@ public abstract class NioNetwork implements Runnable {
key.cancel(); key.cancel();
socketChannel.close(); socketChannel.close();
clients.remove(remoteAdr); clients.remove(remoteAdr);
pendingReadData.remove(socketChannel);
pendingWriteData.remove(socketChannel); pendingWriteData.remove(socketChannel);
logger.fine("Connection Forced Close("+remoteAdr+")!!! Connection Count: "+clients.size()); logger.fine("Connection forcibly closed("+remoteAdr+")! Remaining connections: "+clients.size());
if(type == NetworkType.CLIENT) throw new IOException("Remote forcibly closed the connection");
throw new IOException("Server Closed The Connection!!!");
return;
} }
if (numRead == -1) { if (numRead == -1) {
@ -295,169 +335,80 @@ public abstract class NioNetwork implements Runnable {
key.channel().close(); key.channel().close();
key.cancel(); key.cancel();
clients.remove(remoteAdr); clients.remove(remoteAdr);
pendingReadData.remove(socketChannel);
pendingWriteData.remove(socketChannel); pendingWriteData.remove(socketChannel);
logger.fine("Connection Close("+remoteAdr+")!!! Connection Count: "+clients.size()); logger.fine("Connection Closed("+remoteAdr+")! Remaining connections: "+clients.size());
if(type == NetworkType.CLIENT) throw new IOException("Remote closed the connection");
throw new IOException("Server Closed The Connection!!!");
return;
} }
// Make a correctly sized copy of the data before handing it // Make a correctly sized copy of the data before handing it to the client
// to the client
byte[] rspByteData = new byte[numRead]; byte[] rspByteData = new byte[numRead];
System.arraycopy(readBuffer.array(), 0, rspByteData, 0, numRead); System.arraycopy(readBuffer.array(), 0, rspByteData, 0, numRead);
if(encrypter != null)// Encryption
rspByteData = encrypter.decrypt(rspByteData);
// Message Count 1m: 36750
// Message Count 1s: 612
if(!pendingReadData.containsKey(socketChannel)){
pendingReadData.put(socketChannel, new DynamicByteArrayStream());
}
DynamicByteArrayStream dynBuf = pendingReadData.get(socketChannel);
dynBuf.append(rspByteData);
Object rspData = null;
try{ try{
//rspData = Converter.toObject(rspByteData); Object rspData = Converter.toObject(rspByteData);
rspData = Converter.toObject(dynBuf); handleReceivedMessage(socketChannel, rspData);
handleRecivedMessage(socketChannel, rspData);
dynBuf.clear();
}catch(Exception e){ }catch(Exception e){
e.printStackTrace(); e.printStackTrace();
dynBuf.reset();
} }
} }
/**
* Client and Server
*/
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized (pendingWriteData) { private void handleReceivedMessage(SocketChannel socketChannel, Object rspData){
List<ByteBuffer> queue = pendingWriteData.get(socketChannel); logger.finer("Handling incoming message...");
if(queue == null){
queue = new ArrayList<ByteBuffer>();
pendingWriteData.put(socketChannel, queue);
}
// Write until there's not more data ...
while (!queue.isEmpty()) {
ByteBuffer buf = queue.get(0);
socketChannel.write(buf);
if (buf.remaining() > 0) {
// ... or the socket's buffer fills up
logger.finest("Write Buffer Full!!");
break;
}
queue.remove(0);
}
if (queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for
// data.
logger.finest("No more Data to write!!");
key.interestOps(SelectionKey.OP_READ);
}
}
}
private void handleRecivedMessage(SocketChannel socketChannel, Object rspData){
logger.finer("Handling incomming message...");
if(rspData instanceof SystemMessage){
if(systemWorker != null){
logger.finest("System Message!!!");
systemWorker.processData(this, socketChannel, rspData);
}
else{
logger.finer("Unhandled System Message!!!");
}
}
else{
// Hand the data off to our worker thread
if(worker != null){
logger.finest("Worker Message!!!");
worker.processData(this, socketChannel, rspData);
}
else{
logger.fine("Unhandled Worker Message!!!");
}
}
}
/**
* Initializes a socket to a server
*/
protected SocketChannel initiateConnection(InetSocketAddress address) throws IOException {
// Create a non-blocking socket channel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.socket().setReuseAddress(true);
socketChannel.configureBlocking(false);
logger.fine("Connecting to: "+address);
// Kick off connection establishment
socketChannel.connect(address);
// Queue a channel registration since the caller is not the
// selecting thread. As part of the registration we'll register
// an interest in connection events. These are raised when a channel
// is ready to complete connection establishment.
synchronized(this.pendingChanges) {
pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
}
return socketChannel;
}
protected SocketChannel getSocketChannel(InetSocketAddress address){
return clients.get(address).getSocketChannel();
}
/**
* Client
*/
private void finishConnection(SelectionKey key){
SocketChannel socketChannel = (SocketChannel) key.channel();
// Finish the connection. If the connection operation failed
// this will raise an IOException.
try { try {
socketChannel.finishConnect(); if (rspData instanceof SystemMessage) {
} catch (IOException e) { if (systemWorker != null) {
// Cancel the channel's registration with our selector logger.finest("Handling system message");
e.printStackTrace(); systemWorker.processData(this, socketChannel.getRemoteAddress(), rspData);
key.cancel(); } else {
return; logger.finer("Unhandled system message!");
} }
} else {
// Register an interest in writing on this channel // Hand the data off to our worker thread
key.interestOps(SelectionKey.OP_WRITE); if (worker != null) {
logger.finest("Handling generic worker message");
worker.processData(this, socketChannel.getRemoteAddress(), rspData);
} else {
logger.fine("Unhandled message!");
}
}
}catch (IOException e){
e.printStackTrace();
}
} }
/**
* Client private ClientData registerSocketChannel(SocketChannel socket){
* @throws IOException InetSocketAddress remoteAdr = (InetSocketAddress) socket.socket().getRemoteSocketAddress();
*/ if (!clients.containsKey(remoteAdr)) {
protected void closeConnection(SocketChannel socketChannel) throws IOException{ ClientData clientData = new ClientData(socket);
clients.put(remoteAdr, clientData);
}
return clients.get(remoteAdr);
}
private SocketChannel getSocketChannel(SocketAddress address){
return clients.get(address).getSocketChannel();
}
/**
* Close a ongoing connection
*/
protected void closeConnection(InetSocketAddress address) throws IOException{
closeConnection(getSocketChannel(address));
}
private void closeConnection(SocketChannel socketChannel) throws IOException{
socketChannel.close(); socketChannel.close();
socketChannel.keyFor(selector).cancel(); socketChannel.keyFor(selector).cancel();
} }
/**
* Client
* @throws IOException
*/
protected void closeConnection(InetSocketAddress address) throws IOException{
closeConnection(getSocketChannel(address));
}
/*
public void close() throws IOException{ /*public void close() throws IOException{
if(serverChannel != null){ if(serverChannel != null){
serverChannel.close(); serverChannel.close();
serverChannel.keyFor(selector).cancel(); serverChannel.keyFor(selector).cancel();
@ -465,7 +416,4 @@ public abstract class NioNetwork implements Runnable {
selector.close(); selector.close();
}*/ }*/
public NetworkType getType(){
return type;
}
} }

22
src/zutil/net/nio/NioServer.java Normal file → Executable file
View file

@ -34,11 +34,12 @@ import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator; import java.util.Iterator;
public class NioServer extends NioNetwork{ public class NioServer extends NioNetwork{
/** /**
* Creates a NioServer object which listens on localhost * Creates a NioServer object which listens on localhost
* *
* @param port The port to listen to * @param port the port to listen to
*/ */
public NioServer(int port) throws IOException { public NioServer(int port) throws IOException {
this(null, port); this(null, port);
@ -47,12 +48,11 @@ public class NioServer extends NioNetwork{
/** /**
* Creates a NioServer object which listens to a specific address * Creates a NioServer object which listens to a specific address
* *
* @param address The address to listen to * @param address the address to listen to
* @param port The port to listen to * @param port the port to listen to
*/ */
public NioServer(InetAddress address, int port) throws IOException { public NioServer(InetAddress address, int port) throws IOException {
super(address, port, NetworkType.SERVER); super(new InetSocketAddress(address, port));
new Thread(this).start();
} }
protected Selector initSelector() throws IOException { protected Selector initSelector() throws IOException {
@ -65,8 +65,7 @@ public class NioServer extends NioNetwork{
serverChannel.configureBlocking(false); serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port // Bind the server socket to the specified address and port
InetSocketAddress isa = new InetSocketAddress(address, port); serverChannel.socket().bind(localAddress);
serverChannel.socket().bind(isa);
// Register the server socket channel, indicating an interest in // Register the server socket channel, indicating an interest in
// accepting new connections // accepting new connections
@ -74,17 +73,16 @@ public class NioServer extends NioNetwork{
return socketSelector; return socketSelector;
} }
/** /**
* Broadcasts the message to all the connected clients * Broadcasts the message to all the connected clients
* *
* @param data The data to broadcast * @param data the data to broadcast
*/ */
public void broadcast(byte[] data){ public void broadcast(byte[] data){
synchronized(clients){ synchronized(clients){
Iterator<InetSocketAddress> it = clients.keySet().iterator(); for(InetSocketAddress target : clients.keySet()){
while(it.hasNext()){ send(target, data);
send(it.next(), data);
} }
} }
} }

View file

@ -1,58 +1,51 @@
/* /*
* The MIT License (MIT) * The MIT License (MIT)
* *
* Copyright (c) 2015 Ziver Koc * Copyright (c) 2015 Ziver Koc
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy * Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal * of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights * in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is * copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions: * furnished to do so, subject to the following conditions:
* *
* The above copyright notice and this permission notice shall be included in * The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software. * all copies or substantial portions of the Software.
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.message.type; package zutil.net.nio.message;
import zutil.net.nio.message.Message; /**
* The reciver will echo out this message to the sender
/** *
* The reciver will echo out this message to the sender * @author Ziver
* */
* @author Ziver public abstract class EchoMessage extends Message implements SystemMessage{
*/ private static final long serialVersionUID = 1L;
public abstract class EchoMessage extends Message implements SystemMessage{
private static final long serialVersionUID = 1L; private boolean echo = false;
private boolean echo;
/**
public EchoMessage(){ * @return true if this message is an echo of an original message
echo = true; */
} public boolean echo() {
return echo;
/** }
* This method returns if the message should be echoed
* @return If the message should be echoed /**
*/ * Called by the receiver to mark this message as an echo copy
public boolean echo() { */
return echo; public void received() {
} echo = true;
}
/** }
* Called by the reciver to disable looping of the message
*
*/
public void recived() {
echo = false;
}
}

2
src/zutil/net/nio/message/KeepAliveMessage.java Normal file → Executable file
View file

@ -24,8 +24,6 @@
package zutil.net.nio.message; package zutil.net.nio.message;
import zutil.net.nio.message.type.SystemMessage;
/** /**
* Tells the destination that the * Tells the destination that the
* source is still online * source is still online

View file

@ -1,42 +1,41 @@
/* /*
* The MIT License (MIT) * The MIT License (MIT)
* *
* Copyright (c) 2015 Ziver Koc * Copyright (c) 2015 Ziver Koc
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy * Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal * of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights * in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is * copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions: * furnished to do so, subject to the following conditions:
* *
* The above copyright notice and this permission notice shall be included in * The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software. * all copies or substantial portions of the Software.
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.message.type; package zutil.net.nio.message;
/** /**
* This interface means that the sender * This interface defines a request response flow where a request
* wants a reply from the destination * message is sent to a server and a registered handler will handle the message.
* *
* @author Ziver * @author Ziver
* *
*/ */
public interface ResponseRequestMessage { public interface RequestResponseMessage {
/** /**
* The id of the response to identify the response event * @return a unique id for this message
* @return Response id */
*/ long getResponseId();
public double getResponseId();
}
}

View file

@ -1,58 +1,54 @@
/* /*
* The MIT License (MIT) * The MIT License (MIT)
* *
* Copyright (c) 2015 Ziver Koc * Copyright (c) 2015 Ziver Koc
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy * Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal * of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights * in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is * copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions: * furnished to do so, subject to the following conditions:
* *
* The above copyright notice and this permission notice shall be included in * The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software. * all copies or substantial portions of the Software.
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.message; package zutil.net.nio.message;
import zutil.net.nio.message.type.EchoMessage;
import zutil.net.nio.message.type.ResponseRequestMessage; public class StringResponseMessage extends EchoMessage implements RequestResponseMessage {
private static final long serialVersionUID = 1L;
private long responseId;
public class StringMessage extends EchoMessage implements ResponseRequestMessage{ private String msg;
private static final long serialVersionUID = 1L;
private double responseId; public StringResponseMessage(String msg){
this.msg = msg;
private String msg; responseId = (long)(Math.random()*Long.MAX_VALUE);
}
public StringMessage(String msg){
this.msg = msg; public String getString(){
responseId = Math.random(); return msg;
} }
public String getString(){ public void setString(String msg){
return msg; this.msg = msg;
} }
public void setString(String msg){ public String toString(){
this.msg = msg; return getString();
} }
public String toString(){ public long getResponseId() {
return getString(); return responseId;
} }
}
public double getResponseId() {
return responseId;
}
}

2
src/zutil/net/nio/message/SyncMessage.java Normal file → Executable file
View file

@ -24,8 +24,6 @@
package zutil.net.nio.message; package zutil.net.nio.message;
import zutil.net.nio.message.type.SystemMessage;
public class SyncMessage extends Message implements SystemMessage{ public class SyncMessage extends Message implements SystemMessage{
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public static enum MessageType { REQUEST_ID, NEW, REMOVE, SYNC }; public static enum MessageType { REQUEST_ID, NEW, REMOVE, SYNC };

View file

@ -1,36 +1,36 @@
/* /*
* The MIT License (MIT) * The MIT License (MIT)
* *
* Copyright (c) 2015 Ziver Koc * Copyright (c) 2015 Ziver Koc
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy * Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal * of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights * in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is * copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions: * furnished to do so, subject to the following conditions:
* *
* The above copyright notice and this permission notice shall be included in * The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software. * all copies or substantial portions of the Software.
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.message.type; package zutil.net.nio.message;
/** /**
* A message that implements this will be * A message that implements this will be
* handeld internaly by the network engine * handeld internaly by the network engine
* *
* @author Ziver * @author Ziver
* *
*/ */
public interface SystemMessage { public interface SystemMessage {
} }

4
src/zutil/net/nio/server/ChangeRequest.java Normal file → Executable file
View file

@ -26,6 +26,7 @@ package zutil.net.nio.server;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
public class ChangeRequest { public class ChangeRequest {
public static final int REGISTER = 1; public static final int REGISTER = 1;
public static final int CHANGEOPS = 2; public static final int CHANGEOPS = 2;
@ -33,7 +34,8 @@ public class ChangeRequest {
public SocketChannel socket; public SocketChannel socket;
public int type; public int type;
public int ops; public int ops;
public ChangeRequest(SocketChannel socket, int type, int ops) { public ChangeRequest(SocketChannel socket, int type, int ops) {
this.socket = socket; this.socket = socket;
this.type = type; this.type = type;

6
src/zutil/net/nio/server/ClientData.java Normal file → Executable file
View file

@ -30,11 +30,13 @@ import java.nio.channels.SocketChannel;
public class ClientData { public class ClientData {
private SocketChannel socketChannel; private SocketChannel socketChannel;
private long lastMessageReceived; private long lastMessageReceived;
public ClientData(SocketChannel socketChannel){ public ClientData(SocketChannel socketChannel){
this.socketChannel = socketChannel; this.socketChannel = socketChannel;
} }
public SocketChannel getSocketChannel(){ public SocketChannel getSocketChannel(){
return socketChannel; return socketChannel;
} }

View file

@ -1,49 +0,0 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Ziver Koc
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package zutil.net.nio.service;
import zutil.net.nio.NioNetwork;
import zutil.net.nio.message.Message;
import java.nio.channels.SocketChannel;
public abstract class NetworkService {
protected static NetworkService instance;
protected NioNetwork nio;
public NetworkService(NioNetwork nio){
instance = this;
this.nio = nio;
}
public abstract void handleMessage(Message message, SocketChannel socket);
/**
* @return A instance of this class
*/
public static NetworkService getInstance(){
return instance;
}
}

8
src/zutil/net/nio/worker/EchoWorker.java Normal file → Executable file
View file

@ -24,18 +24,16 @@
package zutil.net.nio.worker; package zutil.net.nio.worker;
import zutil.io.MultiPrintStream;
import java.io.IOException; import java.io.IOException;
public class EchoWorker extends ThreadedEventWorker { public class EchoWorker extends ThreadedEventWorker {
@Override @Override
public void messageEvent(WorkerDataEvent dataEvent) { public void messageEvent(WorkerEventData dataEvent) {
try { try {
// Return to sender // Return to sender
MultiPrintStream.out.println("Recived Msg: "+dataEvent.data); dataEvent.network.send(dataEvent.remoteAddress, dataEvent.data);
dataEvent.network.send(dataEvent.socket, dataEvent.data);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }

113
src/zutil/net/nio/worker/SystemWorker.java Normal file → Executable file
View file

@ -26,15 +26,10 @@ package zutil.net.nio.worker;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.nio.NioNetwork; import zutil.net.nio.NioNetwork;
import zutil.net.nio.message.ChatMessage;
import zutil.net.nio.message.Message; import zutil.net.nio.message.Message;
import zutil.net.nio.message.SyncMessage; import zutil.net.nio.message.EchoMessage;
import zutil.net.nio.message.type.EchoMessage; import zutil.net.nio.message.RequestResponseMessage;
import zutil.net.nio.message.type.ResponseRequestMessage;
import zutil.net.nio.response.ResponseEvent; import zutil.net.nio.response.ResponseEvent;
import zutil.net.nio.service.NetworkService;
import zutil.net.nio.service.chat.ChatService;
import zutil.net.nio.service.sync.SyncService;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -43,41 +38,49 @@ import java.util.logging.Logger;
public class SystemWorker extends ThreadedEventWorker { public class SystemWorker extends ThreadedEventWorker {
private static Logger logger = LogUtil.getLogger(); private static Logger logger = LogUtil.getLogger();
private NioNetwork nio; private NioNetwork nio;
// Maps a SocketChannel to a RspHandler // Maps a responseId to a RspHandler
private Map<Double, ResponseEvent> rspEvents = new HashMap<Double, ResponseEvent>(); private Map<Long, ResponseEvent> rspEvents = new HashMap<>();
// Difren services listening on specific messages // Different services listening on specific messages
private Map<Class<?>, NetworkService> services = new HashMap<Class<?>, NetworkService>(); private Map<Class<?>, ThreadedEventWorker> services = new HashMap<>();
/** /**
* Creates a new SystemWorker * Creates a new SystemWorker
* @param nio The Network
*/ */
public SystemWorker(NioNetwork nio){ public SystemWorker(NioNetwork nio){
this.nio = nio; this.nio = nio;
} }
@Override @Override
public void messageEvent(WorkerDataEvent event) { public void messageEvent(WorkerEventData event) {
try { try {
logger.finer("System Message: "+event.data.getClass().getName()); logger.finer("System Message: "+event.data.getClass().getName());
if(event.data instanceof Message){ if(event.data instanceof Message){
if(event.data instanceof EchoMessage && ((EchoMessage)event.data).echo()){ if(event.data instanceof EchoMessage && ((EchoMessage)event.data).echo()){
// Echos back the recived message // Echos back the received message
((EchoMessage)event.data).recived(); ((EchoMessage)event.data).received();
logger.finer("Echoing Message: "+event.data); logger.finer("Echoing Message: "+event.data);
nio.send(event.socket, event.data); nio.send(event.remoteAddress, event.data);
} }
else if(event.data instanceof ResponseRequestMessage && else if(event.data instanceof RequestResponseMessage &&
rspEvents.get(((ResponseRequestMessage)event.data).getResponseId()) != null){ rspEvents.get(((RequestResponseMessage)event.data).getResponseId()) != null){
// Handle the response long responseId = ((RequestResponseMessage)event.data).getResponseId();
handleResponse(((ResponseRequestMessage)event.data).getResponseId(), event.data); // Look up the handler for this channel
ResponseEvent handler = rspEvents.get(responseId);
// And pass the response to it
handler.handleResponse(event.data);
rspEvents.remove(responseId);
logger.finer("Response Request Message: "+event.data); logger.finer("Response Request Message: "+event.data);
} }
else{ else{
//Services // Check mapped workers
if(services.containsKey(event.data.getClass()) || if(services.containsKey(event.data.getClass())){
!services.containsKey(event.data.getClass()) && defaultServices(event.data)){ services.get(event.data.getClass()).messageEvent(event);
services.get(event.data.getClass()).handleMessage((Message)event.data, event.socket);
} }
} }
} }
@ -87,67 +90,29 @@ public class SystemWorker extends ThreadedEventWorker {
} }
/** /**
* Registers a Service to a specific message * Maps a Worker to a specific message
* *
* @param c The Message class * @param messageClass the received message class
* @param ns The service * @param worker the worker that should handle the specified message type
*/ */
public void registerService(Class<?> c, NetworkService ns){ public void registerWorker(Class<?> messageClass, ThreadedEventWorker worker){
services.put(c, ns); services.put(messageClass, worker);
} }
/** /**
* Unregisters a service * Un-maps a message class to a worker
* *
* @param c The class * @param messageClass the received message class
*/ */
public void unregisterService(Class<?> c){ public void unregisterWorker(Class<?> messageClass){
services.remove(c); services.remove(messageClass);
} }
/** /**
* Connects a ResponseHandler to a specific message * Connects a ResponseHandler to a specific message object
* @param handler The Handler
* @param data The Message
*/ */
public void addResponseHandler(ResponseEvent handler, ResponseRequestMessage data){ public void addResponseHandler(ResponseEvent handler, RequestResponseMessage data){
rspEvents.put(data.getResponseId(), handler); rspEvents.put(data.getResponseId(), handler);
} }
/**
* Client And Server ResponseEvent
*/
private void handleResponse(double responseId, Object rspData){
// Look up the handler for this channel
ResponseEvent handler = rspEvents.get(responseId);
// And pass the response to it
handler.handleResponse(rspData);
rspEvents.remove(responseId);
}
/**
* Registers the default services in the engin e
* if the message needs one of them
*
* @param o The message
*/
private boolean defaultServices(Object o){
if(o instanceof SyncMessage){
if(SyncService.getInstance() == null)
registerService(o.getClass(), new SyncService(nio));
else
registerService(o.getClass(), SyncService.getInstance());
return true;
}
else if(o instanceof ChatMessage){
if(ChatService.getInstance() == null)
registerService(o.getClass(), new ChatService(nio));
else
registerService(o.getClass(), ChatService.getInstance());
return true;
}
return false;
}
} }

18
src/zutil/net/nio/worker/ThreadedEventWorker.java Normal file → Executable file
View file

@ -24,7 +24,7 @@
package zutil.net.nio.worker; package zutil.net.nio.worker;
public abstract class ThreadedEventWorker extends Worker{ public abstract class ThreadedEventWorker extends Worker implements Runnable{
private Thread thread; private Thread thread;
public ThreadedEventWorker(){ public ThreadedEventWorker(){
@ -32,27 +32,17 @@ public abstract class ThreadedEventWorker extends Worker{
thread.start(); thread.start();
} }
public void update() { public void run() {
WorkerDataEvent dataEvent;
while(true) { while(true) {
try{ try{
// Wait for data to become available // Wait for data to become available
synchronized(getEventQueue()) { messageEvent(pollEvent());
while(getEventQueue().isEmpty()) {
try {
getEventQueue().wait();
} catch (InterruptedException e) {}
}
dataEvent = (WorkerDataEvent) getEventQueue().remove(0);
}
messageEvent(dataEvent);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }
public abstract void messageEvent(WorkerDataEvent e);
public abstract void messageEvent(WorkerEventData e);
} }

41
src/zutil/net/nio/worker/Worker.java Normal file → Executable file
View file

@ -26,30 +26,24 @@ package zutil.net.nio.worker;
import zutil.net.nio.NioNetwork; import zutil.net.nio.NioNetwork;
import java.nio.channels.SocketChannel; import java.net.SocketAddress;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
public abstract class Worker implements Runnable { public abstract class Worker {
private LinkedList<WorkerDataEvent> queue = new LinkedList<WorkerDataEvent>(); private LinkedList<WorkerEventData> queue = new LinkedList<WorkerEventData>();
public void processData(NioNetwork server, SocketChannel socket, Object data) { public void processData(NioNetwork server, SocketAddress remote, Object data) {
synchronized(queue) { synchronized(queue) {
queue.add(new WorkerDataEvent(server, socket, data)); queue.add(new WorkerEventData(server, remote, data));
queue.notify(); queue.notify();
} }
} }
/** /**
* @return The event queue * @return true if there is a event in the queue
*/
protected List<WorkerDataEvent> getEventQueue(){
return queue;
}
/**
* @return If there is a event in the queue
*/ */
protected boolean hasEvent(){ protected boolean hasEvent(){
return !queue.isEmpty(); return !queue.isEmpty();
@ -57,20 +51,17 @@ public abstract class Worker implements Runnable {
/** /**
* Polls a event from the list or waits until there is a event * Polls a event from the list or waits until there is a event
* @return The next event *
* @return the next event
*/ */
protected WorkerDataEvent pollEvent(){ protected WorkerEventData pollEvent(){
while(queue.isEmpty()) { synchronized(queue) {
try { while (queue.isEmpty()) {
this.wait(); try {
} catch (InterruptedException e) {} queue.wait();
} catch (InterruptedException e) {}
}
} }
return queue.poll(); return queue.poll();
} }
public void run(){
update();
}
public abstract void update();
} }

View file

@ -1,42 +1,43 @@
/* /*
* The MIT License (MIT) * The MIT License (MIT)
* *
* Copyright (c) 2015 Ziver Koc * Copyright (c) 2015 Ziver Koc
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy * Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal * of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights * in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is * copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions: * furnished to do so, subject to the following conditions:
* *
* The above copyright notice and this permission notice shall be included in * The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software. * all copies or substantial portions of the Software.
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.worker; package zutil.net.nio.worker;
import zutil.net.nio.NioNetwork; import zutil.net.nio.NioNetwork;
import java.nio.channels.SocketChannel; import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
public class WorkerDataEvent {
public NioNetwork network; public class WorkerEventData {
public SocketChannel socket; public NioNetwork network;
public Object data; public SocketAddress remoteAddress;
public Object data;
public WorkerDataEvent(NioNetwork server, SocketChannel socket, Object data) {
this.network = server; public WorkerEventData(NioNetwork server, SocketAddress remoteAddress, Object data) {
this.socket = socket; this.network = server;
this.data = data; this.remoteAddress = remoteAddress;
} this.data = data;
} }
}

View file

@ -1,34 +1,34 @@
/* /*
* The MIT License (MIT) * The MIT License (MIT)
* *
* Copyright (c) 2015 Ziver Koc * Copyright (c) 2015 Ziver Koc
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy * Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal * of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights * in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is * copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions: * furnished to do so, subject to the following conditions:
* *
* The above copyright notice and this permission notice shall be included in * The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software. * all copies or substantial portions of the Software.
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.service.chat; package zutil.net.nio.worker.chat;
/** /**
* Tis is a listener class for new chat messages * Tis is a listener class for new chat messages
* @author Ziver * @author Ziver
* *
*/ */
public interface ChatListener { public interface ChatListener {
public void messageAction(String msg, String room); public void messageAction(String msg, String room);
} }

View file

@ -1,157 +1,149 @@
/* /*
* The MIT License (MIT) * The MIT License (MIT)
* *
* Copyright (c) 2015 Ziver Koc * Copyright (c) 2015 Ziver Koc
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy * Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal * of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights * in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is * copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions: * furnished to do so, subject to the following conditions:
* *
* The above copyright notice and this permission notice shall be included in * The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software. * all copies or substantial portions of the Software.
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.service.chat; package zutil.net.nio.worker.chat;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.nio.NioNetwork; import zutil.net.nio.NioNetwork;
import zutil.net.nio.message.ChatMessage; import zutil.net.nio.message.ChatMessage;
import zutil.net.nio.message.Message; import zutil.net.nio.message.Message;
import zutil.net.nio.service.NetworkService; import zutil.net.nio.worker.ThreadedEventWorker;
import zutil.net.nio.worker.WorkerEventData;
import java.nio.channels.SocketChannel;
import java.util.HashMap; import java.net.SocketAddress;
import java.util.LinkedList; import java.nio.channels.SocketChannel;
import java.util.logging.Logger; import java.util.HashMap;
import java.util.LinkedList;
/** import java.util.logging.Logger;
* A simple chat service with users and rooms
* /**
* @author Ziver * A simple chat service with users and rooms
*/ *
public class ChatService extends NetworkService{ * @author Ziver
private static Logger logger = LogUtil.getLogger(); */
private HashMap<String,LinkedList<SocketChannel>> rooms; public class ChatService extends ThreadedEventWorker{
private ChatListener listener; private static Logger logger = LogUtil.getLogger();
public ChatService(NioNetwork nio){ private HashMap<String,LinkedList<SocketAddress>> rooms = new HashMap<>();
super(nio); private ChatListener listener;
rooms = new HashMap<String,LinkedList<SocketChannel>>();
}
@Override
public void handleMessage(Message message, SocketChannel socket) {
try { @Override
// New message public void messageEvent(WorkerEventData event) {
if(message instanceof ChatMessage){ try {
ChatMessage chatmessage = (ChatMessage)message; // New message
//is this a new message if(event.data instanceof ChatMessage){
if(chatmessage.type == ChatMessage.ChatMessageType.MESSAGE){ ChatMessage chatmessage = (ChatMessage)event.data;
// Is this the server //is this a new message
if(nio.getType() == NioNetwork.NetworkType.SERVER){ if(chatmessage.type == ChatMessage.ChatMessageType.MESSAGE){
if(rooms.containsKey(chatmessage.room)){ // Is this the server
LinkedList<SocketChannel> tmpList = rooms.get(chatmessage.room); if(rooms.containsKey(chatmessage.room)){
LinkedList<SocketAddress> tmpList = rooms.get(chatmessage.room);
// Broadcast the message
for(SocketChannel s : tmpList){ // Broadcast the message
if(s.isConnected()){ for(SocketAddress remote : tmpList){
nio.send(s, chatmessage); event.network.send(remote, chatmessage); // TODO: should not be done for clients
} }
else{ }
unRegisterUser(chatmessage.room, s); logger.finer("New Chat Message: "+chatmessage.msg);
} listener.messageAction(chatmessage.msg, chatmessage.room);
} }
} // register to a room
} else if(chatmessage.type == ChatMessage.ChatMessageType.REGISTER){
logger.finer("New Chat Message: "+chatmessage.msg); registerUser(chatmessage.room, event.remoteAddress);
listener.messageAction(chatmessage.msg, chatmessage.room); }
} // unregister to a room
// register to a room else if(chatmessage.type == ChatMessage.ChatMessageType.UNREGISTER){
else if(chatmessage.type == ChatMessage.ChatMessageType.REGISTER){ unRegisterUser(chatmessage.room, event.remoteAddress);
registerUser(chatmessage.room, socket); }
} }
// unregister to a room } catch (Exception e) {
else if(chatmessage.type == ChatMessage.ChatMessageType.UNREGISTER){ e.printStackTrace();
unRegisterUser(chatmessage.room, socket); }
}
} }
} catch (Exception e) {
e.printStackTrace(); /**
} * Registers a user to the main room
*
} * @param remoteAddress the address of the remote user
*/
/** public void registerUser(SocketAddress remoteAddress){
* Registers a user to the main room registerUser("", remoteAddress);
* }
* @param socket The socket to the user
*/ /**
public void registerUser(SocketChannel socket){ * Registers the given user to a specific room
registerUser("", socket); *
} * @param room the room name
* @param remoteAddress the address of the remote user
/** */
* Registers the given user to a specific room public void registerUser(String room, SocketAddress remoteAddress){
* addRoom(room);
* @param room The room logger.fine("New Chat User: "+remoteAddress);
* @param socket The socket to the user rooms.get(room).add(remoteAddress);
*/ }
public void registerUser(String room, SocketChannel socket){
addRoom(room); /**
logger.fine("New Chat User: "+socket); * Unregisters a user from a room and removes the room if its empty
rooms.get(room).add(socket); *
} * @param room the room name
* @param remoteAddress the address of the remote user
/** */
* Unregisters a user from a room and removes the room if its empty public void unRegisterUser(String room, SocketAddress remoteAddress){
* if(rooms.containsKey(room)){
* @param room The room logger.fine("Remove Chat User: "+remoteAddress);
* @param socket The socket to the user rooms.get(room).remove(remoteAddress);
*/ removeRoom(room);
public void unRegisterUser(String room, SocketChannel socket){ }
if(rooms.containsKey(room)){ }
logger.fine("Remove Chat User: "+socket);
rooms.get(room).remove(socket); /**
removeRoom(room); * Adds a room into the list
} *
} * @param room The name of the room
*/
/** private void addRoom(String room){
* Adds a room into the list if(!rooms.containsKey(room)){
* logger.fine("New Chat Room: "+room);
* @param room The name of the room rooms.put(room, new LinkedList<>());
*/ }
public void addRoom(String room){ }
if(!rooms.containsKey(room)){
logger.fine("New Chat Room: "+room); /**
rooms.put(room, new LinkedList<SocketChannel>()); * Removes the given room if its empty
} *
} * @param room The room
*/
/** private void removeRoom(String room){
* Removes the given room if its empty if(rooms.get(room).isEmpty()){
* logger.fine("Remove Chat Room: "+room);
* @param room The room rooms.remove(room);
*/ }
public void removeRoom(String room){ }
if(rooms.get(room).isEmpty()){
logger.fine("Remove Chat Room: "+room); }
rooms.remove(room);
}
}
public static ChatService getInstance(){
return (ChatService)instance;
}
}

23
src/zutil/net/nio/worker/grid/GridClient.java Normal file → Executable file
View file

@ -28,7 +28,7 @@ import zutil.io.MultiPrintStream;
import zutil.net.nio.NioClient; import zutil.net.nio.NioClient;
import zutil.net.nio.message.GridMessage; import zutil.net.nio.message.GridMessage;
import zutil.net.nio.worker.ThreadedEventWorker; import zutil.net.nio.worker.ThreadedEventWorker;
import zutil.net.nio.worker.WorkerDataEvent; import zutil.net.nio.worker.WorkerEventData;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.LinkedList;
@ -51,8 +51,8 @@ public class GridClient extends ThreadedEventWorker {
* Creates a new GridClient object and registers itself at the server * Creates a new GridClient object and registers itself at the server
* and sets itself as a worker in NioClient * and sets itself as a worker in NioClient
* *
* @param thread the Thread interface to run for the jobs * @param thread the Thread interface to run for the jobs
* @param network the NioClient to use to communicate to the server * @param network the NioClient to use to communicate to the server
*/ */
public GridClient(GridThread thread, NioClient network){ public GridClient(GridThread thread, NioClient network){
jobQueue = new LinkedList<GridJob>(); jobQueue = new LinkedList<GridJob>();
@ -64,7 +64,6 @@ public class GridClient extends ThreadedEventWorker {
/** /**
* Starts up the client and a couple of GridThreads. * Starts up the client and a couple of GridThreads.
* And registers itself as a worker in NioClient * And registers itself as a worker in NioClient
* @throws IOException
*/ */
public void initiate() throws IOException{ public void initiate() throws IOException{
network.setDefaultWorker(this); network.setDefaultWorker(this);
@ -77,7 +76,7 @@ public class GridClient extends ThreadedEventWorker {
} }
@Override @Override
public void messageEvent(WorkerDataEvent e) { public void messageEvent(WorkerEventData e) {
// ignores other messages than GridMessage // ignores other messages than GridMessage
if(e.data instanceof GridMessage){ if(e.data instanceof GridMessage){
GridMessage msg = (GridMessage)e.data; GridMessage msg = (GridMessage)e.data;
@ -96,10 +95,9 @@ public class GridClient extends ThreadedEventWorker {
/** /**
* Register whit the server that the job is done * Register whit the server that the job is done
* *
* @param jobID is the job id * @param jobID is the job id
* @param correct if the answer was right * @param correct if the answer was right
* @param result the result of the computation * @param result the result of the computation
* @throws IOException
*/ */
public static void jobDone(int jobID, boolean correct, Object result) throws IOException{ public static void jobDone(int jobID, boolean correct, Object result) throws IOException{
if(correct) if(correct)
@ -112,17 +110,16 @@ public class GridClient extends ThreadedEventWorker {
* Registers with the server that there was an * Registers with the server that there was an
* error when computing this job * error when computing this job
* *
* @param jobID is the job id * @param jobId is the job id
*/ */
public static void jobError(int jobID){ public static void jobError(int jobId){
try{ try{
network.send(new GridMessage(GridMessage.COMP_SUCCESSFUL, jobID)); network.send(new GridMessage(GridMessage.COMP_SUCCESSFUL, jobId));
}catch(Exception e){e.printStackTrace();} }catch(Exception e){e.printStackTrace();}
} }
/** /**
* @return a new job to compute * @return a new job to compute
* @throws IOException
*/ */
public static synchronized GridJob getNextJob() throws IOException{ public static synchronized GridJob getNextJob() throws IOException{
if(jobQueue.isEmpty()){ if(jobQueue.isEmpty()){

6
src/zutil/net/nio/worker/grid/GridJob.java Normal file → Executable file
View file

@ -33,13 +33,15 @@ public class GridJob{
public int jobID; public int jobID;
public Object job; public Object job;
public long timestamp; public long timestamp;
public GridJob(int jobID, Object job){ public GridJob(int jobID, Object job){
this.jobID = jobID; this.jobID = jobID;
this.job = job; this.job = job;
renewTimeStamp(); renewTimeStamp();
} }
public void renewTimeStamp(){ public void renewTimeStamp(){
timestamp = System.currentTimeMillis(); timestamp = System.currentTimeMillis();
} }

4
src/zutil/net/nio/worker/grid/GridJobGenerator.java Normal file → Executable file
View file

@ -34,9 +34,9 @@ public interface GridJobGenerator<T> {
/** /**
* @return static and final values that do not change for every job * @return static and final values that do not change for every job
*/ */
public Object initValues(); Object initValues();
/** /**
* @return a new generated job * @return a new generated job
*/ */
public T generateJob(); T generateJob();
} }

2
src/zutil/net/nio/worker/grid/GridResultHandler.java Normal file → Executable file
View file

@ -31,5 +31,5 @@ package zutil.net.nio.worker.grid;
*/ */
public interface GridResultHandler<T> { public interface GridResultHandler<T> {
public void resultEvent(int jobID, boolean correct, T result); void resultEvent(int jobID, boolean correct, T result);
} }

40
src/zutil/net/nio/worker/grid/GridServerWorker.java Normal file → Executable file
View file

@ -26,7 +26,7 @@ package zutil.net.nio.worker.grid;
import zutil.net.nio.message.GridMessage; import zutil.net.nio.message.GridMessage;
import zutil.net.nio.worker.ThreadedEventWorker; import zutil.net.nio.worker.ThreadedEventWorker;
import zutil.net.nio.worker.WorkerDataEvent; import zutil.net.nio.worker.WorkerEventData;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -41,41 +41,42 @@ import java.util.Queue;
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public class GridServerWorker extends ThreadedEventWorker{ public class GridServerWorker extends ThreadedEventWorker{
// Job timeout after 30 min // Job timeout after 30 min
public static int JOB_TIMEOUT = 1000*60*30; public int jobTimeout = 1000*60*30;
private HashMap<Integer, GridJob> jobs; // contains all the ongoing jobs private HashMap<Integer, GridJob> jobs; // contains all the ongoing jobs
private Queue<GridJob> reSendjobQueue; // Contains all the jobs that will be recalculated private Queue<GridJob> resendJobQueue; // Contains all the jobs that will be recalculated
private GridJobGenerator jobGenerator; // The job generator private GridJobGenerator jobGenerator; // The job generator
private GridResultHandler resHandler; private GridResultHandler resHandler;
private int nextJobID; private int nextJobID;
public GridServerWorker(GridResultHandler resHandler, GridJobGenerator jobGenerator){ public GridServerWorker(GridResultHandler resHandler, GridJobGenerator jobGenerator){
this.resHandler = resHandler; this.resHandler = resHandler;
this.jobGenerator = jobGenerator; this.jobGenerator = jobGenerator;
nextJobID = 0; nextJobID = 0;
jobs = new HashMap<Integer, GridJob>(); jobs = new HashMap<>();
reSendjobQueue = new LinkedList<GridJob>(); resendJobQueue = new LinkedList<>();
GridMaintainer maintainer = new GridMaintainer(); GridMaintainer maintainer = new GridMaintainer();
maintainer.start(); maintainer.start();
} }
@Override @Override
public void messageEvent(WorkerDataEvent e) { public void messageEvent(WorkerEventData e) {
try { try {
// ignores other messages than GridMessage // ignores other messages than GridMessage
if(e.data instanceof GridMessage){ if(e.data instanceof GridMessage){
GridMessage msg = (GridMessage)e.data; GridMessage msg = (GridMessage)e.data;
GridJob job = null; GridJob job;
switch(msg.messageType()){ switch(msg.messageType()){
case GridMessage.REGISTER: case GridMessage.REGISTER:
e.network.send(e.socket, new GridMessage(GridMessage.INIT_DATA, 0, jobGenerator.initValues())); e.network.send(e.remoteAddress, new GridMessage(GridMessage.INIT_DATA, 0, jobGenerator.initValues()));
break; break;
// Sending new data to compute to the client // Sending new data to compute to the client
case GridMessage.NEW_DATA: case GridMessage.NEW_DATA:
if(!reSendjobQueue.isEmpty()){ // checks first if there is a job for recalculation if(!resendJobQueue.isEmpty()){ // checks first if there is a job for recalculation
job = reSendjobQueue.poll(); job = resendJobQueue.poll();
job.renewTimeStamp(); job.renewTimeStamp();
} }
else{ // generates new job else{ // generates new job
@ -85,7 +86,7 @@ public class GridServerWorker extends ThreadedEventWorker{
nextJobID++; nextJobID++;
} }
GridMessage newMsg = new GridMessage(GridMessage.COMP_DATA, job.jobID, job.job); GridMessage newMsg = new GridMessage(GridMessage.COMP_DATA, job.jobID, job.job);
e.network.send(e.socket, newMsg); e.network.send(e.remoteAddress, newMsg);
break; break;
// Received computation results // Received computation results
@ -97,7 +98,7 @@ public class GridServerWorker extends ThreadedEventWorker{
break; break;
case GridMessage.COMP_ERROR: // marks the job for recalculation case GridMessage.COMP_ERROR: // marks the job for recalculation
job = jobs.get(msg.getJobQueueID()); job = jobs.get(msg.getJobQueueID());
reSendjobQueue.add(job); resendJobQueue.add(job);
break; break;
} }
} }
@ -108,23 +109,24 @@ public class GridServerWorker extends ThreadedEventWorker{
/** /**
* Changes the job timeout value * Changes the job timeout value
* @param min is the timeout in minutes *
* @param timeout is the timeout in minutes
*/ */
public static void setJobTimeOut(int min){ public void setJobTimeout(int timeout){
JOB_TIMEOUT = 1000*60*min; jobTimeout = 1000*60*timeout;
} }
class GridMaintainer extends Thread{ class GridMaintainer extends Thread{
/** /**
* Runs some behind the scenes stuff * Runs some behind the scenes stuff
* like job timeout. * like job garbage collection.
*/ */
public void run(){ public void run(){
while(true){ while(true){
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
for(int jobID : jobs.keySet()){ for(int jobID : jobs.keySet()){
if(time-jobs.get(jobID).timestamp > JOB_TIMEOUT){ if(time-jobs.get(jobID).timestamp > jobTimeout){
reSendjobQueue.add(jobs.get(jobID)); resendJobQueue.add(jobs.get(jobID));
} }
} }
try{Thread.sleep(1000*60*1);}catch(Exception e){}; try{Thread.sleep(1000*60*1);}catch(Exception e){};

6
src/zutil/net/nio/worker/grid/GridThread.java Normal file → Executable file
View file

@ -32,8 +32,7 @@ package zutil.net.nio.worker.grid;
*/ */
public abstract class GridThread implements Runnable{ public abstract class GridThread implements Runnable{
/** /**
* The initial static and final data will be sent to this * The initial static and final data will be sent to this method.
* method.
* *
* @param data is the static and or final data * @param data is the static and or final data
*/ */
@ -55,8 +54,7 @@ public abstract class GridThread implements Runnable{
} }
/** /**
* Compute the given data and return * Compute the given data and return
* @param data
*/ */
public abstract void compute(GridJob data) throws Exception; public abstract void compute(GridJob data) throws Exception;
} }

View file

@ -1,52 +1,51 @@
/* /*
* The MIT License (MIT) * The MIT License (MIT)
* *
* Copyright (c) 2015 Ziver Koc * Copyright (c) 2015 Ziver Koc
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy * Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal * of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights * in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is * copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions: * furnished to do so, subject to the following conditions:
* *
* The above copyright notice and this permission notice shall be included in * The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software. * all copies or substantial portions of the Software.
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.service.sync; package zutil.net.nio.worker.sync;
import zutil.net.nio.message.SyncMessage; import zutil.net.nio.message.SyncMessage;
public abstract class ObjectSync { public abstract class ObjectSync {
public String id; public String id;
public ObjectSync(String id){ public ObjectSync(String id){
this.id = id; this.id = id;
} }
/** /**
* Sends sync message if the object has bean changed * Sends sync message if the object has bean changed
*/ */
public abstract void sendSync(); public abstract void sendSync();
/** /**
* Applies the SyncMessage to the object * Applies the SyncMessage to the object
* @param message * @param message
* @param object */
*/ public abstract void syncObject(SyncMessage message);
public abstract void syncObject(SyncMessage message);
/**
/** * Called when the object is removed from the sync list
* Called when the object is removed from the sync list */
*/ public abstract void remove();
public abstract void remove(); }
}

View file

@ -1,84 +1,79 @@
/* /*
* The MIT License (MIT) * The MIT License (MIT)
* *
* Copyright (c) 2015 Ziver Koc * Copyright (c) 2015 Ziver Koc
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy * Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal * of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights * in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is * copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions: * furnished to do so, subject to the following conditions:
* *
* The above copyright notice and this permission notice shall be included in * The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software. * all copies or substantial portions of the Software.
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE. * THE SOFTWARE.
*/ */
package zutil.net.nio.service.sync; package zutil.net.nio.worker.sync;
import zutil.log.LogUtil; import zutil.log.LogUtil;
import zutil.net.nio.NioNetwork; import zutil.net.nio.NioNetwork;
import zutil.net.nio.message.Message; import zutil.net.nio.message.Message;
import zutil.net.nio.message.SyncMessage; import zutil.net.nio.message.SyncMessage;
import zutil.net.nio.service.NetworkService; import zutil.net.nio.worker.ThreadedEventWorker;
import zutil.net.nio.worker.WorkerEventData;
import java.nio.channels.SocketChannel;
import java.util.HashMap; import java.nio.channels.SocketChannel;
import java.util.logging.Logger; import java.util.HashMap;
import java.util.logging.Logger;
public class SyncService extends NetworkService{
private static Logger logger = LogUtil.getLogger(); public class SyncService extends ThreadedEventWorker{
// list of objects to sync private static Logger logger = LogUtil.getLogger();
private HashMap<String, ObjectSync> sync;
// list of objects to sync
public SyncService(NioNetwork nio){ private HashMap<String, ObjectSync> sync = new HashMap<>();
super(nio);
sync = new HashMap<String, ObjectSync>();
}
/**
/** * Adds a object to be synced
* Adds a SyncObject to the sync list */
* @param os The object to sync public void addSyncObject(ObjectSync os){
*/ sync.put(os.id, os);
public void addSyncObject(ObjectSync os){ logger.fine("New Sync object: "+os);
sync.put(os.id, os); }
logger.fine("New Sync object: "+os);
} @Override
public void messageEvent(WorkerEventData event){
public void handleMessage(Message message, SocketChannel socket){ if(event.data instanceof SyncMessage){
if(message instanceof SyncMessage){ SyncMessage syncMessage = (SyncMessage)event.data;
SyncMessage syncMessage = (SyncMessage)message; if(syncMessage.type == SyncMessage.MessageType.SYNC){
if(syncMessage.type == SyncMessage.MessageType.SYNC){ ObjectSync obj = sync.get(syncMessage.id);
ObjectSync obj = sync.get(syncMessage.id); if(obj != null){
if(obj != null){ logger.finer("Syncing Message...");
logger.finer("Syncing Message..."); obj.syncObject(syncMessage);
obj.syncObject(syncMessage); }
} }
} else if(syncMessage.type == SyncMessage.MessageType.REMOVE){
else if(syncMessage.type == SyncMessage.MessageType.REMOVE){ sync.remove(syncMessage.id).remove();
sync.remove(syncMessage.id).remove(); }
} }
} }
}
/**
/** * Syncs all the objects whit the server
* Syncs all the objects whit the server */
*/ public void sync(){
public void sync(){ for(String id : sync.keySet()){
for(String id : sync.keySet()){ sync.get(id).sendSync();
sync.get(id).sendSync(); }
} }
} }
public static SyncService getInstance(){
return (SyncService)instance;
}
}

View file

@ -24,27 +24,33 @@
package zutil.net.nio; package zutil.net.nio;
import zutil.net.nio.message.StringMessage; import zutil.log.CompactLogFormatter;
import zutil.log.LogUtil;
import zutil.net.nio.message.StringResponseMessage;
import zutil.net.nio.response.PrintRsp; import zutil.net.nio.response.PrintRsp;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.logging.Level;
@SuppressWarnings("unused") @SuppressWarnings("unused")
public class NetworkClientTest { public class NetworkClientTest {
public static void main(String[] args) throws NoSuchAlgorithmException { public static void main(String[] args) throws NoSuchAlgorithmException, InterruptedException {
try { try {
int count = 0; LogUtil.setGlobalLevel(Level.ALL);
LogUtil.setGlobalFormatter(new CompactLogFormatter());
int count = 0;
long time = System.currentTimeMillis()+1000*60; long time = System.currentTimeMillis()+1000*60;
NioClient client = new NioClient(InetAddress.getByName("localhost"), 6056); NioClient client = new NioClient(InetAddress.getByName("localhost"), 6056);
//client.setEncrypter(new Encrypter("lol", Encrypter.PASSPHRASE_DES_ALGO)); Thread.sleep(1000);
while(time > System.currentTimeMillis()){ while(time > System.currentTimeMillis()){
PrintRsp handler = new PrintRsp(); PrintRsp handler = new PrintRsp();
client.send(handler, new StringMessage("StringMessage: "+count)); client.send(handler, new StringResponseMessage("StringResponseMessage: "+count));
handler.waitForResponse(); handler.waitForResponse();
//try {Thread.sleep(100);} catch (InterruptedException e) {} //Thread.sleep(100);
//System.out.println("sending.."); //System.out.println("sending..");
count++; count++;
} }

View file

@ -24,16 +24,26 @@
package zutil.net.nio; package zutil.net.nio;
import zutil.log.CompactLogFormatter;
import zutil.log.LogUtil;
import java.io.IOException; import java.io.IOException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.logging.Level;
@SuppressWarnings("unused") @SuppressWarnings("unused")
public class NetworkServerTest { public class NetworkServerTest {
public static void main(String[] args) throws NoSuchAlgorithmException { public static void main(String[] args) throws NoSuchAlgorithmException, InterruptedException {
try { try {
LogUtil.setGlobalLevel(Level.ALL);
LogUtil.setGlobalFormatter(new CompactLogFormatter());
NioServer server = new NioServer(6056); NioServer server = new NioServer(6056);
//server.setEncrypter(new Encrypter("lol", Encrypter.PASSPHRASE_DES_ALGO));
while(true){
Thread.sleep(1000);
}
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }