import java.util.Arrays; import java.util.stream.Collectors; import com.espertech.esper.client.Configuration; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.StatementAwareUpdateListener; import com.espertech.esper.client.UpdateListener; import com.espertech.esper.client.time.CurrentTimeEvent; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import ats.orders.MarketOrderRequest; import ats.plugin.OHLCEvent; import ats.plugin.OHLCPlugInViewFactory; public class EsperProcessor implements TickProcessor { final Logger log = LoggerFactory.getLogger(EsperProcessor.class); EPServiceProvider engine; TradingManager trader; public EsperProcessor(String epl, TradingManager trader) { this.trader = trader; Configuration config = new Configuration(); // disable esper's internal clock. we use tick data for our // sense of time. config.getEngineDefaults().getThreading().setInternalTimerEnabled(false); // register event types defined in java classes config.addEventType(TickEvent.class); config.addEventType(OHLCEvent.class); config.addEventType(DateTime.class); // add OHLC plugin config.addPlugInView("ATS", "OHLC", OHLCPlugInViewFactory.class.getName()); engine = EPServiceProviderManager.getDefaultProvider(config); addStatements(epl); // addLogStatement("TickEvent"); addLogStreamHandler(); // respond to long entry events addStatement("select * from LongEntryDistinct", (newData, oldData) -> { String instrument = (String)newData[0].get("instrument"); Integer units = (Integer)newData[0].get("units"); trader.placeOrder(new MarketOrderRequest(instrument, units)); log.debug("Long entry triggered: {} of {} at price {} at time {}", units, instrument, newData[0].get("current"), newData[0].get("time")); }); } /** * Split a (possibly) multiline string of EPL statements into one * statement per string. Statements are separated by at least one * blank line. */ private static String[] splitStatements(String str) { return str.split("(\\r?\\n){2,}"); } /** * Split a string into individual lines. */ private static String[] splitLines(String str) { return str.split("\\r?\\n"); } /** * Return true if the given string is a comment. */ private static boolean isComment(String str) { return str.matches("^\\s*//.*") || str.matches("^\\s*--.*"); } /** * Return true if the given string is only whitespace. */ private static boolean isEmpty(String str) { return str.matches("^\\s*$"); } /** * Remove any inline epl comment from a given line. */ private static String removeInlineComment(String s) { return s.replaceAll("--.*$", ""); } /** * Remove comment lines from the string. */ private static String filterComments(String str) { return Arrays.stream(splitLines(str)) .filter(s -> !isComment(s)) .map(s -> removeInlineComment(s)) .collect(Collectors.joining("\n")); } /** * Given a string with (possibly) multiple statements, split and * add individually. */ private void addStatements(String epl) { for (String s : splitStatements(filterComments(epl))) { if (!isEmpty(s)) addStatement(s); } } /** * Add a single EPL statement to the Esper engine. */ private EPStatement addStatement(String epl) { return addStatement(epl, (UpdateListener)null); } /** * Add a single EPL statement to the Esper engine with a listener * to respond to the Statement. */ private EPStatement addStatement(String epl, UpdateListener listener) { // log.debug("Adding statement: {}", epl); EPStatement statement = engine.getEPAdministrator().createEPL(epl); if (listener != null) { statement.addListener(listener); } return statement; } /** * Add a simple EPL statement to the Esper engine that logs any * event that matches the event name. */ private EPStatement addLogStreamHandler() { return addStatement("select * from LogStream", (newData, oldData) -> { log.debug("{}: {}", newData[0].get("stream"), newData[0].get("event")); }); } /** * Add a simple EPL statement to the Esper engine that logs any * event that matches the event name. */ private EPStatement addLogStatement(String name) { return addStatement("select * from " + name, (newData, oldData) -> { log.debug("{}: {}", name, newData[0].getUnderlying()); }); } /** * Add a single EPL statement to the Esper engine with a listener * to respond to the Statement. */ private EPStatement addStatement(String epl, StatementAwareUpdateListener listener) { // log.debug("Adding statement aware statement: {}", epl); EPStatement statement = engine.getEPAdministrator().createEPL(epl); if (listener != null) { statement.addListener(listener); } return statement; } /** * Send a single TickEvent to Esper. */ public void process(TickEvent tick) { process(new CurrentTimeEvent(tick.getTime().getMillis())); engine.getEPRuntime().sendEvent(tick); } /** * Update Esper's internal clock. */ public void process(CurrentTimeEvent timeEvent) { engine.getEPRuntime().sendEvent(timeEvent); } }