changed folder name

This commit is contained in:
Ziver Koc 2011-02-15 19:40:02 +00:00
parent 4503531ec9
commit 80c6a52c69
73 changed files with 0 additions and 0 deletions

View file

@ -0,0 +1,22 @@
package zutil.net.nio.worker;
import java.io.IOException;
import zutil.io.MultiPrintStream;
public class EchoWorker extends ThreadedEventWorker {
@Override
public void messageEvent(WorkerDataEvent dataEvent) {
try {
// Return to sender
MultiPrintStream.out.println("Recived Msg: "+dataEvent.data);
dataEvent.network.send(dataEvent.socket, dataEvent.data);
} catch (IOException e) {
e.printStackTrace();
}
}
}

View file

@ -0,0 +1,129 @@
package zutil.net.nio.worker;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import zutil.log.LogUtil;
import zutil.net.nio.NioNetwork;
import zutil.net.nio.message.ChatMessage;
import zutil.net.nio.message.Message;
import zutil.net.nio.message.SyncMessage;
import zutil.net.nio.message.type.EchoMessage;
import zutil.net.nio.message.type.ResponseRequestMessage;
import zutil.net.nio.response.ResponseEvent;
import zutil.net.nio.service.NetworkService;
import zutil.net.nio.service.chat.ChatService;
import zutil.net.nio.service.sync.SyncService;
public class SystemWorker extends ThreadedEventWorker {
private static Logger logger = LogUtil.getLogger();
private NioNetwork nio;
// Maps a SocketChannel to a RspHandler
private Map<Double, ResponseEvent> rspEvents = new HashMap<Double, ResponseEvent>();
// Difren services listening on specific messages
private Map<Class<?>, NetworkService> services = new HashMap<Class<?>, NetworkService>();
/**
* Creates a new SystemWorker
* @param nio The Network
*/
public SystemWorker(NioNetwork nio){
this.nio = nio;
}
@Override
public void messageEvent(WorkerDataEvent event) {
try {
logger.finer("System Message: "+event.data.getClass().getName());
if(event.data instanceof Message){
if(event.data instanceof EchoMessage && ((EchoMessage)event.data).echo()){
// Echos back the recived message
((EchoMessage)event.data).recived();
logger.finer("Echoing Message: "+event.data);
nio.send(event.socket, event.data);
}
else if(event.data instanceof ResponseRequestMessage &&
rspEvents.get(((ResponseRequestMessage)event.data).getResponseId()) != null){
// Handle the response
handleResponse(((ResponseRequestMessage)event.data).getResponseId(), event.data);
logger.finer("Response Request Message: "+event.data);
}
else{
//Services
if(services.containsKey(event.data.getClass()) ||
!services.containsKey(event.data.getClass()) && defaultServices(event.data)){
services.get(event.data.getClass()).handleMessage((Message)event.data, event.socket);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Registers a Service to a specific message
*
* @param c The Message class
* @param ns The service
*/
public void registerService(Class<?> c, NetworkService ns){
services.put(c, ns);
}
/**
* Unregisters a service
*
* @param c The class
*/
public void unregisterService(Class<?> c){
services.remove(c);
}
/**
* Connects a ResponseHandler to a specific message
* @param handler The Handler
* @param data The Message
*/
public void addResponseHandler(ResponseEvent handler, ResponseRequestMessage data){
rspEvents.put(data.getResponseId(), handler);
}
/**
* Client And Server ResponseEvent
*/
private void handleResponse(double responseId, Object rspData){
// Look up the handler for this channel
ResponseEvent handler = rspEvents.get(responseId);
// And pass the response to it
handler.handleResponse(rspData);
rspEvents.remove(responseId);
}
/**
* Registers the default services in the engin e
* if the message needs one of them
*
* @param o The message
*/
private boolean defaultServices(Object o){
if(o instanceof SyncMessage){
if(SyncService.getInstance() == null)
registerService(o.getClass(), new SyncService(nio));
else
registerService(o.getClass(), SyncService.getInstance());
return true;
}
else if(o instanceof ChatMessage){
if(ChatService.getInstance() == null)
registerService(o.getClass(), new ChatService(nio));
else
registerService(o.getClass(), ChatService.getInstance());
return true;
}
return false;
}
}

View file

@ -0,0 +1,34 @@
package zutil.net.nio.worker;
public abstract class ThreadedEventWorker extends Worker{
private Thread thread;
public ThreadedEventWorker(){
thread = new Thread(this);
thread.start();
}
public void update() {
WorkerDataEvent dataEvent;
while(true) {
try{
// Wait for data to become available
synchronized(getEventQueue()) {
while(getEventQueue().isEmpty()) {
try {
getEventQueue().wait();
} catch (InterruptedException e) {}
}
dataEvent = (WorkerDataEvent) getEventQueue().remove(0);
}
messageEvent(dataEvent);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public abstract void messageEvent(WorkerDataEvent e);
}

View file

@ -0,0 +1,52 @@
package zutil.net.nio.worker;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;
import zutil.net.nio.NioNetwork;
public abstract class Worker implements Runnable {
private LinkedList<WorkerDataEvent> queue = new LinkedList<WorkerDataEvent>();
public void processData(NioNetwork server, SocketChannel socket, Object data) {
synchronized(queue) {
queue.add(new WorkerDataEvent(server, socket, data));
queue.notify();
}
}
/**
* @return The event queue
*/
protected List<WorkerDataEvent> getEventQueue(){
return queue;
}
/**
* @return If there is a event in the queue
*/
protected boolean hasEvent(){
return !queue.isEmpty();
}
/**
* Polls a event from the list or waits until there is a event
* @return The next event
*/
protected WorkerDataEvent pollEvent(){
while(queue.isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {}
}
return queue.poll();
}
public void run(){
update();
}
public abstract void update();
}

View file

@ -0,0 +1,18 @@
package zutil.net.nio.worker;
import java.nio.channels.SocketChannel;
import zutil.net.nio.NioNetwork;
public class WorkerDataEvent {
public NioNetwork network;
public SocketChannel socket;
public Object data;
public WorkerDataEvent(NioNetwork server, SocketChannel socket, Object data) {
this.network = server;
this.socket = socket;
this.data = data;
}
}

View file

@ -0,0 +1,114 @@
package zutil.net.nio.worker.grid;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import zutil.io.MultiPrintStream;
import zutil.net.nio.NioClient;
import zutil.net.nio.message.GridMessage;
import zutil.net.nio.worker.ThreadedEventWorker;
import zutil.net.nio.worker.WorkerDataEvent;
/**
* This class is the client part of the grid.
* It connects to a grid server and requests new job.
* And then sends back the result to the server.
*
* @author Ziver
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public class GridClient extends ThreadedEventWorker {
private static LinkedList<GridJob> jobQueue;
private static GridThread thread;
private static NioClient network;
/**
* Creates a new GridClient object and registers itself at the server
* and sets itself as a worker in NioClient
*
* @param thread the Thread interface to run for the jobs
* @param network the NioClient to use to communicate to the server
*/
public GridClient(GridThread thread, NioClient network){
jobQueue = new LinkedList<GridJob>();
GridClient.thread = thread;
GridClient.network = network;
}
/**
* Starts up the client and a couple of GridThreads.
* And registers itself as a worker in NioClient
* @throws IOException
*/
public void initiate() throws IOException{
network.setDefaultWorker(this);
network.send(new GridMessage(GridMessage.REGISTER));
for(int i=0; i<Runtime.getRuntime().availableProcessors() ;i++){
Thread t = new Thread(thread);
t.start();
}
}
@Override
public void messageEvent(WorkerDataEvent e) {
// ignores other messages than GridMessage
if(e.data instanceof GridMessage){
GridMessage msg = (GridMessage)e.data;
switch(msg.messageType()){
// Receive data from Server
case GridMessage.INIT_DATA:
thread.setInitData(msg.getData());
break;
case GridMessage.COMP_DATA:
jobQueue.add(new GridJob(msg.getJobQueueID(), (Queue)msg.getData()));
break;
}
}
}
/**
* Register whit the server that the job is done
*
* @param jobID is the job id
* @param correct if the answer was right
* @param result the result of the computation
* @throws IOException
*/
public static void jobDone(int jobID, boolean correct, Object result) throws IOException{
if(correct)
network.send(new GridMessage(GridMessage.COMP_SUCCESSFUL, jobID, result));
else
network.send(new GridMessage(GridMessage.COMP_INCORRECT, jobID, result));
}
/**
* Registers with the server that there was an
* error when computing this job
*
* @param jobID is the job id
*/
public static void jobError(int jobID){
try{
network.send(new GridMessage(GridMessage.COMP_SUCCESSFUL, jobID));
}catch(Exception e){e.printStackTrace();}
}
/**
* @return a new job to compute
* @throws IOException
*/
public static synchronized GridJob getNextJob() throws IOException{
if(jobQueue.isEmpty()){
network.send(new GridMessage(GridMessage.NEW_DATA));
while(jobQueue.isEmpty()){
try{Thread.sleep(100);}catch(Exception e){}
}
}
MultiPrintStream.out.println("Starting job");
return jobQueue.poll();
}
}

View file

@ -0,0 +1,22 @@
package zutil.net.nio.worker.grid;
/**
* A internal class for handling the jobs
*
* @author Ziver
*/
public class GridJob{
public int jobID;
public Object job;
public long timestamp;
public GridJob(int jobID, Object job){
this.jobID = jobID;
this.job = job;
renewTimeStamp();
}
public void renewTimeStamp(){
timestamp = System.currentTimeMillis();
}
}

View file

@ -0,0 +1,18 @@
package zutil.net.nio.worker.grid;
/**
* Generates new jobs for the grid to compute
*
* @author Ziver
*/
public interface GridJobGenerator<T> {
/**
* @return static and final values that do not change for every job
*/
public Object initValues();
/**
* @return a new generated job
*/
public T generateJob();
}

View file

@ -0,0 +1,11 @@
package zutil.net.nio.worker.grid;
/**
* Handles the incoming results from the grid
*
* @author Ziver
*/
public interface GridResultHandler<T> {
public void resultEvent(int jobID, boolean correct, T result);
}

View file

@ -0,0 +1,111 @@
package zutil.net.nio.worker.grid;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Queue;
import zutil.net.nio.message.GridMessage;
import zutil.net.nio.worker.ThreadedEventWorker;
import zutil.net.nio.worker.WorkerDataEvent;
/**
* Implements a simple network computing server
*
* @author Ziver
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public class GridServerWorker extends ThreadedEventWorker{
// Job timeout after 30 min
public static int JOB_TIMEOUT = 1000*60*30;
private HashMap<Integer, GridJob> jobs; // contains all the ongoing jobs
private Queue<GridJob> reSendjobQueue; // Contains all the jobs that will be recalculated
private GridJobGenerator jobGenerator; // The job generator
private GridResultHandler resHandler;
private int nextJobID;
public GridServerWorker(GridResultHandler resHandler, GridJobGenerator jobGenerator){
this.resHandler = resHandler;
this.jobGenerator = jobGenerator;
nextJobID = 0;
jobs = new HashMap<Integer, GridJob>();
reSendjobQueue = new LinkedList<GridJob>();
GridMaintainer maintainer = new GridMaintainer();
maintainer.start();
}
@Override
public void messageEvent(WorkerDataEvent e) {
try {
// ignores other messages than GridMessage
if(e.data instanceof GridMessage){
GridMessage msg = (GridMessage)e.data;
GridJob job = null;
switch(msg.messageType()){
case GridMessage.REGISTER:
e.network.send(e.socket, new GridMessage(GridMessage.INIT_DATA, 0, jobGenerator.initValues()));
break;
// Sending new data to compute to the client
case GridMessage.NEW_DATA:
if(!reSendjobQueue.isEmpty()){ // checks first if there is a job for recalculation
job = reSendjobQueue.poll();
job.renewTimeStamp();
}
else{ // generates new job
job = new GridJob(nextJobID,
jobGenerator.generateJob());
jobs.put(job.jobID, job);
nextJobID++;
}
GridMessage newMsg = new GridMessage(GridMessage.COMP_DATA, job.jobID, job.job);
e.network.send(e.socket, newMsg);
break;
// Received computation results
case GridMessage.COMP_SUCCESSFUL:
resHandler.resultEvent(msg.getJobQueueID(), true, msg.getData());
break;
case GridMessage.COMP_INCORRECT:
resHandler.resultEvent(msg.getJobQueueID(), false, msg.getData());
break;
case GridMessage.COMP_ERROR: // marks the job for recalculation
job = jobs.get(msg.getJobQueueID());
reSendjobQueue.add(job);
break;
}
}
} catch (IOException e1) {
e1.printStackTrace();
}
}
/**
* Changes the job timeout value
* @param min is the timeout in minutes
*/
public static void setJobTimeOut(int min){
JOB_TIMEOUT = 1000*60*min;
}
class GridMaintainer extends Thread{
/**
* Runs some behind the scenes stuff
* like job timeout.
*/
public void run(){
while(true){
long time = System.currentTimeMillis();
for(int jobID : jobs.keySet()){
if(time-jobs.get(jobID).timestamp > JOB_TIMEOUT){
reSendjobQueue.add(jobs.get(jobID));
}
}
try{Thread.sleep(1000*60*1);}catch(Exception e){};
}
}
}
}

View file

@ -0,0 +1,38 @@
package zutil.net.nio.worker.grid;
/**
* This interface is the thread that will do
* all the computation in the grid
*
* @author Ziver
*/
public abstract class GridThread implements Runnable{
/**
* The initial static and final data will be sent to this
* method.
*
* @param data is the static and or final data
*/
public abstract void setInitData(Object data);
public void run(){
while(true){
GridJob tmp = null;
try {
tmp = GridClient.getNextJob();
compute(tmp);
} catch (Exception e) {
e.printStackTrace();
if(tmp != null){
GridClient.jobError(tmp.jobID);
}
}
}
}
/**
* Compute the given data and return
* @param data
*/
public abstract void compute(GridJob data) throws Exception;
}