Adding networking to the engine that when complet will be integrated whit the game

This commit is contained in:
Ziver Koc 2007-06-02 22:19:26 +00:00
parent 8f6dac310a
commit 84383f666c
17 changed files with 767 additions and 0 deletions

View file

@ -0,0 +1,18 @@
package ei.engine.network;
import java.nio.channels.SocketChannel;
public class ChangeRequest {
public static final int REGISTER = 1;
public static final int CHANGEOPS = 2;
public SocketChannel socket;
public int type;
public int ops;
public ChangeRequest(SocketChannel socket, int type, int ops) {
this.socket = socket;
this.type = type;
this.ops = ops;
}
}

View file

@ -0,0 +1,35 @@
package ei.engine.network;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import ei.engine.network.response.ResponseEvent;
public class NioClient extends NioNetwork{
public NioClient(InetAddress hostAddress, int port) throws IOException {
super(hostAddress, port, CLIENT);
}
protected Selector initSelector() throws IOException {
// Create a new selector
return SelectorProvider.provider().openSelector();
}
/**
* This method is for the Client to send a message to the server
*
* @param handler The response handler
* @param data The data to send
* @throws IOException
*/
public void send(ResponseEvent handler, byte[] data) throws IOException {
// Start a new connection
SocketChannel socket = initiateConnection(new InetSocketAddress(hostAddress, port));
send(socket, handler, data);
}
}

View file

@ -0,0 +1,377 @@
package ei.engine.network;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import ei.engine.network.response.PrintRsp;
import ei.engine.network.response.ResponseEvent;
import ei.engine.network.server.ClientData;
import ei.engine.network.worker.EchoWorker;
import ei.engine.network.worker.Worker;
import ei.engine.util.MultiPrintStream;
public abstract class NioNetwork implements Runnable {
public static final int SERVER = 0;
public static final int CLIENT = 1;
private int type;
// The host:port combination to listen on
protected InetAddress hostAddress;
protected int port;
// The channel on which we'll accept connections
protected ServerSocketChannel serverChannel;
// The selector we'll be monitoring
protected Selector selector;
// The buffer into which we'll read data when it's available
protected ByteBuffer readBuffer = ByteBuffer.allocate(8192);
protected Worker worker;
// This map contains all the clients that are conncted
protected Map<InetSocketAddress, ClientData> clients = new HashMap<InetSocketAddress, ClientData>();
// A list of PendingChange instances
protected List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>();
// Maps a SocketChannel to a list of ByteBuffer instances
protected Map<SocketChannel, List> pendingData = new HashMap<SocketChannel, List>();
// Maps a SocketChannel to a RspHandler
protected Map<SocketChannel, ResponseEvent> rspEvents = Collections.synchronizedMap(new HashMap<SocketChannel, ResponseEvent>());
/**
* Create a nio network class
*
* @param hostAddress The host address
* @param port The port
* @param worker The worker that handels the data
* @param type The type of network host
* @throws IOException
*/
public NioNetwork(InetAddress hostAddress, int port, int type) throws IOException {
if(MultiPrintStream.out == null){
MultiPrintStream.makeInstance(new MultiPrintStream("network.log"));
}
this.hostAddress = hostAddress;
this.port = port;
this.type = type;
this.selector = initSelector();
}
protected abstract Selector initSelector() throws IOException;
/**
* Sets the Worker for the network messages
*
* @param worker The worker that handles the incoming messages
*/
public void setWorker(EchoWorker worker){
this.worker = worker;
}
public void send(SocketChannel socket, ResponseEvent handler, byte[] data) throws IOException {
// Register the response handler
rspEvents.put(socket, handler);
queueSend(socket,data);
}
/**
* This method sends data true the given socket
*
* @param socket The socket
* @param data The data to send
*/
public void send(SocketChannel socket, byte[] data) {
synchronized (pendingChanges) {
// Indicate we want the interest ops set changed
pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
queueSend(socket,data);
}
}
public void send(InetSocketAddress address, byte[] data){
send(getSocketChannel(address), data);
}
/**
* Queues the message to be sent and wakeups the selector
*
* @param socket The socet to send the message thrue
* @param data The data to send
*/
@SuppressWarnings("unchecked")
protected void queueSend(SocketChannel socket, byte[] data){
// And queue the data we want written
synchronized (pendingData) {
List<ByteBuffer> queue = pendingData.get(socket);
if (queue == null) {
queue = new ArrayList<ByteBuffer>();
pendingData.put(socket, queue);
}
queue.add(ByteBuffer.wrap(data));
}
// Finally, wake up our selecting thread so it can make the required changes
selector.wakeup();
}
public void run() {
while (true) {
try {
// Process any pending changes
synchronized (pendingChanges) {
Iterator changes = pendingChanges.iterator();
while (changes.hasNext()) {
ChangeRequest change = (ChangeRequest) changes.next();
switch (change.type) {
case ChangeRequest.CHANGEOPS:
SelectionKey key = change.socket.keyFor(selector);
key.interestOps(change.ops);
break;
case ChangeRequest.REGISTER:
change.socket.register(selector, change.ops);
break;
}
}
pendingChanges.clear();
}
// Wait for an event one of the registered channels
selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (key.isValid()) {
// Check what event is available and deal with it
if (key.isAcceptable()) {
accept(key);
//System.out.println("Accepting new Connection!! connection count:"+selector.keys().size());
}
else if (key.isConnectable()) {
closeConnection(key);
//System.out.println("Disconnecting Connection!! connection count:"+selector.keys().size());
}
else if (key.isReadable()) {
read(key);
}
else if (key.isWritable()) {
write(key);
}
}
}
} catch (Exception e) {
System.out.println("run(): "+e.toString());
e.printStackTrace();
}
}
}
/**
* Server
*/
private void accept(SelectionKey key) throws IOException {
// For an accept to be pending the channel must be a server socket channel.
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// Accept the connection and make it non-blocking
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// Register the new SocketChannel with our Selector, indicating
// we'd like to be notified when there's data waiting to be read
socketChannel.register(selector, SelectionKey.OP_READ);
// adds the client to the clients list
InetSocketAddress remoteAdr = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
if(!clients.containsValue(remoteAdr)){
clients.put(remoteAdr, new ClientData(socketChannel));
}
}
/**
* Client and Server
*/
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
readBuffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
return;
}
if(rspEvents.get(socketChannel) != null){
// Handle the response
handleResponse(socketChannel, readBuffer.array(), numRead);
}
else{
// Hand the data off to our worker thread
if(worker != null){
worker.processData(this, socketChannel, readBuffer.array(), numRead);
}
else{
System.out.println("Unhandled Message Removed!!!");
}
}
}
/**
* Initieates a socket to the server
*/
protected SocketChannel initiateConnection(InetSocketAddress address) throws IOException {
// Create a non-blocking socket channel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// Kick off connection establishment
socketChannel.connect(address);
// Queue a channel registration since the caller is not the
// selecting thread. As part of the registration we'll register
// an interest in connection events. These are raised when a channel
// is ready to complete connection establishment.
synchronized(this.pendingChanges) {
pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
}
return socketChannel;
}
protected SocketChannel getSocketChannel(InetSocketAddress address){
return clients.get(address).getSocketChannel();
}
/**
* Client And Server ResponseEvent
*/
private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
// Make a correctly sized copy of the data before handing it
// to the client
byte[] rspData = new byte[numRead];
System.arraycopy(data, 0, rspData, 0, numRead);
// Look up the handler for this channel
ResponseEvent handler = (ResponseEvent) rspEvents.get(socketChannel);
// And pass the response to it
if (handler.handleResponse(rspData)) {
// The handler has seen enough, close the connection
socketChannel.close();
socketChannel.keyFor(selector).cancel();
}
rspEvents.remove(socketChannel);
}
/**
* Client and Server
*/
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized (pendingData) {
List queue = (List) pendingData.get(socketChannel);
// Write until there's not more data ...
while (!queue.isEmpty()) {
ByteBuffer buf = (ByteBuffer) queue.get(0);
socketChannel.write(buf);
if (buf.remaining() > 0) {
// ... or the socket's buffer fills up
break;
}
queue.remove(0);
}
if (queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for
// data.
key.interestOps(SelectionKey.OP_READ);
pendingData.remove(socketChannel);
}
}
}
/**
* Client
*/
private void closeConnection(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Finish the connection. If the connection operation failed
// this will raise an IOException.
try {
socketChannel.finishConnect();
} catch (IOException e) {
// Cancel the channel's registration with our selector
e.printStackTrace();
key.cancel();
return;
}
// Register an interest in writing on this channel
key.interestOps(SelectionKey.OP_WRITE);
}
public static void main(String[] args) {
try {
EchoWorker worker = new EchoWorker();
new Thread(worker).start();
NioNetwork server = new NioServer(null, 9090);
server.setWorker(worker);
new Thread(server).start();
NioClient client = new NioClient(InetAddress.getByName("localhost"), 9090);
Thread t = new Thread(client);
t.setDaemon(false);
t.start();
for(int i=0;;i++){
PrintRsp handler = new PrintRsp();
client.send(handler, ("LOL: "+i).getBytes());
handler.waitForResponse();
System.out.println("sending");
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("closing");
}
}

View file

@ -0,0 +1,44 @@
package ei.engine.network;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
public class NioServer extends NioNetwork{
public NioServer(InetAddress hostAddress, int port) throws IOException {
super(hostAddress, port, SERVER);
}
protected Selector initSelector() throws IOException {
// Create a new selector
Selector socketSelector = SelectorProvider.provider().openSelector();
// Create a new non-blocking server socket channel
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port
InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
serverChannel.socket().bind(isa);
// Register the server socket channel, indicating an interest in
// accepting new connections
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
public void broadcast(byte[] data){
Iterator<InetSocketAddress> it = clients.keySet().iterator();
while(it.hasNext()){
send(it.next(), data);
}
}
}

View file

@ -0,0 +1,9 @@
package ei.engine.network.message;
import java.io.Serializable;
public class Message implements Serializable{
private static final long serialVersionUID = 1L;
}

View file

@ -0,0 +1,12 @@
package ei.engine.network.message.type;
/**
* Tells the destination that the
* source is still online
*
* @author Ziver
*
*/
public interface KeepAliveMessage {
}

View file

@ -0,0 +1,12 @@
package ei.engine.network.message.type;
/**
* This interface means that the sender
* wants a reply from the destination
*
* @author Ziver
*
*/
public interface ReplyRequestMessage {
}

View file

@ -0,0 +1,12 @@
package ei.engine.network.message.type;
/**
* A message that implements this will be
* handeld internaly by the network engine
*
* @author Ziver
*
*/
public interface SystemMessage {
}

View file

@ -0,0 +1,10 @@
package ei.engine.network.response;
public class PrintRsp extends ResponseEvent{
@Override
protected void responseEvent(byte[] rsp) {
System.out.println(new String(rsp));
}
}

View file

@ -0,0 +1,35 @@
package ei.engine.network.response;
public abstract class ResponseEvent {
private byte[] rsp = null;
public synchronized boolean handleResponse(byte[] rsp) {
this.rsp = rsp;
notify();
return true;
}
public synchronized void waitForResponse() {
while(!gotResponse()) {
try {
this.wait();
} catch (InterruptedException e) {
}
}
responseEvent(rsp);
}
public void handleResponse(){
if(gotResponse()){
responseEvent(rsp);
}
}
public boolean gotResponse(){
return (rsp != null);
}
protected abstract void responseEvent(byte[] rsp);
}

View file

@ -0,0 +1,41 @@
package ei.engine.network.response;
import java.util.LinkedList;
import java.util.List;
public abstract class ResponseHandler implements Runnable{
private List<ResponseEvent> queue = new LinkedList<ResponseEvent>();
public ResponseHandler(){
}
public synchronized void addResponseEvent(ResponseEvent re){
queue.add(re);
notify();
}
public synchronized void removeResponseEvent(ResponseEvent re){
queue.remove(re);
}
public void run() {
while(true) {
try {
this.wait();
} catch (InterruptedException e) {}
update();
}
}
public synchronized void update(){
while(!queue.isEmpty()){
queue.get(0).handleResponse();
if(queue.get(0).gotResponse()){
queue.remove(0);
}
}
}
}

View file

@ -0,0 +1,29 @@
package ei.engine.network.server;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
public class ClientData {
private SocketChannel socketChannel;
private long lastMessageReceived;
public ClientData(SocketChannel socketChannel){
this.socketChannel = socketChannel;
}
public SocketChannel getSocketChannel(){
return socketChannel;
}
public InetSocketAddress getAddress(){
return (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
}
public void setLastMessageReceived(long time){
lastMessageReceived = time;
}
public long getLastMessageReceived(){
return lastMessageReceived;
}
}

View file

@ -0,0 +1,51 @@
package ei.engine.network.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import ei.engine.util.MultiPrintStream;
public class Converter {
/**
* Converts an object to an array of bytes.
*
* @param object the object to convert.
* @return the associated byte array.
*/
public static byte[] toBytes(Object object){
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try{
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(object);
oos.flush();
}catch(IOException ioe){
MultiPrintStream.out.println(ioe.getMessage());
}
return baos.toByteArray();
}
/**
* Converts an array of bytes back to its constituent object. The
* input array is assumed to have been created from the original object.
*
* @param bytes the byte array to convert.
* @return the associated object.
*/
public static Object toObject(byte[] bytes) {
Object object = null;
try{
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ObjectInputStream ois= new ObjectInputStream(bais);
object = ois.readObject();
}catch(IOException ioe){
MultiPrintStream.out.println(ioe.getMessage());
}catch(ClassNotFoundException cnfe){
MultiPrintStream.out.println(cnfe.getMessage());
}
return object;
}
}

View file

@ -0,0 +1,24 @@
package ei.engine.network.worker;
public class EchoWorker extends Worker {
public void update() {
WorkerDataEvent dataEvent;
while(true) {
// Wait for data to become available
synchronized(getEventQueue()) {
while(getEventQueue().isEmpty()) {
try {
getEventQueue().wait();
} catch (InterruptedException e) {
}
}
dataEvent = (WorkerDataEvent) getEventQueue().remove(0);
}
// Return to sender
dataEvent.network.send(dataEvent.socket, dataEvent.data);
}
}
}

View file

@ -0,0 +1,11 @@
package ei.engine.network.worker;
public class SystemWorker extends Worker {
@Override
public void update() {
// TODO Auto-generated method stub
}
}

View file

@ -0,0 +1,30 @@
package ei.engine.network.worker;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;
import ei.engine.network.NioNetwork;
public abstract class Worker implements Runnable {
private List<WorkerDataEvent> queue = new LinkedList<WorkerDataEvent>();
public void processData(NioNetwork server, SocketChannel socket, byte[] data, int count) {
byte[] dataCopy = new byte[count];
System.arraycopy(data, 0, dataCopy, 0, count);
synchronized(queue) {
queue.add(new WorkerDataEvent(server, socket, dataCopy));
queue.notify();
}
}
protected List getEventQueue(){
return queue;
}
public void run(){
update();
}
public abstract void update();
}

View file

@ -0,0 +1,17 @@
package ei.engine.network.worker;
import java.nio.channels.SocketChannel;
import ei.engine.network.NioNetwork;
class WorkerDataEvent {
public NioNetwork network;
public SocketChannel socket;
public byte[] data;
public WorkerDataEvent(NioNetwork server, SocketChannel socket, byte[] data) {
this.network = server;
this.socket = socket;
this.data = data;
}
}