Replay support in console

jjp32 [2002-09-09 14:41:32]
Replay support in console
Filename
ep/EPConfig.xml
ep/input/ConsoleInput.java
ep/input/SienaInput.java
ep/store/EPStore.java
ep/store/JDBCStore.java
ep/transform/StoreTransform.java
diff --git a/ep/EPConfig.xml b/ep/EPConfig.xml
index aad104b..58ede3a 100644
--- a/ep/EPConfig.xml
+++ b/ep/EPConfig.xml
@@ -8,7 +8,6 @@
     <EventFormat Name="psl.xues.ep.event.DOMEvent"></EventFormat>
   </EventFormats>
   <Inputters>
-    <Inputter Name="SocketInput1" Type="psl.xues.ep.input.SocketInput" Port="9001" SocketType="tcpconn" DataType="XMLObject" />
     <Inputter Name="SienaInput1" Type="psl.xues.ep.input.SienaInput" SienaReceivePort="7890">
       <SienaFilter Name="AllFilter" />
     </Inputter>
@@ -20,10 +19,15 @@
     <Outputter Name="ConsoleOutput1" Type="psl.xues.ep.output.ConsoleOutput"></Outputter>
   </Outputters>
   <Transforms>
-    <Transform Name="EventConverter1"
-        Type="psl.xues.ep.transform.EventConverter" OutputFormat="SienaEvent"/>
+    <Transform Name="StoreTransform"
+	       Type="psl.xues.ep.transform.StoreTransform"
+	       StoreName="Store1"/>
   </Transforms>
   <Stores>
+    <Store Name="JDBCStore1"
+	   Type="psl.xues.ep.store.JDBCStore"
+	   DBType="hsqldb" DBDriver="org.hsqldb.jdbcDriver"
+	   Username="sa" Password="" />
   </Stores>
   <Rules>
     <Rule Name="SienaInputRule">
diff --git a/ep/input/ConsoleInput.java b/ep/input/ConsoleInput.java
index c1da673..61df678 100644
--- a/ep/input/ConsoleInput.java
+++ b/ep/input/ConsoleInput.java
@@ -12,6 +12,7 @@ import org.w3c.dom.Element;
 import psl.xues.ep.event.StringEvent;
 import psl.xues.ep.event.DOMEvent;
 import psl.xues.ep.event.SienaEvent;
+import psl.xues.ep.store.EPStore;

 import siena.Notification;

@@ -29,6 +30,12 @@ import siena.Notification;
  * Copyright (c) 2002: The Trustees of Columbia University in the
  * City of New York.  All Rights Reserved.
  *
+ * <!--
+ * TODO;
+ * - Support replay, database maintenance
+ * - Consider combining with configuration to support GUI frontend
+ * -->
+ *
  * @author Janak J Parekh <janak@cs.columbia.edu>
  * @version $Revision$
  */
@@ -104,6 +111,10 @@ public class ConsoleInput extends EPInput {
         "  {\"a1\"=(valuetype)\"data\", ...} as the parameter; supported valuetypes\n" +
         "  include String, boolean, double, or long.  If valuetype is not specified,\n" +
         "  String is assumed\n" +
+        "- REPLAY <StoreName> [-s SourceName] [-t StartTime EndTime] [-o]: replay a set\n" +
+        "  of events to the associated output.  If you use -t, specify BOTH start and end\n" +
+        "  times, in long integer form; -o implies \"Original Time Framing\", which plays\n" +
+        "  back events with the time intervals in which they were originally received.\n" +
         "- SHUTDOWN: shuts down the Event Packager cleanly\n" +
         "-------------------");
       }
@@ -200,6 +211,51 @@ public class ConsoleInput extends EPInput {
           ep.injectEvent(new SienaEvent(getName(), n));
       }

+      else if(isCommand(command, "replay")) {
+        // Arguments parsing, much like main()
+        StringTokenizer tok = new StringTokenizer(command);
+        String storeName = null;
+        String sourceName = null;
+        long beginTime = -1;
+        long endTime = -1;
+        boolean originalTime = false;
+        try {
+          // Skip over command declaration; this should never fail
+          tok.nextToken();
+          // Now parse
+          while(tok.hasMoreTokens()) {
+            String param = tok.nextToken();
+            if(!param.startsWith("-")) { // Must be store name
+              storeName = param;
+            } else if(param.equals("-s")) { // Source name
+              sourceName = tok.nextToken();
+            } else if(param.equals("-t")) { // Timestamp range
+              beginTime = Long.parseLong(tok.nextToken());
+              endTime = Long.parseLong(tok.nextToken());
+            } else if(param.equals("-o")) { // Original timestamp
+              originalTime = true;
+            }
+          }
+        } catch(Exception e) {
+          out.println("Error parsing replay request, try again");
+          continue;
+        }
+
+        if(storeName == null) {
+          out.println("Store name must be specified for replay");
+          continue;
+        } else if(sourceName == null && (beginTime == -1 || endTime == -1)) {
+          out.println("Either source or timestamp range must be specified");
+          continue;
+        }
+
+        // Now replay!
+        EPStore eps = ep.getStore(storeName);
+        if(eps.playbackEvents(getName(), sourceName, beginTime, endTime,
+        originalTime) == false) {
+          out.println("No events to replay!");
+        }
+      }
       // Invalid
       else {
         // If we're in ep-shutdown, let bygones be bygones
diff --git a/ep/input/SienaInput.java b/ep/input/SienaInput.java
index 1a47045..99f994e 100644
--- a/ep/input/SienaInput.java
+++ b/ep/input/SienaInput.java
@@ -283,22 +283,9 @@ public class SienaInput extends EPInput implements Notifiable {
         }
         // Now playback based on what we have
         EPStore eps = ep.getStore(store);
-        Object[] refs = null;
-        if(t1 != -1 && t2 != -1) {
-          if(source != null) {
-            refs = eps.requestEvents(source, t1, t2);
-          } else {
-            refs = eps.requestEvents(t1, t2);
-          }
-        } else {
-          refs = eps.requestEvents(source);
-        }
-
-        if(refs != null) {
-          // There's data to play back
-          eps.playbackEvents(getName(), refs, orgTime);
-        } else {
+        if(eps.playbackEvents(getName(), source, t1, t2, orgTime) == false) {
           debug.debug("No events found to actually playback");
+          // XXX - send Siena notification back?
         }
       }
     }
diff --git a/ep/store/EPStore.java b/ep/store/EPStore.java
index 9041521..3d62a4a 100644
--- a/ep/store/EPStore.java
+++ b/ep/store/EPStore.java
@@ -129,6 +129,68 @@ public abstract class EPStore implements Runnable, EPPlugin {
   }

   /**
+   * Another utility method to play back events.  Will create a new playback
+   * thread that injects events into EP on behalf of an input.
+   *
+   * @param sourceAs The source to play back as, into EP.
+   * @param sourceFrom The source to play events from.
+   * @param t1 The lower timebound (inclusive).
+   * @param t2 The upper timebound (inclusive).
+   * @param originalTime Use original time spacing?
+   * @return A boolean indicating if any events were queued for play.
+   */
+  public boolean playbackEvents(String sourceAs, String sourceFrom,
+  long t1, long t2, boolean originalTime) {
+    Object[] refs = null;
+    if(t1 != -1 && t2 != -1 && sourceFrom != null) {
+      refs = requestEvents(sourceFrom, t1, t2);
+    } else if(t1 != -1 && t2 != -1) {
+      refs = requestEvents(t1, t2);
+    } else if(sourceFrom != null) {
+      refs = requestEvents(sourceFrom);
+    } else {
+      debug.warn("PlaybackEvents requested with no source and no timestamp");
+      return false;
+    }
+
+    if(refs == null) {
+      debug.debug("No events found to actually playback");
+      return false;
+    }
+
+    // There's data to play back
+    playbackEvents(sourceAs, refs, originalTime);
+    return true;
+  }
+
+  /**
+   * Same as previous method, but search for all sources.
+   *
+   * @param sourceAs The source to play back as, into EP.
+   * @param t1 The lower timebound (inclusive).
+   * @param t2 The upper timebound (inclusive).
+   * @param originalTime Use original time spacing?
+   * @return A boolean indicating if any events were queued for play.
+   */
+  public boolean playbackEvents(String sourceAs, long t1, long t2,
+  boolean originalTime) {
+    return playbackEvents(sourceAs, null, t1, t2, originalTime);
+  }
+
+  /**
+   * Same as previous methods, but all time across one source.
+   *
+   * @param sourceAs The source to play back as, into EP.
+   * @param sourceFrom The source to play events from.
+   * @param originalTime Use original time spacing?
+   * @return A boolean indicating if any events were queued for play.
+   */
+  public boolean playbackEvents(String sourceAs, String sourceFrom,
+  boolean originalTime) {
+    return playbackEvents(sourceAs, sourceFrom, -1, -1, originalTime);
+  }
+
+  /**
    * Run.
    */
   public void run() {
diff --git a/ep/store/JDBCStore.java b/ep/store/JDBCStore.java
index bf7d233..3ca757f 100644
--- a/ep/store/JDBCStore.java
+++ b/ep/store/JDBCStore.java
@@ -15,7 +15,36 @@ import psl.xues.ep.event.EPEvent;
 import psl.xues.ep.util.Base64;

 /**
- * JDBC store mechanism.
+ * JDBC store mechanism.  We support most JDBC databasers, including
+ * Hypersonic-SQL (tested), PostgreSQL (tested), and even possibly
+ * SQL Server 2000 (untested).
+ * <p>
+ * Usage: <em>([] implies an optional parameter)</em></p>
+ * <p><tt>
+ * &lt;Stores&gt;<br>
+ * <blockquote>&lt;Store Name="<em>instance name</em>"
+ * Type="psl.xues.ep.store.JDBCStore" DBType="<em>database type</em>"
+ * DBDriver="<em>database driver</em>" [DBName="<em>database name</em>"]
+ * [TableName="<em>table name</em>"] Username="<em>username</em>"
+ * Password="<em>password</em>" /&gt;<br></blockquote>
+ * &lt;/Stores&gt;
+ * </tt></p>
+ * <p>
+ * Attributes/parameters:<ol>
+ * <li><b>DBType</b>: The database type name (for JDBC).</li>
+ * <li><b>DBDriver</b>: The database driver (full classname; for JDBC).</li>
+ * <li><b>DBName</b>: The name of the database to use.  If none is specified,
+ * "EventPackager" will be used.  If the database does not exist, it will
+ * automatically be created.</li>
+ * <li><b>DBTable</b>: The name of the table to use.  If none is specified,
+ * "EPData" will be used.  If the table does not exist, it will automatically
+ * be created.</li>
+ * <li><b>Username</b>: The username to use when connecting to the JDBC
+ * driver.  (Required for now; anonymous connections may be supported in the
+ * future.)</li>
+ * <li><b>Password</b>: The password to use when connecting to the JDBC
+ * driver. (For a passwordless connection, specify an empty string.)</li>
+ * </ol>
  * <p>
  * Copyright (c) 2002: The Trustees of Columbia University in the
  * City of New York.  All Rights Reserved.
@@ -36,9 +65,9 @@ public class JDBCStore extends EPStore {
   /** Database driver for this type */
   private String dbDriver = null;
   /** Name of database */
-  private String dbName = null;
+  private String dbName = "EventPackager";
   /** Name of table */
-  private String tableName = null;
+  private String tableName = "EPData";
   /** Username */
   private String username = null;
   /** Password */
diff --git a/ep/transform/StoreTransform.java b/ep/transform/StoreTransform.java
index fb0b598..b080c74 100644
--- a/ep/transform/StoreTransform.java
+++ b/ep/transform/StoreTransform.java
@@ -9,8 +9,17 @@ import org.w3c.dom.Element;
  * Transform that snapshots the current event and writes it to the
  * configured store.
  * <p>
+ * Attributes/parameters: <ol>
+ * <li><b>StoreName</b>: Specifies the store instance to capture events to.
+ * </ol>
+ * <p>
  * Copyright (c) 2002: The Trustees of Columbia University in the
  * City of New York.  All Rights Reserved.
+ *
+ * <!--
+ * TODO:
+ * - Consider additional filtration mechanisms.
+ * -->
  *
  * @author Janak J Parekh
  * @version $Revision$