Added some algorithms and moved some files and fixed some comments

This commit is contained in:
Ziver Koc 2009-02-26 17:10:57 +00:00
parent 017a27931a
commit 9297bea93d
25 changed files with 1043 additions and 192 deletions

View file

@ -1,7 +1,6 @@
package zutil.network.nio;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -26,6 +25,7 @@ import zutil.network.nio.server.ChangeRequest;
import zutil.network.nio.server.ClientData;
import zutil.network.nio.worker.SystemWorker;
import zutil.network.nio.worker.Worker;
import zutil.struct.DynamicByteArrayStream;
public abstract class NioNetwork implements Runnable {
@ -36,7 +36,7 @@ public abstract class NioNetwork implements Runnable {
* 2 = message debug
* 3 = selector debug
*/
public static final int DEBUG = 1;
public static final int DEBUG = 2;
public static enum NetworkType {SERVER, CLIENT};
private NetworkType type;
@ -60,7 +60,8 @@ public abstract class NioNetwork implements Runnable {
// A list of PendingChange instances
private List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>();
// Maps a SocketChannel to a list of ByteBuffer instances
private Map<SocketChannel, List<ByteBuffer>> pendingData = new HashMap<SocketChannel, List<ByteBuffer>>();
private Map<SocketChannel, List<ByteBuffer>> pendingWriteData = new HashMap<SocketChannel, List<ByteBuffer>>();
private Map<SocketChannel, DynamicByteArrayStream> pendingReadData = new HashMap<SocketChannel, DynamicByteArrayStream>();
// The encrypter
private Encrypter encrypter;
@ -102,11 +103,11 @@ public abstract class NioNetwork implements Runnable {
(encrypter != null ? "Enabled("+encrypter.getAlgorithm()+")" : "Disabled")+"!!");
}
public void send(SocketChannel socket, Object data) {
public void send(SocketChannel socket, Object data) throws IOException{
send(socket, Converter.toBytes(data));
}
public void send(InetSocketAddress address, Object data){
public void send(InetSocketAddress address, Object data) throws IOException{
send(address, Converter.toBytes(data));
}
@ -140,11 +141,11 @@ public abstract class NioNetwork implements Runnable {
protected void queueSend(SocketChannel socket, byte[] data){
if(DEBUG>=3)MultiPrintStream.out.println("Sending Queue...");
// And queue the data we want written
synchronized (pendingData) {
List<ByteBuffer> queue = pendingData.get(socket);
synchronized (pendingWriteData) {
List<ByteBuffer> queue = pendingWriteData.get(socket);
if (queue == null) {
queue = new ArrayList<ByteBuffer>();
pendingData.put(socket, queue);
pendingWriteData.put(socket, queue);
}
//encrypts
if(encrypter != null)
@ -266,8 +267,10 @@ public abstract class NioNetwork implements Runnable {
key.cancel();
socketChannel.close();
clients.remove(remoteAdr);
pendingReadData.remove(socketChannel);
pendingWriteData.remove(socketChannel);
if(DEBUG>=1)MultiPrintStream.out.println("Connection Forced Close("+remoteAdr+")!!! Connection Count: "+clients.size());
if(type == NetworkType.CLIENT) throw new ConnectException("Server Closed The Connection!!!");
if(type == NetworkType.CLIENT) throw new IOException("Server Closed The Connection!!!");
return;
}
@ -277,8 +280,10 @@ public abstract class NioNetwork implements Runnable {
key.channel().close();
key.cancel();
clients.remove(remoteAdr);
pendingReadData.remove(socketChannel);
pendingWriteData.remove(socketChannel);
if(DEBUG>=1)MultiPrintStream.out.println("Connection Close("+remoteAdr+")!!! Connection Count: "+clients.size());
if(type == NetworkType.CLIENT) throw new ConnectException("Server Closed The Connection!!!");
if(type == NetworkType.CLIENT) throw new IOException("Server Closed The Connection!!!");
return;
}
@ -286,8 +291,28 @@ public abstract class NioNetwork implements Runnable {
// to the client
byte[] rspByteData = new byte[numRead];
System.arraycopy(readBuffer.array(), 0, rspByteData, 0, numRead);
if(encrypter != null)// Encryption
rspByteData = encrypter.decrypt(rspByteData);
handleRecivedMessage(socketChannel, rspByteData);
/*
if(!pendingReadData.containsKey(socketChannel)){
pendingReadData.put(socketChannel, new DynamicByteArrayStream());
}
if(encrypter != null)// Encryption
rspByteData = encrypter.decrypt(rspByteData);
pendingReadData.get(socketChannel).add(rspByteData);
*/
Object rspData = null;
try{
rspData = Converter.toObject(rspByteData);
//rspData = Converter.toObject(pendingReadData.get(socketChannel));
handleRecivedMessage(socketChannel, rspData);
//pendingReadData.get(socketChannel).clear();
}catch(Exception e){
e.printStackTrace();
}
}
/**
@ -296,18 +321,22 @@ public abstract class NioNetwork implements Runnable {
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized (pendingData) {
List<ByteBuffer> queue = pendingData.get(socketChannel);
synchronized (pendingWriteData) {
List<ByteBuffer> queue = pendingWriteData.get(socketChannel);
if(queue == null){
queue = new ArrayList<ByteBuffer>();
}
int i = 0;
// Write until there's not more data ...
while (!queue.isEmpty()) {
ByteBuffer buf = queue.get(0);
i += buf.remaining();
socketChannel.write(buf);
i -= buf.remaining();
if (buf.remaining() > 0) {
// ... or the socket's buffer fills up
if(DEBUG>=3)MultiPrintStream.out.println("Write Buffer Full!!");
break;
}
queue.remove(0);
@ -323,14 +352,8 @@ public abstract class NioNetwork implements Runnable {
}
}
private void handleRecivedMessage(SocketChannel socketChannel, byte[] rspByteData){
//Encryption
Object rspData;
if(encrypter != null)
rspData = Converter.toObject(encrypter.decrypt(rspByteData));
else rspData = Converter.toObject(rspByteData);
private void handleRecivedMessage(SocketChannel socketChannel, Object rspData){
if(DEBUG>=2)MultiPrintStream.out.println("Handling incomming message...");
if(rspData instanceof SystemMessage){
if(systemWorker != null){
if(DEBUG>=3)MultiPrintStream.out.println("System Message!!!");
@ -347,13 +370,13 @@ public abstract class NioNetwork implements Runnable {
worker.processData(this, socketChannel, rspData);
}
else{
if(DEBUG>=1)MultiPrintStream.out.println("Unhandled Message!!!");
if(DEBUG>=1)MultiPrintStream.out.println("Unhandled Worker Message!!!");
}
}
}
/**
* Initializes a socket to the server
* Initializes a socket to a server
*/
protected SocketChannel initiateConnection(InetSocketAddress address) throws IOException {
// Create a non-blocking socket channel

View file

@ -0,0 +1,84 @@
package zutil.network.nio.message;
public class GridMessage<T> extends Message{
private static final long serialVersionUID = 1L;
// Client type messages
/** Computation job return right answer **/
public static final int COMP_SUCCESSFUL = 1; //
/** Initial static data **/
public static final int COMP_INCORRECT = 2; //
/** Computation job return wrong answer **/
public static final int COMP_ERROR = 3; //
/** There was an error computing **/
public static final int REGISTER = 4; //
/** Register at the server **/
public static final int UNREGISTER = 5; //
/** Request new computation data **/
public static final int NEW_DATA = 6; //
// Server type messages
/** Sending initial static data **/
public static final int INIT_DATA = 100;
/** Sending new dynamic data **/
public static final int COMP_DATA = 101;
private int type;
private int jobID;
private T data;
/**
* Creates a new GridMessage
*
* @param type is the type of message
* @param jobID is the id of the job
*/
public GridMessage(int type){
this(type, 0, null);
}
/**
* Creates a new GridMessage
*
* @param type is the type of message
* @param jobID is the id of the job
*/
public GridMessage(int type, int jobID){
this(type, jobID, null);
}
/**
* Creates a new GridMessage
*
* @param type is the type of message
* @param jobID is the id of the job
* @param data is the data to send with this message
*/
public GridMessage(int type, int jobID, T data){
this.type = type;
this.jobID = jobID;
this.data = data;
}
/**
* @return the type of message
*/
public int messageType(){
return type;
}
/**
* @return the job id for this message
*/
public int getJobQueueID(){
return jobID;
}
/**
* @return the data in this message, may not always carry any data.
*/
public T getData(){
return data;
}
}

View file

@ -21,38 +21,42 @@ public class ChatService extends NetworkService{
@Override
public void handleMessage(Message message, SocketChannel socket) {
// New message
if(message instanceof ChatMessage){
ChatMessage chatmessage = (ChatMessage)message;
//is this a new message
if(chatmessage.type == ChatMessage.ChatMessageType.MESSAGE){
// Is this the server
if(nio.getType() == NioNetwork.NetworkType.SERVER){
if(rooms.containsKey(chatmessage.room)){
LinkedList<SocketChannel> tmpList = rooms.get(chatmessage.room);
try {
// New message
if(message instanceof ChatMessage){
ChatMessage chatmessage = (ChatMessage)message;
//is this a new message
if(chatmessage.type == ChatMessage.ChatMessageType.MESSAGE){
// Is this the server
if(nio.getType() == NioNetwork.NetworkType.SERVER){
if(rooms.containsKey(chatmessage.room)){
LinkedList<SocketChannel> tmpList = rooms.get(chatmessage.room);
// Broadcast the message
for(SocketChannel s : tmpList){
if(s.isConnected()){
nio.send(s, chatmessage);
}
else{
unRegisterUser(chatmessage.room, s);
// Broadcast the message
for(SocketChannel s : tmpList){
if(s.isConnected()){
nio.send(s, chatmessage);
}
else{
unRegisterUser(chatmessage.room, s);
}
}
}
}
if(NioNetwork.DEBUG>=2)MultiPrintStream.out.println("New Chat Message: "+chatmessage.msg);
listener.messageAction(chatmessage.msg, chatmessage.room);
}
// register to a room
else if(chatmessage.type == ChatMessage.ChatMessageType.REGISTER){
registerUser(chatmessage.room, socket);
}
// unregister to a room
else if(chatmessage.type == ChatMessage.ChatMessageType.UNREGISTER){
unRegisterUser(chatmessage.room, socket);
}
if(NioNetwork.DEBUG>=2)MultiPrintStream.out.println("New Chat Message: "+chatmessage.msg);
listener.messageAction(chatmessage.msg, chatmessage.room);
}
// register to a room
else if(chatmessage.type == ChatMessage.ChatMessageType.REGISTER){
registerUser(chatmessage.room, socket);
}
// unregister to a room
else if(chatmessage.type == ChatMessage.ChatMessageType.UNREGISTER){
unRegisterUser(chatmessage.room, socket);
}
} catch (Exception e) {
e.printStackTrace();
}
}

View file

@ -1,14 +1,20 @@
package zutil.network.nio.worker;
import java.io.IOException;
import zutil.MultiPrintStream;
public class EchoWorker extends ThreadedEventWorker {
@Override
public void messageEvent(WorkerDataEvent dataEvent) {
// Return to sender
MultiPrintStream.out.println("Recived Msg: "+dataEvent.data);
dataEvent.network.send(dataEvent.socket, dataEvent.data);
try {
// Return to sender
MultiPrintStream.out.println("Recived Msg: "+dataEvent.data);
dataEvent.network.send(dataEvent.socket, dataEvent.data);
} catch (IOException e) {
e.printStackTrace();
}
}

View file

@ -33,27 +33,31 @@ public class SystemWorker extends ThreadedEventWorker {
@Override
public void messageEvent(WorkerDataEvent event) {
if(NioNetwork.DEBUG>=2)MultiPrintStream.out.println("System Message: "+event.data.getClass().getName());
if(event.data instanceof Message){
if(event.data instanceof EchoMessage && ((EchoMessage)event.data).echo()){
// Echos back the recived message
((EchoMessage)event.data).recived();
if(NioNetwork.DEBUG>=3)MultiPrintStream.out.println("Echoing Message: "+event.data);
nio.send(event.socket, event.data);
}
else if(event.data instanceof ResponseRequestMessage &&
rspEvents.get(((ResponseRequestMessage)event.data).getResponseId()) != null){
// Handle the response
handleResponse(((ResponseRequestMessage)event.data).getResponseId(), event.data);
if(NioNetwork.DEBUG>=3)MultiPrintStream.out.println("Response Request Message: "+event.data);
}
else{
//Services
if(services.containsKey(event.data.getClass()) ||
!services.containsKey(event.data.getClass()) && defaultServices(event.data)){
services.get(event.data.getClass()).handleMessage((Message)event.data, event.socket);
try {
if(NioNetwork.DEBUG>=2)MultiPrintStream.out.println("System Message: "+event.data.getClass().getName());
if(event.data instanceof Message){
if(event.data instanceof EchoMessage && ((EchoMessage)event.data).echo()){
// Echos back the recived message
((EchoMessage)event.data).recived();
if(NioNetwork.DEBUG>=3)MultiPrintStream.out.println("Echoing Message: "+event.data);
nio.send(event.socket, event.data);
}
else if(event.data instanceof ResponseRequestMessage &&
rspEvents.get(((ResponseRequestMessage)event.data).getResponseId()) != null){
// Handle the response
handleResponse(((ResponseRequestMessage)event.data).getResponseId(), event.data);
if(NioNetwork.DEBUG>=3)MultiPrintStream.out.println("Response Request Message: "+event.data);
}
else{
//Services
if(services.containsKey(event.data.getClass()) ||
!services.containsKey(event.data.getClass()) && defaultServices(event.data)){
services.get(event.data.getClass()).handleMessage((Message)event.data, event.socket);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

View file

@ -0,0 +1,114 @@
package zutil.network.nio.worker.grid;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import zutil.MultiPrintStream;
import zutil.network.nio.NioClient;
import zutil.network.nio.message.GridMessage;
import zutil.network.nio.worker.ThreadedEventWorker;
import zutil.network.nio.worker.WorkerDataEvent;
/**
* This class is the client part of the grid.
* It connects to a grid server and requests new job.
* And then sends back the result to the server.
*
* @author Ziver
*/
@SuppressWarnings("unchecked")
public class GridClient extends ThreadedEventWorker {
private static LinkedList<GridJob> jobQueue;
private static GridThread thread;
private static NioClient network;
/**
* Creates a new GridClient object and registers itself at the server
* and sets itself as a worker in NioClient
*
* @param thread the Thread interface to run for the jobs
* @param network the NioClient to use to communicate to the server
*/
public GridClient(GridThread thread, NioClient network){
jobQueue = new LinkedList<GridJob>();
GridClient.thread = thread;
GridClient.network = network;
}
/**
* Starts up the client and a couple of GridThreads.
* And registers itself as a worker in NioClient
* @throws IOException
*/
public void initiate() throws IOException{
network.setDefaultWorker(this);
network.send(new GridMessage(GridMessage.REGISTER));
for(int i=0; i<Runtime.getRuntime().availableProcessors() ;i++){
Thread t = new Thread(thread);
t.start();
}
}
@Override
public void messageEvent(WorkerDataEvent e) {
// ignores other messages than GridMessage
if(e.data instanceof GridMessage){
GridMessage msg = (GridMessage)e.data;
switch(msg.messageType()){
// Receive data from Server
case GridMessage.INIT_DATA:
thread.setInitData(msg.getData());
break;
case GridMessage.COMP_DATA:
jobQueue.add(new GridJob(msg.getJobQueueID(), (Queue)msg.getData()));
break;
}
}
}
/**
* Register whit the server that the job is done
*
* @param jobID is the job id
* @param correct if the answer was right
* @param result the result of the computation
* @throws IOException
*/
public static void jobDone(int jobID, boolean correct, Object result) throws IOException{
if(correct)
network.send(new GridMessage(GridMessage.COMP_SUCCESSFUL, jobID, result));
else
network.send(new GridMessage(GridMessage.COMP_INCORRECT, jobID, result));
}
/**
* Registers with the server that there was an
* error when computing this job
*
* @param jobID is the job id
*/
public static void jobError(int jobID){
try{
network.send(new GridMessage(GridMessage.COMP_SUCCESSFUL, jobID));
}catch(Exception e){e.printStackTrace();}
}
/**
* @return a new job to compute
* @throws IOException
*/
public static synchronized GridJob getNextJob() throws IOException{
if(jobQueue.isEmpty()){
network.send(new GridMessage(GridMessage.NEW_DATA));
while(jobQueue.isEmpty()){
try{Thread.sleep(100);}catch(Exception e){}
}
}
MultiPrintStream.out.println("Starting job");
return jobQueue.poll();
}
}

View file

@ -0,0 +1,22 @@
package zutil.network.nio.worker.grid;
/**
* A internal class for handling the jobs
*
* @author Ziver
*/
public class GridJob{
public int jobID;
public Object job;
public long timestamp;
public GridJob(int jobID, Object job){
this.jobID = jobID;
this.job = job;
renewTimeStamp();
}
public void renewTimeStamp(){
timestamp = System.currentTimeMillis();
}
}

View file

@ -0,0 +1,18 @@
package zutil.network.nio.worker.grid;
/**
* Generates new jobs for the grid to compute
*
* @author Ziver
*/
public interface GridJobGenerator<T> {
/**
* @return static and final values that do not change for every job
*/
public Object initValues();
/**
* @return a new generated job
*/
public T generateJob();
}

View file

@ -0,0 +1,10 @@
package zutil.network.nio.worker.grid;
/**
* Handles the incoming results from the grid
*
* @author Ziver
*/
public interface GridResultHandler<T> {
public void resultEvent(int jobID, boolean correct, T result);
}

View file

@ -0,0 +1,111 @@
package zutil.network.nio.worker.grid;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Queue;
import zutil.network.nio.message.GridMessage;
import zutil.network.nio.worker.ThreadedEventWorker;
import zutil.network.nio.worker.WorkerDataEvent;
/**
* Implements a simple network computing server
*
* @author Ziver
*/
@SuppressWarnings("unchecked")
public class GridServerWorker extends ThreadedEventWorker{
// Job timeout after 30 min
public static int JOB_TIMEOUT = 1000*60*30;
private HashMap<Integer, GridJob> jobs; // contains all the ongoing jobs
private Queue<GridJob> reSendjobQueue; // Contains all the jobs that will be recalculated
private GridJobGenerator jobGenerator; // The job generator
private GridResultHandler resHandler;
private int nextJobID;
public GridServerWorker(GridResultHandler resHandler, GridJobGenerator jobGenerator){
this.resHandler = resHandler;
this.jobGenerator = jobGenerator;
nextJobID = 0;
jobs = new HashMap<Integer, GridJob>();
reSendjobQueue = new LinkedList<GridJob>();
GridMaintainer maintainer = new GridMaintainer();
maintainer.start();
}
@Override
public void messageEvent(WorkerDataEvent e) {
try {
// ignores other messages than GridMessage
if(e.data instanceof GridMessage){
GridMessage msg = (GridMessage)e.data;
GridJob job = null;
switch(msg.messageType()){
case GridMessage.REGISTER:
e.network.send(e.socket, new GridMessage(GridMessage.INIT_DATA, 0, jobGenerator.initValues()));
break;
// Sending new data to compute to the client
case GridMessage.NEW_DATA:
if(!reSendjobQueue.isEmpty()){ // checks first if there is a job for recalculation
job = reSendjobQueue.poll();
job.renewTimeStamp();
}
else{ // generates new job
job = new GridJob(nextJobID,
jobGenerator.generateJob());
jobs.put(job.jobID, job);
nextJobID++;
}
GridMessage newMsg = new GridMessage(GridMessage.COMP_DATA, job.jobID, job.job);
e.network.send(e.socket, newMsg);
break;
// Received computation results
case GridMessage.COMP_SUCCESSFUL:
resHandler.resultEvent(msg.getJobQueueID(), true, msg.getData());
break;
case GridMessage.COMP_INCORRECT:
resHandler.resultEvent(msg.getJobQueueID(), false, msg.getData());
break;
case GridMessage.COMP_ERROR: // marks the job for recalculation
job = jobs.get(msg.getJobQueueID());
reSendjobQueue.add(job);
break;
}
}
} catch (IOException e1) {
e1.printStackTrace();
}
}
/**
* Changes the job timeout value
* @param min is the timeout in minutes
*/
public static void setJobTimeOut(int min){
JOB_TIMEOUT = 1000*60*min;
}
class GridMaintainer extends Thread{
/**
* Runs some behind the scenes stuff
* like job timeout.
*/
public void run(){
while(true){
long time = System.currentTimeMillis();
for(int jobID : jobs.keySet()){
if(time-jobs.get(jobID).timestamp > JOB_TIMEOUT){
reSendjobQueue.add(jobs.get(jobID));
}
}
try{Thread.sleep(1000*60*1);}catch(Exception e){};
}
}
}
}

View file

@ -0,0 +1,38 @@
package zutil.network.nio.worker.grid;
/**
* This interface is the thread that will do
* all the computation in the grid
*
* @author Ziver
*/
public abstract class GridThread implements Runnable{
/**
* The initial static and final data will be sent to this
* method.
*
* @param data is the static and or final data
*/
public abstract void setInitData(Object data);
public void run(){
while(true){
GridJob tmp = null;
try {
tmp = GridClient.getNextJob();
compute(tmp);
} catch (Exception e) {
e.printStackTrace();
if(tmp != null){
GridClient.jobError(tmp.jobID);
}
}
}
}
/**
* Compute the given data and return
* @param data
*/
public abstract void compute(GridJob data) throws Exception;
}