diff --git a/src/main/java/EsperProcessor.java b/src/main/java/EsperProcessor.java index 834f4df..f914091 100644 --- a/src/main/java/EsperProcessor.java +++ b/src/main/java/EsperProcessor.java @@ -26,169 +26,168 @@ public class EsperProcessor implements TickProcessor { EPServiceProvider engine; - public EsperProcessor(String epl) { - // disable esper's internal clock. we use tick data for our - // sense of time. - Configuration config = new Configuration(); - config.getEngineDefaults().getThreading().setInternalTimerEnabled(false); - config.addEventType(TickEvent.class); - config.addEventType(LongEntryEvent.class); - //config.addVariable("FOO", int.class, 12); + public EsperProcessor(String epl) { + // disable esper's internal clock. we use tick data for our + // sense of time. + Configuration config = new Configuration(); + config.getEngineDefaults().getThreading().setInternalTimerEnabled(false); + config.addEventType(TickEvent.class); + config.addEventType(LongEntryEvent.class); + //config.addVariable("FOO", int.class, 12); - config.addEventType("OHLCTick", OHLCTick.class); - config.addEventType("OHLCValue", OHLCValue.class); + config.addEventType("OHLCTick", OHLCTick.class); + config.addEventType("OHLCValue", OHLCValue.class); config.addPlugInView("ATS", "OHLC", OHLCPlugInViewFactory.class.getName()); - engine = EPServiceProviderManager.getDefaultProvider(config); + engine = EPServiceProviderManager.getDefaultProvider(config); - addStatements(epl); + addStatements(epl); - addStatement("select * from TickEvent", - (newData, oldData) -> { - log.debug("Tick: {}", newData[0].getUnderlying()); - }); + addStatement("select * from TickEvent", + (newData, oldData) -> { + log.debug("Tick: {}", newData[0].getUnderlying()); + }); - addStatement("select * from OHLCStream", - (newData, oldData) -> { - log.debug("OLHC: {}", newData[0].getUnderlying()); - // DateTime t = (DateTime)newData[0].get("date"); - // double f = (double)newData[0].get("first"); - // double l = (double)newData[0].get("last"); - // double x = (double)newData[0].get("max"); - // double n = (double)newData[0].get("min"); - // log.info("OHLCValue: {} {} {} {}", f,l,x,n); - }); + addStatement("select * from OHLCStream", + (newData, oldData) -> { + log.debug("OLHC: {}", newData[0].getUnderlying()); + // DateTime t = (DateTime)newData[0].get("date"); + // double f = (double)newData[0].get("first"); + // double l = (double)newData[0].get("last"); + // double x = (double)newData[0].get("max"); + // double n = (double)newData[0].get("min"); + // log.info("OHLCValue: {} {} {} {}", f,l,x,n); + }); - // addStatement("select * from TickEvent#groupwin(instrument)#ohlcbarminute(timestamp, midDouble)", - // new OHLCUpdateListener()); + // addStatement("select * from TickEvent#groupwin(instrument)#ohlcbarminute(timestamp, midDouble)", + // new OHLCUpdateListener()); - // addStatement("select count(*) from TickEvent#time(4)",//"select * from TicksTimeWindow", - // (newData, oldData) -> { - // log.debug("TickTimeWindow: {}", (Object)newData); - // }); + // addStatement("select count(*) from TickEvent#time(4)",//"select * from TicksTimeWindow", + // (newData, oldData) -> { + // log.debug("TickTimeWindow: {}", (Object)newData); + // }); - // EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent"); + // EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent"); - // statement.addListener((newData, oldData) -> { - // log.debug("event: {}", (Object)newData); - // }); + // statement.addListener((newData, oldData) -> { + // log.debug("event: {}", (Object)newData); + // }); - // EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent"); + // EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent"); - // statement.addListener((newData, oldData) -> { - // log.debug("event: {}", (Object)newData); - // }); + // statement.addListener((newData, oldData) -> { + // log.debug("event: {}", (Object)newData); + // }); - // addStatement("select count(*) as count from TickEvent#time(4 sec)", - // (newData, oldData) -> { - // long count = (long)newData[0].get("count"); - // log.info("Tick Count: {}", count); - // }); - - } + // addStatement("select count(*) as count from TickEvent#time(4 sec)", + // (newData, oldData) -> { + // long count = (long)newData[0].get("count"); + // log.info("Tick Count: {}", count); + // }); + } - /** - * Split a (possibly) multiline string of EPL statements into one - * statement per string. Statements are separated by at least one - * blank line. - */ - private static String[] splitStatements(String str) { - return str.split("(\\r?\\n){2,}"); - } + /** + * Split a (possibly) multiline string of EPL statements into one + * statement per string. Statements are separated by at least one + * blank line. + */ + private static String[] splitStatements(String str) { + return str.split("(\\r?\\n){2,}"); + } - /** - * Split a string into individual lines. - */ - private static String[] splitLines(String str) { - return str.split("\\r?\\n"); - } + /** + * Split a string into individual lines. + */ + private static String[] splitLines(String str) { + return str.split("\\r?\\n"); + } - /** - * Return true if the given string is a comment. - */ - private static boolean isComment(String str) { - return str.startsWith("//") || str.startsWith("--"); - } + /** + * Return true if the given string is a comment. + */ + private static boolean isComment(String str) { + return str.startsWith("//") || str.startsWith("--"); + } - /** - * Return true if the given string is only whitespace. - */ - private static boolean isEmpty(String str) { - return str.matches("^\\s*$"); - } + /** + * Return true if the given string is only whitespace. + */ + private static boolean isEmpty(String str) { + return str.matches("^\\s*$"); + } - /** - * Remove comment lines from the string. - */ - private static String filterComments(String str) { - return Arrays.stream(splitLines(str)) - .filter(s -> !isComment(s)) - .collect(Collectors.joining("\n")); - } + /** + * Remove comment lines from the string. + */ + private static String filterComments(String str) { + return Arrays.stream(splitLines(str)) + .filter(s -> !isComment(s)) + .collect(Collectors.joining("\n")); + } - /** - * Given a string with (possibly) multiple statements, split and - * add individually. - */ - private void addStatements(String epl) { - for (String s : splitStatements(filterComments(epl))) { - if (!isEmpty(s)) - addStatement(s); - } - } + /** + * Given a string with (possibly) multiple statements, split and + * add individually. + */ + private void addStatements(String epl) { + for (String s : splitStatements(filterComments(epl))) { + if (!isEmpty(s)) + addStatement(s); + } + } - /** - * Add a single EPL statement to the Esper engine. - */ - private EPStatement addStatement(String epl) { - return addStatement(epl, (UpdateListener)null); - } + /** + * Add a single EPL statement to the Esper engine. + */ + private EPStatement addStatement(String epl) { + return addStatement(epl, (UpdateListener)null); + } - /** - * Add a single EPL statement to the Esper engine with a listener - * to respond to the Statement. - */ - private EPStatement addStatement(String epl, UpdateListener listener) { - // log.debug("Adding statement: {}", epl); + /** + * Add a single EPL statement to the Esper engine with a listener + * to respond to the Statement. + */ + private EPStatement addStatement(String epl, UpdateListener listener) { + // log.debug("Adding statement: {}", epl); - EPStatement statement = engine.getEPAdministrator().createEPL(epl); + EPStatement statement = engine.getEPAdministrator().createEPL(epl); - if (listener != null) { - statement.addListener(listener); - } + if (listener != null) { + statement.addListener(listener); + } - return statement; - } + return statement; + } - /** - * Add a single EPL statement to the Esper engine with a listener - * to respond to the Statement. - */ - private EPStatement addStatement(String epl, StatementAwareUpdateListener listener) { - // log.debug("Adding statement aware statement: {}", epl); + /** + * Add a single EPL statement to the Esper engine with a listener + * to respond to the Statement. + */ + private EPStatement addStatement(String epl, StatementAwareUpdateListener listener) { + // log.debug("Adding statement aware statement: {}", epl); - EPStatement statement = engine.getEPAdministrator().createEPL(epl); + EPStatement statement = engine.getEPAdministrator().createEPL(epl); - if (listener != null) { - statement.addListener(listener); - } + if (listener != null) { + statement.addListener(listener); + } - return statement; - } + return statement; + } - /** - * Send a single TickEvent to Esper. - */ - public void process(TickEvent tick) { - process(new CurrentTimeEvent(tick.getTime().getMillis())); - engine.getEPRuntime().sendEvent(tick); - } + /** + * Send a single TickEvent to Esper. + */ + public void process(TickEvent tick) { + process(new CurrentTimeEvent(tick.getTime().getMillis())); + engine.getEPRuntime().sendEvent(tick); + } - /** - * Update Esper's internal clock. - */ - public void process(CurrentTimeEvent timeEvent) { - engine.getEPRuntime().sendEvent(timeEvent); - } + /** + * Update Esper's internal clock. + */ + public void process(CurrentTimeEvent timeEvent) { + engine.getEPRuntime().sendEvent(timeEvent); + } }