import ats.plugin.OHLCPlugInViewFactory; import ats.plugin.OHLCTick; import ats.plugin.OHLCUpdateListener; import ats.plugin.OHLCValue; import com.espertech.esper.client.Configuration; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.EventBean; import com.espertech.esper.client.StatementAwareUpdateListener; import com.espertech.esper.client.UpdateListener; import com.espertech.esper.client.time.CurrentTimeEvent; import java.io.File; import java.lang.reflect.Array; import java.math.BigDecimal; import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class EsperProcessor implements TickProcessor { final Logger log = LoggerFactory.getLogger(EsperProcessor.class); EPServiceProvider engine; public EsperProcessor(String epl) { // disable esper's internal clock. we use tick data for our // sense of time. Configuration config = new Configuration(); config.getEngineDefaults().getThreading().setInternalTimerEnabled(false); config.addEventType(TickEvent.class); config.addEventType(LongEntryEvent.class); //config.addVariable("FOO", int.class, 12); config.addEventType("OHLCTick", OHLCTick.class); config.addEventType("OHLCValue", OHLCValue.class); config.addPlugInView("ATS", "OHLC", OHLCPlugInViewFactory.class.getName()); engine = EPServiceProviderManager.getDefaultProvider(config); addStatements(epl); addStatement("select * from TickEvent", (newData, oldData) -> { log.debug("Tick: {}", newData[0].getUnderlying()); }); addStatement("select * from OHLCStream", (newData, oldData) -> { log.debug("OLHC: {}", newData[0].getUnderlying()); // DateTime t = (DateTime)newData[0].get("date"); // double f = (double)newData[0].get("first"); // double l = (double)newData[0].get("last"); // double x = (double)newData[0].get("max"); // double n = (double)newData[0].get("min"); // log.info("OHLCValue: {} {} {} {}", f,l,x,n); }); // addStatement("select * from TickEvent#groupwin(instrument)#ohlcbarminute(timestamp, midDouble)", // new OHLCUpdateListener()); // addStatement("select count(*) from TickEvent#time(4)",//"select * from TicksTimeWindow", // (newData, oldData) -> { // log.debug("TickTimeWindow: {}", (Object)newData); // }); // EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent"); // statement.addListener((newData, oldData) -> { // log.debug("event: {}", (Object)newData); // }); // EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent"); // statement.addListener((newData, oldData) -> { // log.debug("event: {}", (Object)newData); // }); // addStatement("select count(*) as count from TickEvent#time(4 sec)", // (newData, oldData) -> { // long count = (long)newData[0].get("count"); // log.info("Tick Count: {}", count); // }); } /** * Split a (possibly) multiline string of EPL statements into one * statement per string. Statements are separated by at least one * blank line. */ private static String[] splitStatements(String str) { return str.split("(\\r?\\n){2,}"); } /** * Split a string into individual lines. */ private static String[] splitLines(String str) { return str.split("\\r?\\n"); } /** * Return true if the given string is a comment. */ private static boolean isComment(String str) { return str.startsWith("//") || str.startsWith("--"); } /** * Return true if the given string is only whitespace. */ private static boolean isEmpty(String str) { return str.matches("^\\s*$"); } /** * Remove comment lines from the string. */ private static String filterComments(String str) { return Arrays.stream(splitLines(str)) .filter(s -> !isComment(s)) .collect(Collectors.joining("\n")); } /** * Given a string with (possibly) multiple statements, split and * add individually. */ private void addStatements(String epl) { for (String s : splitStatements(filterComments(epl))) { if (!isEmpty(s)) addStatement(s); } } /** * Add a single EPL statement to the Esper engine. */ private EPStatement addStatement(String epl) { return addStatement(epl, (UpdateListener)null); } /** * Add a single EPL statement to the Esper engine with a listener * to respond to the Statement. */ private EPStatement addStatement(String epl, UpdateListener listener) { // log.debug("Adding statement: {}", epl); EPStatement statement = engine.getEPAdministrator().createEPL(epl); if (listener != null) { statement.addListener(listener); } return statement; } /** * Add a single EPL statement to the Esper engine with a listener * to respond to the Statement. */ private EPStatement addStatement(String epl, StatementAwareUpdateListener listener) { // log.debug("Adding statement aware statement: {}", epl); EPStatement statement = engine.getEPAdministrator().createEPL(epl); if (listener != null) { statement.addListener(listener); } return statement; } /** * Send a single TickEvent to Esper. */ public void process(TickEvent tick) { process(new CurrentTimeEvent(tick.getTime().getMillis())); engine.getEPRuntime().sendEvent(tick); } /** * Update Esper's internal clock. */ public void process(CurrentTimeEvent timeEvent) { engine.getEPRuntime().sendEvent(timeEvent); } }