diff --git a/src/ei/engine/network/ChangeRequest.java b/src/ei/engine/network/ChangeRequest.java new file mode 100644 index 0000000..4174d06 --- /dev/null +++ b/src/ei/engine/network/ChangeRequest.java @@ -0,0 +1,18 @@ +package ei.engine.network; + +import java.nio.channels.SocketChannel; + +public class ChangeRequest { + public static final int REGISTER = 1; + public static final int CHANGEOPS = 2; + + public SocketChannel socket; + public int type; + public int ops; + + public ChangeRequest(SocketChannel socket, int type, int ops) { + this.socket = socket; + this.type = type; + this.ops = ops; + } +} diff --git a/src/ei/engine/network/NioClient.java b/src/ei/engine/network/NioClient.java new file mode 100644 index 0000000..3e8ed18 --- /dev/null +++ b/src/ei/engine/network/NioClient.java @@ -0,0 +1,35 @@ +package ei.engine.network; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.SelectorProvider; + +import ei.engine.network.response.ResponseEvent; + +public class NioClient extends NioNetwork{ + + public NioClient(InetAddress hostAddress, int port) throws IOException { + super(hostAddress, port, CLIENT); + } + + protected Selector initSelector() throws IOException { + // Create a new selector + return SelectorProvider.provider().openSelector(); + } + + /** + * This method is for the Client to send a message to the server + * + * @param handler The response handler + * @param data The data to send + * @throws IOException + */ + public void send(ResponseEvent handler, byte[] data) throws IOException { + // Start a new connection + SocketChannel socket = initiateConnection(new InetSocketAddress(hostAddress, port)); + send(socket, handler, data); + } +} diff --git a/src/ei/engine/network/NioNetwork.java b/src/ei/engine/network/NioNetwork.java new file mode 100644 index 0000000..d2cac57 --- /dev/null +++ b/src/ei/engine/network/NioNetwork.java @@ -0,0 +1,377 @@ +package ei.engine.network; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import ei.engine.network.response.PrintRsp; +import ei.engine.network.response.ResponseEvent; +import ei.engine.network.server.ClientData; +import ei.engine.network.worker.EchoWorker; +import ei.engine.network.worker.Worker; +import ei.engine.util.MultiPrintStream; + +public abstract class NioNetwork implements Runnable { + public static final int SERVER = 0; + public static final int CLIENT = 1; + + private int type; + + // The host:port combination to listen on + protected InetAddress hostAddress; + protected int port; + + // The channel on which we'll accept connections + protected ServerSocketChannel serverChannel; + // The selector we'll be monitoring + protected Selector selector; + // The buffer into which we'll read data when it's available + protected ByteBuffer readBuffer = ByteBuffer.allocate(8192); + protected Worker worker; + + // This map contains all the clients that are conncted + protected Map clients = new HashMap(); + + // A list of PendingChange instances + protected List pendingChanges = new LinkedList(); + // Maps a SocketChannel to a list of ByteBuffer instances + protected Map pendingData = new HashMap(); + + // Maps a SocketChannel to a RspHandler + protected Map rspEvents = Collections.synchronizedMap(new HashMap()); + + /** + * Create a nio network class + * + * @param hostAddress The host address + * @param port The port + * @param worker The worker that handels the data + * @param type The type of network host + * @throws IOException + */ + public NioNetwork(InetAddress hostAddress, int port, int type) throws IOException { + if(MultiPrintStream.out == null){ + MultiPrintStream.makeInstance(new MultiPrintStream("network.log")); + } + this.hostAddress = hostAddress; + this.port = port; + this.type = type; + this.selector = initSelector(); + } + + protected abstract Selector initSelector() throws IOException; + + /** + * Sets the Worker for the network messages + * + * @param worker The worker that handles the incoming messages + */ + public void setWorker(EchoWorker worker){ + this.worker = worker; + } + + public void send(SocketChannel socket, ResponseEvent handler, byte[] data) throws IOException { + // Register the response handler + rspEvents.put(socket, handler); + + queueSend(socket,data); + } + + /** + * This method sends data true the given socket + * + * @param socket The socket + * @param data The data to send + */ + public void send(SocketChannel socket, byte[] data) { + synchronized (pendingChanges) { + // Indicate we want the interest ops set changed + pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE)); + queueSend(socket,data); + } + } + + public void send(InetSocketAddress address, byte[] data){ + send(getSocketChannel(address), data); + } + + /** + * Queues the message to be sent and wakeups the selector + * + * @param socket The socet to send the message thrue + * @param data The data to send + */ + @SuppressWarnings("unchecked") + protected void queueSend(SocketChannel socket, byte[] data){ + // And queue the data we want written + synchronized (pendingData) { + List queue = pendingData.get(socket); + if (queue == null) { + queue = new ArrayList(); + pendingData.put(socket, queue); + } + queue.add(ByteBuffer.wrap(data)); + + } + // Finally, wake up our selecting thread so it can make the required changes + selector.wakeup(); + } + + public void run() { + while (true) { + try { + // Process any pending changes + synchronized (pendingChanges) { + Iterator changes = pendingChanges.iterator(); + while (changes.hasNext()) { + ChangeRequest change = (ChangeRequest) changes.next(); + switch (change.type) { + case ChangeRequest.CHANGEOPS: + SelectionKey key = change.socket.keyFor(selector); + key.interestOps(change.ops); + break; + case ChangeRequest.REGISTER: + change.socket.register(selector, change.ops); + break; + } + } + pendingChanges.clear(); + } + + // Wait for an event one of the registered channels + selector.select(); + + // Iterate over the set of keys for which events are available + Iterator selectedKeys = selector.selectedKeys().iterator(); + while (selectedKeys.hasNext()) { + SelectionKey key = (SelectionKey) selectedKeys.next(); + selectedKeys.remove(); + + if (key.isValid()) { + // Check what event is available and deal with it + if (key.isAcceptable()) { + accept(key); + //System.out.println("Accepting new Connection!! connection count:"+selector.keys().size()); + } + else if (key.isConnectable()) { + closeConnection(key); + //System.out.println("Disconnecting Connection!! connection count:"+selector.keys().size()); + } + else if (key.isReadable()) { + read(key); + } + else if (key.isWritable()) { + write(key); + } + } + } + } catch (Exception e) { + System.out.println("run(): "+e.toString()); + e.printStackTrace(); + } + } + } + + /** + * Server + */ + private void accept(SelectionKey key) throws IOException { + // For an accept to be pending the channel must be a server socket channel. + ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); + + // Accept the connection and make it non-blocking + SocketChannel socketChannel = serverSocketChannel.accept(); + socketChannel.configureBlocking(false); + + // Register the new SocketChannel with our Selector, indicating + // we'd like to be notified when there's data waiting to be read + socketChannel.register(selector, SelectionKey.OP_READ); + + // adds the client to the clients list + InetSocketAddress remoteAdr = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress(); + if(!clients.containsValue(remoteAdr)){ + clients.put(remoteAdr, new ClientData(socketChannel)); + } + } + + /** + * Client and Server + */ + private void read(SelectionKey key) throws IOException { + SocketChannel socketChannel = (SocketChannel) key.channel(); + + // Clear out our read buffer so it's ready for new data + readBuffer.clear(); + + // Attempt to read off the channel + int numRead; + try { + numRead = socketChannel.read(readBuffer); + } catch (IOException e) { + // The remote forcibly closed the connection, cancel + // the selection key and close the channel. + key.cancel(); + socketChannel.close(); + return; + } + + if (numRead == -1) { + // Remote entity shut the socket down cleanly. Do the + // same from our end and cancel the channel. + key.channel().close(); + key.cancel(); + return; + } + + if(rspEvents.get(socketChannel) != null){ + // Handle the response + handleResponse(socketChannel, readBuffer.array(), numRead); + } + else{ + // Hand the data off to our worker thread + if(worker != null){ + worker.processData(this, socketChannel, readBuffer.array(), numRead); + } + else{ + System.out.println("Unhandled Message Removed!!!"); + } + } + } + + /** + * Initieates a socket to the server + */ + protected SocketChannel initiateConnection(InetSocketAddress address) throws IOException { + // Create a non-blocking socket channel + SocketChannel socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + + // Kick off connection establishment + socketChannel.connect(address); + + // Queue a channel registration since the caller is not the + // selecting thread. As part of the registration we'll register + // an interest in connection events. These are raised when a channel + // is ready to complete connection establishment. + synchronized(this.pendingChanges) { + pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT)); + } + + return socketChannel; + } + + protected SocketChannel getSocketChannel(InetSocketAddress address){ + return clients.get(address).getSocketChannel(); + } + + /** + * Client And Server ResponseEvent + */ + private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException { + // Make a correctly sized copy of the data before handing it + // to the client + byte[] rspData = new byte[numRead]; + System.arraycopy(data, 0, rspData, 0, numRead); + + // Look up the handler for this channel + ResponseEvent handler = (ResponseEvent) rspEvents.get(socketChannel); + + // And pass the response to it + if (handler.handleResponse(rspData)) { + // The handler has seen enough, close the connection + socketChannel.close(); + socketChannel.keyFor(selector).cancel(); + } + + rspEvents.remove(socketChannel); + } + + /** + * Client and Server + */ + private void write(SelectionKey key) throws IOException { + SocketChannel socketChannel = (SocketChannel) key.channel(); + + synchronized (pendingData) { + List queue = (List) pendingData.get(socketChannel); + + // Write until there's not more data ... + while (!queue.isEmpty()) { + ByteBuffer buf = (ByteBuffer) queue.get(0); + socketChannel.write(buf); + if (buf.remaining() > 0) { + // ... or the socket's buffer fills up + break; + } + queue.remove(0); + } + + if (queue.isEmpty()) { + // We wrote away all data, so we're no longer interested + // in writing on this socket. Switch back to waiting for + // data. + key.interestOps(SelectionKey.OP_READ); + pendingData.remove(socketChannel); + } + } + } + + /** + * Client + */ + private void closeConnection(SelectionKey key) throws IOException { + SocketChannel socketChannel = (SocketChannel) key.channel(); + + // Finish the connection. If the connection operation failed + // this will raise an IOException. + try { + socketChannel.finishConnect(); + } catch (IOException e) { + // Cancel the channel's registration with our selector + e.printStackTrace(); + key.cancel(); + return; + } + + // Register an interest in writing on this channel + key.interestOps(SelectionKey.OP_WRITE); + } + + public static void main(String[] args) { + try { + EchoWorker worker = new EchoWorker(); + new Thread(worker).start(); + NioNetwork server = new NioServer(null, 9090); + server.setWorker(worker); + new Thread(server).start(); + + NioClient client = new NioClient(InetAddress.getByName("localhost"), 9090); + Thread t = new Thread(client); + t.setDaemon(false); + t.start(); + for(int i=0;;i++){ + PrintRsp handler = new PrintRsp(); + client.send(handler, ("LOL: "+i).getBytes()); + handler.waitForResponse(); + System.out.println("sending"); + } + } catch (IOException e) { + e.printStackTrace(); + } + +System.out.println("closing"); + + } +} diff --git a/src/ei/engine/network/NioServer.java b/src/ei/engine/network/NioServer.java new file mode 100644 index 0000000..2fb59bb --- /dev/null +++ b/src/ei/engine/network/NioServer.java @@ -0,0 +1,44 @@ +package ei.engine.network; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.spi.SelectorProvider; +import java.util.Iterator; + +public class NioServer extends NioNetwork{ + + public NioServer(InetAddress hostAddress, int port) throws IOException { + super(hostAddress, port, SERVER); + } + + protected Selector initSelector() throws IOException { + // Create a new selector + Selector socketSelector = SelectorProvider.provider().openSelector(); + + // Create a new non-blocking server socket channel + serverChannel = ServerSocketChannel.open(); + serverChannel.configureBlocking(false); + + // Bind the server socket to the specified address and port + InetSocketAddress isa = new InetSocketAddress(hostAddress, port); + serverChannel.socket().bind(isa); + + // Register the server socket channel, indicating an interest in + // accepting new connections + serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT); + + return socketSelector; + } + + public void broadcast(byte[] data){ + Iterator it = clients.keySet().iterator(); + while(it.hasNext()){ + send(it.next(), data); + } + } + +} diff --git a/src/ei/engine/network/message/Message.java b/src/ei/engine/network/message/Message.java new file mode 100644 index 0000000..f8d03c9 --- /dev/null +++ b/src/ei/engine/network/message/Message.java @@ -0,0 +1,9 @@ +package ei.engine.network.message; + +import java.io.Serializable; + +public class Message implements Serializable{ + private static final long serialVersionUID = 1L; + + +} diff --git a/src/ei/engine/network/message/type/KeepAliveMessage.java b/src/ei/engine/network/message/type/KeepAliveMessage.java new file mode 100644 index 0000000..47b2a94 --- /dev/null +++ b/src/ei/engine/network/message/type/KeepAliveMessage.java @@ -0,0 +1,12 @@ +package ei.engine.network.message.type; + +/** + * Tells the destination that the + * source is still online + * + * @author Ziver + * + */ +public interface KeepAliveMessage { + +} diff --git a/src/ei/engine/network/message/type/ReplyRequestMessage.java b/src/ei/engine/network/message/type/ReplyRequestMessage.java new file mode 100644 index 0000000..9fdaf2f --- /dev/null +++ b/src/ei/engine/network/message/type/ReplyRequestMessage.java @@ -0,0 +1,12 @@ +package ei.engine.network.message.type; + +/** + * This interface means that the sender + * wants a reply from the destination + * + * @author Ziver + * + */ +public interface ReplyRequestMessage { + +} diff --git a/src/ei/engine/network/message/type/SystemMessage.java b/src/ei/engine/network/message/type/SystemMessage.java new file mode 100644 index 0000000..13b554c --- /dev/null +++ b/src/ei/engine/network/message/type/SystemMessage.java @@ -0,0 +1,12 @@ +package ei.engine.network.message.type; + +/** + * A message that implements this will be + * handeld internaly by the network engine + * + * @author Ziver + * + */ +public interface SystemMessage { + +} diff --git a/src/ei/engine/network/response/PrintRsp.java b/src/ei/engine/network/response/PrintRsp.java new file mode 100644 index 0000000..c3d2c1e --- /dev/null +++ b/src/ei/engine/network/response/PrintRsp.java @@ -0,0 +1,10 @@ +package ei.engine.network.response; + +public class PrintRsp extends ResponseEvent{ + + @Override + protected void responseEvent(byte[] rsp) { + System.out.println(new String(rsp)); + } + +} diff --git a/src/ei/engine/network/response/ResponseEvent.java b/src/ei/engine/network/response/ResponseEvent.java new file mode 100644 index 0000000..99c0f60 --- /dev/null +++ b/src/ei/engine/network/response/ResponseEvent.java @@ -0,0 +1,35 @@ +package ei.engine.network.response; + + +public abstract class ResponseEvent { + private byte[] rsp = null; + + public synchronized boolean handleResponse(byte[] rsp) { + this.rsp = rsp; + notify(); + return true; + } + + public synchronized void waitForResponse() { + while(!gotResponse()) { + try { + this.wait(); + } catch (InterruptedException e) { + } + } + + responseEvent(rsp); + } + + public void handleResponse(){ + if(gotResponse()){ + responseEvent(rsp); + } + } + + public boolean gotResponse(){ + return (rsp != null); + } + + protected abstract void responseEvent(byte[] rsp); +} diff --git a/src/ei/engine/network/response/ResponseHandler.java b/src/ei/engine/network/response/ResponseHandler.java new file mode 100644 index 0000000..c9fda08 --- /dev/null +++ b/src/ei/engine/network/response/ResponseHandler.java @@ -0,0 +1,41 @@ +package ei.engine.network.response; + +import java.util.LinkedList; +import java.util.List; + + +public abstract class ResponseHandler implements Runnable{ + private List queue = new LinkedList(); + + public ResponseHandler(){ + + } + + public synchronized void addResponseEvent(ResponseEvent re){ + queue.add(re); + notify(); + } + + public synchronized void removeResponseEvent(ResponseEvent re){ + queue.remove(re); + } + + public void run() { + while(true) { + try { + this.wait(); + } catch (InterruptedException e) {} + + update(); + } + } + + public synchronized void update(){ + while(!queue.isEmpty()){ + queue.get(0).handleResponse(); + if(queue.get(0).gotResponse()){ + queue.remove(0); + } + } + } +} diff --git a/src/ei/engine/network/server/ClientData.java b/src/ei/engine/network/server/ClientData.java new file mode 100644 index 0000000..67c8053 --- /dev/null +++ b/src/ei/engine/network/server/ClientData.java @@ -0,0 +1,29 @@ +package ei.engine.network.server; + +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; + +public class ClientData { + private SocketChannel socketChannel; + private long lastMessageReceived; + + public ClientData(SocketChannel socketChannel){ + this.socketChannel = socketChannel; + } + + public SocketChannel getSocketChannel(){ + return socketChannel; + } + + public InetSocketAddress getAddress(){ + return (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress(); + } + + public void setLastMessageReceived(long time){ + lastMessageReceived = time; + } + + public long getLastMessageReceived(){ + return lastMessageReceived; + } +} diff --git a/src/ei/engine/network/util/Converter.java b/src/ei/engine/network/util/Converter.java new file mode 100644 index 0000000..7fa9d44 --- /dev/null +++ b/src/ei/engine/network/util/Converter.java @@ -0,0 +1,51 @@ +package ei.engine.network.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import ei.engine.util.MultiPrintStream; + +public class Converter { + + /** + * Converts an object to an array of bytes. + * + * @param object the object to convert. + * @return the associated byte array. + */ + public static byte[] toBytes(Object object){ + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try{ + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(object); + oos.flush(); + }catch(IOException ioe){ + MultiPrintStream.out.println(ioe.getMessage()); + } + return baos.toByteArray(); + } + + /** + * Converts an array of bytes back to its constituent object. The + * input array is assumed to have been created from the original object. + * + * @param bytes the byte array to convert. + * @return the associated object. + */ + public static Object toObject(byte[] bytes) { + Object object = null; + try{ + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois= new ObjectInputStream(bais); + object = ois.readObject(); + }catch(IOException ioe){ + MultiPrintStream.out.println(ioe.getMessage()); + }catch(ClassNotFoundException cnfe){ + MultiPrintStream.out.println(cnfe.getMessage()); + } + return object; + } +} diff --git a/src/ei/engine/network/worker/EchoWorker.java b/src/ei/engine/network/worker/EchoWorker.java new file mode 100644 index 0000000..dd4ce27 --- /dev/null +++ b/src/ei/engine/network/worker/EchoWorker.java @@ -0,0 +1,24 @@ +package ei.engine.network.worker; + +public class EchoWorker extends Worker { + + public void update() { + WorkerDataEvent dataEvent; + + while(true) { + // Wait for data to become available + synchronized(getEventQueue()) { + while(getEventQueue().isEmpty()) { + try { + getEventQueue().wait(); + } catch (InterruptedException e) { + } + } + dataEvent = (WorkerDataEvent) getEventQueue().remove(0); + } + + // Return to sender + dataEvent.network.send(dataEvent.socket, dataEvent.data); + } + } +} diff --git a/src/ei/engine/network/worker/SystemWorker.java b/src/ei/engine/network/worker/SystemWorker.java new file mode 100644 index 0000000..0afa4ec --- /dev/null +++ b/src/ei/engine/network/worker/SystemWorker.java @@ -0,0 +1,11 @@ +package ei.engine.network.worker; + +public class SystemWorker extends Worker { + + @Override + public void update() { + // TODO Auto-generated method stub + + } + +} diff --git a/src/ei/engine/network/worker/Worker.java b/src/ei/engine/network/worker/Worker.java new file mode 100644 index 0000000..97d7fe2 --- /dev/null +++ b/src/ei/engine/network/worker/Worker.java @@ -0,0 +1,30 @@ +package ei.engine.network.worker; + +import java.nio.channels.SocketChannel; +import java.util.LinkedList; +import java.util.List; + +import ei.engine.network.NioNetwork; + +public abstract class Worker implements Runnable { + private List queue = new LinkedList(); + + public void processData(NioNetwork server, SocketChannel socket, byte[] data, int count) { + byte[] dataCopy = new byte[count]; + System.arraycopy(data, 0, dataCopy, 0, count); + synchronized(queue) { + queue.add(new WorkerDataEvent(server, socket, dataCopy)); + queue.notify(); + } + } + + protected List getEventQueue(){ + return queue; + } + + public void run(){ + update(); + } + + public abstract void update(); +} diff --git a/src/ei/engine/network/worker/WorkerDataEvent.java b/src/ei/engine/network/worker/WorkerDataEvent.java new file mode 100644 index 0000000..b80ae62 --- /dev/null +++ b/src/ei/engine/network/worker/WorkerDataEvent.java @@ -0,0 +1,17 @@ +package ei.engine.network.worker; + +import java.nio.channels.SocketChannel; + +import ei.engine.network.NioNetwork; + +class WorkerDataEvent { + public NioNetwork network; + public SocketChannel socket; + public byte[] data; + + public WorkerDataEvent(NioNetwork server, SocketChannel socket, byte[] data) { + this.network = server; + this.socket = socket; + this.data = data; + } +} \ No newline at end of file