diff --git a/src/main/java/EsperProcessor.java b/src/main/java/EsperProcessor.java index bf0807c..7e97599 100644 --- a/src/main/java/EsperProcessor.java +++ b/src/main/java/EsperProcessor.java @@ -27,64 +27,40 @@ public class EsperProcessor implements TickProcessor { public EsperProcessor(String epl) { + Configuration config = new Configuration(); + // disable esper's internal clock. we use tick data for our // sense of time. - Configuration config = new Configuration(); config.getEngineDefaults().getThreading().setInternalTimerEnabled(false); + + // register event types defined in java classes config.addEventType(TickEvent.class); + config.addEventType(TimeValueEvent.class); config.addEventType(LongEntryEvent.class); - //config.addVariable("FOO", int.class, 12); config.addEventType(OHLCEvent.class); config.addEventType(OHLCValueEvent.class); + // add OHLC plugin config.addPlugInView("ATS", "OHLC", OHLCPlugInViewFactory.class.getName()); engine = EPServiceProviderManager.getDefaultProvider(config); + //engine.getEPRuntime().setVariableValue("FOO", 12d); + addStatements(epl); - addStatement("select * from TickEvent", - (newData, oldData) -> { - log.debug("Tick: {}", newData[0].getUnderlying()); - }); + // addLogStatement("TickEvent"); + // addLogStatement("OneMinuteOHLCStream"); + // addLogStatement("BStream"); + // addLogStatement("PStream"); + // addLogStatement("B1"); + // addLogStatement("B2"); + // addLogStatement("P1"); + // addLogStatement("P2"); + // addLogStatement("LongEntryStream"); + // addLogStatement("LogStream"); - addStatement("select * from OHLCStream", - (newData, oldData) -> { - log.debug("OLHC: {}", newData[0].getUnderlying()); - // DateTime t = (DateTime)newData[0].get("date"); - // double f = (double)newData[0].get("first"); - // double l = (double)newData[0].get("last"); - // double x = (double)newData[0].get("max"); - // double n = (double)newData[0].get("min"); - // log.info("OHLC: {} {} {} {}", f,l,x,n); - }); - - // addStatement("select * from TickEvent#groupwin(instrument)#ohlcbarminute(timestamp, midDouble)", - // new OHLCUpdateListener()); - - // addStatement("select count(*) from TickEvent#time(4)",//"select * from TicksTimeWindow", - // (newData, oldData) -> { - // log.debug("TickTimeWindow: {}", (Object)newData); - // }); - - // EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent"); - - // statement.addListener((newData, oldData) -> { - // log.debug("event: {}", (Object)newData); - // }); - - - // EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent"); - - // statement.addListener((newData, oldData) -> { - // log.debug("event: {}", (Object)newData); - // }); - - // addStatement("select count(*) as count from TickEvent#time(4 sec)", - // (newData, oldData) -> { - // long count = (long)newData[0].get("count"); - // log.info("Tick Count: {}", count); - // }); + addLogStreamHandler(); } /** @@ -160,6 +136,28 @@ public class EsperProcessor implements TickProcessor { return statement; } + /** + * Add a simple EPL statement to the Esper engine that logs any + * event that matches the event name. + */ + private EPStatement addLogStreamHandler() { + return addStatement("select * from LogStream", + (newData, oldData) -> { + log.debug("{}: {}", newData[0].get("stream"), newData[0].get("event")); + }); + } + + /** + * Add a simple EPL statement to the Esper engine that logs any + * event that matches the event name. + */ + private EPStatement addLogStatement(String name) { + return addStatement("select * from " + name, + (newData, oldData) -> { + log.debug("{}: {}", name, newData[0].getUnderlying()); + }); + } + /** * Add a single EPL statement to the Esper engine with a listener * to respond to the Statement.