updates for worklet transfer optimization project

aib7 [2002-12-13 02:46:14]
updates for worklet transfer optimization project
worklet specific class loaders, pre-sending of byte code
transfer of junctions as byte arrays
Filename
Worklet.java
WorkletJunction.java
http/ClassServer.java
diff --git a/Worklet.java b/Worklet.java
index 7258447..84a700d 100644
--- a/Worklet.java
+++ b/Worklet.java
@@ -17,6 +17,7 @@ package psl.worklets;

 import java.io.*;
 import java.util.*;
+import java.net.URL;

 /**
  * The mobile agent that is transported to and from {@link WVM}'s
@@ -53,6 +54,7 @@ public final class Worklet implements Serializable {

   /** Collected "data" of WorkletJunction */
   protected Hashtable _payload;
+    public String wid = "";

   /** route that the Worket is to take */
   private final Vector _route = new Vector();
@@ -66,6 +68,11 @@ public final class Worklet implements Serializable {
   /** default security */
   private boolean _isSecure = false;

+    //class loader to load junctions reconstructed from byte array
+    private transient WorkletClassLoader _ldr = null;
+    //junctions kept as byte arrays
+    public Hashtable byteArrays = new Hashtable();
+
   /**
    * Creates a Worklet with the given {@link WorkletJunction} as the
    * originJunction
@@ -78,6 +85,9 @@ public final class Worklet implements Serializable {
       _wjClasses.add(_oj.getClass());
       _oj._worklet = this;
     }
+
+    // create s String worklet Id
+    wid = new String((new Long(new Date().getTime())).toString()+WVM.wvm_id);
   }

   /**
@@ -95,6 +105,20 @@ public final class Worklet implements Serializable {
     }
   }

+
+    /*Add a dummy junction, with actual junction being supplied as byte code*/
+    /*code base for actual junction must be supplied also*/
+    public void addJunction(WorkletJunction _wj, byte []bc,URL url){
+	_wj._originClassServer = url;
+	//add dummy junction
+	addJunction(_wj);
+	//store byte array with actual junction
+	byteArrays.put(_wj.getIndex(),bc);
+
+	// TO DO:
+	//encrypt the byte array
+    }
+
   /**
    * Gets the {@link WorkletJunction} at the origin
    *
@@ -115,10 +139,12 @@ public final class Worklet implements Serializable {
     _system = system;
     if (_atOrigin) {
       _originJunction.sysInit(system, wvm);
+
     } else {
       classHashSet = new HashSet();
       _wjClasses.removeElement(_currentJunction.getClass());
       _currentJunction.sysInit(system, wvm);
+
     }
   }

@@ -127,12 +153,59 @@ public final class Worklet implements Serializable {
    * {@link WVM} after Worklet has been received.
    */
   void execute() {
+      //  System.out.println("Worklet: execute");
     (new Thread() {
       // create a new thread regardless
       public void run() {
         // 2-do: create _priority variable in WJ:
         // 2-do: existing super-priority thread to inherit higher priorities from ...
         WorkletJunction _wj = _atOrigin ? _originJunction : _currentJunction;
+	if(byteArrays.containsKey(_wj.getIndex())){
+	    try{
+		 _junctions.removeElement(_wj);
+		 //TO DO
+		 // decrypt the byte array if encrypted
+		byte []bytecode = (byte[])byteArrays.get(_wj.getIndex());
+		ByteArrayInputStream baiStream = new ByteArrayInputStream(bytecode);
+		if(WVM.wkltRepository.containsKey(wid)){
+		    _ldr = (WorkletClassLoader)WVM.wkltRepository.get(wid);
+		} else {
+		    if (!_atOrigin) {
+			if (_junctions.isEmpty()) returnToOrigin();
+			else {
+			    WVM.err.println("FAILED TO LOAD WJ from BYTES ARRAY: LOADER NOT FOUND!");
+			    WVM.err.println("MOVING ON TO THE NEXT JUNCTION");
+			    moveToNextJunction();
+			    return;
+			}
+		    }
+		}
+		//this must be present
+		_ldr.addTopCodebase(_wj._originClassServer);
+		_ldr.dontSendWorkletId();
+		ObjectInputStream ois = new ObjectInputStream(baiStream) {
+			protected Class resolveClass(ObjectStreamClass v) throws IOException, ClassNotFoundException {
+			    String name = v.getName();
+			    Class c = Class.forName(name, true, _ldr);
+			    return ( (c == null) ? super.resolveClass(v) : c );
+			}
+		    };
+		_wj = (WorkletJunction) ois.readObject();
+		_wj.sysInit(_system, _wvm);
+		baiStream.close();
+	    } catch(Exception e){
+		WVM.err.println("FAILED TO LOAD WJ from BYTES ARRAY:");
+		e.printStackTrace();
+		WVM.err.println("MOVING ON TO THE NEXT JUNCTION");
+		if (!_atOrigin) {
+		    if (_junctions.isEmpty()) returnToOrigin();
+		    else{
+			moveToNextJunction();
+			return;
+		    }
+		}
+	    }
+	}
         Thread t = new Thread(_wj, _hashCode);
 	t.setPriority(_wj.getPriority());
         t.start();
@@ -145,7 +218,10 @@ public final class Worklet implements Serializable {

         if (!_atOrigin) {
           if (_junctions.isEmpty()) returnToOrigin();
-          else moveToNextJunction();
+          else{
+	      moveToNextJunction();
+	      return;
+	  }
         }
       }
     }).start();
@@ -159,12 +235,18 @@ public final class Worklet implements Serializable {
    * @param wvm: {@link WVM} to send the Worklet to
    */
   public void deployWorklet(WVM wvm) {
+      //  System.out.println("Worklet: deployWorklet");
     synchronized (_junctions) {
       _currentJunction = (WorkletJunction) _junctions.firstElement();
       _junctions.removeElement(_currentJunction);
       _junctions.trimToSize();
     }
-
+    //  System.out.println(wid + " " + _junctions.size());
+    try{
+	wvm.regJunctions(wid,_junctions);
+    }catch(Exception e){
+	e.printStackTrace();
+    }
     _wvm = wvm;
     _lHost = wvm.transporter._host;
     _lName = wvm.transporter._name;
@@ -175,7 +257,9 @@ public final class Worklet implements Serializable {

   /** Moves the Worklet from one {@link WorkletJunction} to the next */
   void moveToNextJunction() {
+      //   System.out.println("Worklet: moveToNextJunction");
     WVM _tmpWVM = _wvm;
+    _ldr = null;
     synchronized (_junctions) {
       _currentJunction = (WorkletJunction) _junctions.firstElement();
       _junctions.removeElement(_currentJunction);
@@ -202,6 +286,7 @@ public final class Worklet implements Serializable {

   /** Moves the Worklet to the {@link WorkletJunction} at the origin */
   void returnToOrigin() {
+      //   System.out.println("Worklet: returnToOrigin");
     WVM _tmpWVM = _wvm;
     _atOrigin = true;
     _currentJunction = null;
diff --git a/WorkletJunction.java b/WorkletJunction.java
index 202d192..a18bbb3 100644
--- a/WorkletJunction.java
+++ b/WorkletJunction.java
@@ -35,11 +35,15 @@ public abstract class WorkletJunction implements Serializable, Runnable {
   protected String _name;
   /** Socket port number of target system   */
   protected int _port;
+
+    String appName = null;

   /** associated meta-information */
   protected JunctionPlanner _junctionPlanner;
   /** associated identification */
   protected WorkletID _id;
+    protected String wid;
+
   /** tells the {@link Worklet} whether it needs to wait for the WorkletJunction to execute */
   private boolean _dropOff = false;

@@ -83,6 +87,9 @@ public abstract class WorkletJunction implements Serializable, Runnable {
   /** left here for backwards compatibility with WJackCondition */
   int _state = 0;

+    //index into worklet's hash table where the byte array is
+    public String index = null;
+
   // ---------------------------- Set of Constructors ----------------------------------- //
   /**
    * The complete constructor, creates a WorkletJunction with the
@@ -102,12 +109,15 @@ public abstract class WorkletJunction implements Serializable, Runnable {
    * @param jp; associated {@link JunctionPlanner} that provides meta-information
    */
   public WorkletJunction(String host, String name, int rmiport, int port,
-			 boolean dropOff, WorkletID id, JunctionPlanner jp) {
+			 boolean dropOff, String id, JunctionPlanner jp) {
     _host = host;
     _name = name;
     _port = port;
     _payload = new Hashtable();
     _dropOff = dropOff;
+
+    if(index == null)
+	index =  new String((new Long(new Date().getTime())).toString());

     if (rmiport == -1) {
       try {
@@ -119,7 +129,7 @@ public abstract class WorkletJunction implements Serializable, Runnable {
       _rmiport = rmiport;
     }

-    _id = id;
+    wid = id;
     if (_id != null)
       _id.init(this);  // this gives the workletID the reference to this junction.

@@ -136,13 +146,34 @@ public abstract class WorkletJunction implements Serializable, Runnable {
    * @param port: port number of socket to try to send through
    */
   public WorkletJunction(String host, String name, int port) {
-    this(host, name, WVM_Host.PORT, port, false, new WorkletID("default"), null);
+    this(host, name, WVM_Host.PORT, port, false, new String("default"), null);
   }
+
+    public WorkletJunction(String host, String name, int port,String apName) {
+	this(host, name, WVM_Host.PORT, port, false, new String("default"), null);
+	appName = apName;
+    }
+     public WorkletJunction(String host, String name, int port,String apName,String id) {
+	this(host, name, WVM_Host.PORT, port, false, id, null);
+	appName = apName;
+    }
+
+     public WorkletJunction(String host, String name, int port,String apName,String id,String i) {
+	this(host, name, WVM_Host.PORT, port, false, id, null);
+	appName = apName;
+	index = i;
+    }
+
+    public String getIndex(){
+	return index;
+    }
+

   /** Entry point function for Threads */
   synchronized final public void run() {
     // register the workletjunction in the WVM.
-    _wvm.registerJunction(this);
+
+      _wvm.registerJunction(this);

     if (_junctionPlanner == null) {
       execute();
@@ -150,6 +181,7 @@ public abstract class WorkletJunction implements Serializable, Runnable {
       _junctionPlanner.start();
     }
     // remove the worket junction from the WVM.
+
     _wvm.removeJunction(this.id());
   }

@@ -166,6 +198,7 @@ public abstract class WorkletJunction implements Serializable, Runnable {
    * @param origin: WorkletJunction holding addressing info of the origin
    */
   final void setOriginWorkletJunction(WorkletJunction origin) {
+     	// System.out.println("WorkletJunction: setOriginWorkletJunction");
     _originJunction = origin;
   }

@@ -200,6 +233,7 @@ public abstract class WorkletJunction implements Serializable, Runnable {
    * @return priority level
    */
   final int getPriority() {
+    	//  System.out.println("WorkletJunction: getPriority");
     return _priority;
   }

@@ -209,6 +243,7 @@ public abstract class WorkletJunction implements Serializable, Runnable {
    * @param priority: level of priority
    */
   final void setPriority(int priority) {
+     	// System.out.println("WorkletJunction: setPriority");
     _priority = priority;
   }

@@ -240,32 +275,38 @@ public abstract class WorkletJunction implements Serializable, Runnable {

   /** @return target hostname */
   final String getHost() {
+     	// System.out.println("WorkletJunction: getHost");
     return (_host);
   }

   /** @return target RMI port number */
   final int getRMIPort() {
+    	//  System.out.println("WorkletJunction: getRMIPort");
     return (_rmiport);
   }

   /** @return target RMI server name */
   final String getName() {
+     	// System.out.println("WorkletJunction: getName");
     return (_name);
   }

   /** @return target socket number */
   final int getPort() {
+    	//  System.out.println("WorkletJunction: getPort");
     return (_port);
   }

   /** @return name@host:port */
   public String toString() {
+    	//  System.out.println("WorkletJunction: toString");
     return (_name + "@" + _host + ":" + _port);
   }

   /** @return {@link WorkletID} */
-  final WorkletID id() {
-    return _id;
+  final String id() {
+    	//  System.out.println("WorkletJunction: id");
+    return wid;
   }

   /**
@@ -319,6 +360,7 @@ public abstract class WorkletJunction implements Serializable, Runnable {
    * @param tm: array of methods
    */
   public final void setTransportMethods(String[] tm){
+    	//  System.out.println("WorkletJunction: setTransportMethods");
     _transportMethods = tm;
   }

@@ -328,6 +370,7 @@ public abstract class WorkletJunction implements Serializable, Runnable {
    * @return current transport methods
    */
   public final String[] getTransportMethods(){
+    	//  System.out.println("WorkletJunction: getTransportmethods");
     if (_transportMethods == null){
       if (isSecure())
 	_transportMethods = secureDefault;
diff --git a/http/ClassServer.java b/http/ClassServer.java
index 3a3eaca..3845d86 100644
--- a/http/ClassServer.java
+++ b/http/ClassServer.java
@@ -52,7 +52,9 @@ import psl.worklets.*;
 public abstract class ClassServer implements Runnable {
   protected ServerSocket server = null;
   protected int port;
-
+    protected WVM_SSLSocketFactory _sf = null;
+    //table to keep track of which .class files have been pre-sent
+    protected static Hashtable sentByteCodes = new Hashtable();
   /**
    * Constructs a ClassServer that listens on <b>port</b> and
    * obtains a class's bytecodes using the method <b>getBytes</b>.
@@ -70,7 +72,10 @@ public abstract class ClassServer implements Runnable {
     this.port = aPort;
     while (this.port >= aPort) {
       try {
-	if (WVM_sf != null) server = WVM_sf.createServerSocket(this.port);
+	if (WVM_sf != null){
+	    _sf = WVM_sf;
+	    server = WVM_sf.createServerSocket(this.port);
+	}
 	else server = new ServerSocket(this.port);
         break;
       } catch (UnknownHostException e) {
@@ -85,6 +90,10 @@ public abstract class ClassServer implements Runnable {
     newListener();
   }

+    public int getPort(){
+	return this.port;
+    }
+
   /**
    * Returns an array of bytes containing the bytecodes for
    * the class represented by the argument <b>path</b>.
@@ -119,9 +128,11 @@ public abstract class ClassServer implements Runnable {
    */
   public void run() {
     Socket socket = null;
+    boolean got_it = false;
     // accept a connection
     try {
       socket = server.accept();
+      //System.out.println("Accept, port: " + socket.getPort());
       // create a new thread to accept the next connection
       newListener();
     } catch (IOException e) {
@@ -135,18 +146,56 @@ public abstract class ClassServer implements Runnable {
       try {
         // get path to class file from header
         BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-        String path = getPath(in);
-        // retrieve bytecodes
+        //String path = getPath(in);
+	//get class name and worklet id
+	StringPair sp = getPath(in);
         byte[] bytecodes = null;
-        if (containsKey(path)) {
-          bytecodes = get(path);
-          // WVM.out.println(" + + + Serving cached bytecode for class: " + path);
+	if(sp.string2 != null){
+	    if(WVM.wkltRepository.containsKey(sp.string2,sp.string1)){
+		bytecodes = WVM.wkltRepository.get(sp.string2,sp.string1);
+		if(bytecodes!=null){
+		    got_it = true;
+		}
+	    }
+	}
+        if (!got_it && containsKey(sp.string1)) {
+          bytecodes = get(sp.string1);
+	  // WVM.out.println(" + + + Serving cached bytecode for class: " + sp.string1);
         } else {
           // retrieve bytecodes from the file system
-          bytecodes = getBytes(path);
-          // cache the bytecodes to be sent out
-          put(path, bytecodes);
-          // WVM.out.println(" + + + Caching binary data for file: " + path);
+          bytecodes = getBytes(sp.string1);
+	  if(bytecodes != null){
+	      // cache the bytecodes to be sent out
+	      if(sp.string2!=null){
+		  WorkletClassLoader wcl;
+		  if(!WVM.wkltRepository.containsKey(sp.string2)){
+		      URL [] urls = new URL[1];
+		      urls[0] = new URL("http://localhost:"+this.port);
+		      wcl = new WorkletClassLoader(urls,_sf,sp.string2);
+		      WVM.wkltRepository.put(sp.string2,wcl);
+		  } else {
+		      wcl = WVM.wkltRepository.get(sp.string2);
+		  }
+		  wcl.putByteCode(sp.string1,bytecodes);
+		  try{
+		      if(!sentByteCodes.containsKey(new String(sp.string2+sp.string1))){
+			     String u = new String("http://"+java.net.InetAddress.getLocalHost().getHostAddress()+":"+this.port+"/");
+			     URL x = new URL(u);
+			     WVM.transporter.sendByteCode(sp.string2,sp.string1,bytecodes,u);
+			     sentByteCodes.put(new String(sp.string2+sp.string1),"");
+		      }
+		  } catch (Exception e){
+		      WVM.out.println("Failed to pre-send bytecode for " + sp.string2);
+		  }
+	      } else {
+		  put(sp.string1, bytecodes);
+	      }
+	  }  else {
+	      out.writeBytes("HTTP/1.0 400 " +  "\r\n");
+	      out.writeBytes("Content-Type: text/html\r\n\r\n");
+	      out.flush();
+	      return;
+	  }
         }

         // WVM.out.println("Retrieved bytecodes: " + bytecodes.length);
@@ -177,11 +226,13 @@ public abstract class ClassServer implements Runnable {
         }
       } catch (Exception e) {
         // write out error response
+	  // e.printStackTrace();
         out.writeBytes("HTTP/1.0 400 " + e.getMessage() + "\r\n");
         out.writeBytes("Content-Type: text/html\r\n\r\n");
         out.flush();
       }
     } catch (IOException ex) {
+	ex.printStackTrace();
       // eat exception (could log error to log file, but
       // write out to stdout for now).
       // dp2041: I commented out next two lines.  possible errors are:
@@ -214,20 +265,28 @@ public abstract class ClassServer implements Runnable {
    * Returns the path to the class file obtained from
    * parsing the HTML header.
    */
-  private static String getPath(BufferedReader in) throws IOException {
+  private static StringPair getPath(BufferedReader in) throws IOException {
+      StringPair sp = null;
     String line = in.readLine();
-    // WVM.out.println("\n + + + request is: " + line);
+    WVM.out.println("\n + + + request is: " + line);
     String path = "";
+    String wid = null;
     // extract class from GET line
     if (line.startsWith("GET /")) {
       line = line.substring(5, line.length()-1).trim();
-      int index = line.indexOf(".class ");
+      int index = line.indexOf(".class");
       if (index != -1) {
         path = line.substring(0, index).replace('/', '.');
       } else {
         path = new StringTokenizer(line, " ").nextToken();
       }
       if (WVM.DEBUG(3)) WVM.out.println("path is: " + path);
+      //see if the worklet id is supplied
+      int i = line.indexOf("?");
+      if(i!= -1 ){
+	  //have worklet id
+	  wid = line.substring(i+1,line.length()-8);
+      }
     } else {
       throw new IOException("Malformed Header in Class request");
     }
@@ -238,8 +297,9 @@ public abstract class ClassServer implements Runnable {
       // WVM.out.println (" + + + " + line);
     } while ((line.length() != 0) && (line.charAt(0) != '\r') && (line.charAt(0) != '\n'));
     if (path.length() != 0) {
+	sp = new StringPair(path,wid);
       // WVM.out.println("Edited: gskc, 19Feb01 --- returning path: " + path);
-      return path;
+      return sp;
     } else {
       throw new IOException("Malformed Header");
     }