diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index 7cdd7a5..01ff8b8 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -69,7 +69,7 @@ create variant schema OHLCStream as OHLCEvent -- TickEvent.time ("time") as the source of the timestamp, and uses -- TickEvent.midDouble() as the value to use in the OHLC calculation. insert into OHLCStream - select * from TickEvent#OHLC(OHLCInterval, time, midDouble) + select * from TickEvent#OHLC("OHLC", "time", OHLCInterval, time, midDouble) -- diff --git a/src/main/java/ats/plugin/CandlestickCalc.java b/src/main/java/ats/plugin/CandlestickCalc.java new file mode 100644 index 0000000..e9ee70d --- /dev/null +++ b/src/main/java/ats/plugin/CandlestickCalc.java @@ -0,0 +1,24 @@ +package ats.plugin; + +/** + * CandlestickCalc embodies a method of calculating candlestick + * values. For example, OHLC or Heikin-Ashi. + */ +interface CandlestickCalc { + + /** + * Reset the calculation for the beginning of an interval. + */ + void reset(); + + /** + * Accept a current tick value to apply to the current + * calculations. + */ + void applyValue(double value); + + /** + * Return the values calculated since the last reset point. + */ + OHLCValues getValues(); +} diff --git a/src/main/java/ats/plugin/DurationWindow.java b/src/main/java/ats/plugin/DurationWindow.java new file mode 100644 index 0000000..b399929 --- /dev/null +++ b/src/main/java/ats/plugin/DurationWindow.java @@ -0,0 +1,194 @@ +package ats.plugin; + +import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; +import com.espertech.esper.core.service.EPStatementHandleCallback; +import com.espertech.esper.core.service.EngineLevelExtensionServicesContext; +import com.espertech.esper.epl.expression.core.ExprEvaluator; +import com.espertech.esper.epl.expression.core.ExprNode; +import com.espertech.esper.schedule.ScheduleHandleCallback; +import com.espertech.esper.schedule.SchedulingService; + +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.format.PeriodFormatter; +import org.joda.time.format.PeriodFormatterBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DurationWindow TODO + */ +class DurationWindow { + private static final Logger log = LoggerFactory.getLogger(DurationWindow.class); + private static final int LATE_EVENT_SLACK_SECONDS = 5; + private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder() + .appendDays().appendSuffix("d") + .appendHours().appendSuffix("h") + .appendMinutes().appendSuffix("m") + .appendSeconds().appendSuffix("s") + .toFormatter(); + private final AgentInstanceViewFactoryChainContext agentContext; + private final long scheduleSlot; + private EPStatementHandleCallback handle; + private final Duration interval; + private OHLCPlugInView plugin; + private CandlestickCalc calc; + private DateTime windowStartTime; + private DateTime windowEndTime; + private DateTime lastStartTime; + + + public DurationWindow(OHLCPlugInView plugin, + CandlestickCalc calc, + AgentInstanceViewFactoryChainContext context, + ExprNode intervalExpression) + { + this.plugin = plugin; + this.calc = calc; + agentContext = context; + scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot(); + interval = parseInterval(intervalExpression); + } + + /** + * Process a tick with the given time and value. + */ + public void update(DateTime timestamp, double value) { + ensureWindow(timestamp); + calc.applyValue(value); + } + + /** + * Return the time period specified by the given expression value. + */ + private Duration parseInterval(ExprNode interval) { + ExprEvaluator evaluator = interval.getForge().getExprEvaluator(); + String intervalStr = (String)evaluator.evaluate(null, true, agentContext); + return parseInterval(intervalStr); + } + + /** + * Return the time period specified by the given string. + */ + public static Duration parseInterval(String interval) { + interval = interval.replaceAll("\\s+",""); + return periodFormatter.parsePeriod(interval).toStandardDuration(); + } + + /** + * Make sure our window times are set up and current. + */ + public void ensureWindow(DateTime timestamp) { + if (timestamp == null) return; + + if (windowStartTime == null) { + // create open window + windowStartTime = makeWindowStartTime(timestamp, interval); + windowEndTime = makeWindowEndTime(windowStartTime, interval); + scheduleCallback(windowEndTime); + } + + if (!inWindow(timestamp)) { + // past current window. + // post and create a new one. + postData(); + + windowStartTime = makeWindowStartTime(timestamp, interval); + windowEndTime = makeWindowEndTime(windowStartTime, interval); + scheduleCallback(windowEndTime); + } + } + + public static DateTime makeWindowStartTime(DateTime timestamp, + Duration interval) + { + DateTime today = timestamp.withTimeAtStartOfDay(); + // log.info("Timestamp is {}", timestamp); + // log.info("Day start is {}", today); + + Duration intoToday = new Duration(today, timestamp); + + // calc how far into the current window we are + long intoPeriod = intoToday.getMillis() % interval.getMillis(); + + return timestamp.minus(intoPeriod); + } + + private static DateTime makeWindowEndTime(DateTime startTime, + Duration interval) + { + if (startTime == null || interval == null) return null; + + return startTime.plus(interval.getMillis()); + } + + /** + * Return true if the timestamp is within the current time window. + */ + private boolean inWindow(DateTime timestamp) { + if (timestamp == null) return false; + + return timestamp.compareTo(windowStartTime) >= 0 && + timestamp.compareTo(windowEndTime) < 0; + } + + /** + * Convert a bare long value to a proper DateTime entity. Assumes + * UTC time zone. + */ + public static DateTime toDateTime(long l) { + return new DateTime(l, DateTimeZone.UTC); + } + + /** + * Set up a callback to post an event when our time window expires. + */ + private void scheduleCallback(DateTime endTime) { + SchedulingService sched = agentContext.getStatementContext().getSchedulingService(); + if (handle != null) { + // remove old schedule + // log.info("Removing old callback"); + sched.remove(handle, scheduleSlot); + handle = null; + } + + DateTime currentTime = toDateTime(sched.getTime()); + DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS); + long callbackTime = targetTime.getMillis() - currentTime.getMillis(); + + ScheduleHandleCallback callback = new ScheduleHandleCallback() { + public void scheduledTrigger(EngineLevelExtensionServicesContext esc) { + handle = null; // clear out schedule handle + // log.info("Callback running"); + DurationWindow.this.postData(); + } + }; + + handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback); + sched.add(callbackTime, handle, scheduleSlot); + // log.info("Scheduled callback for {}", callbackTime); + } + + /** + * Update listeners with our new value. + */ + private void postData() { + if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) { + log.warn("DUP START TIME"); + } + + // log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread()); + + OHLCValues values = calc.getValues(); + if (values == null) { + // log.info("No data to post"); + return; + } + + plugin.postEvent(new OHLCEvent(windowStartTime, values)); + + calc.reset(); + lastStartTime = windowStartTime; + } +} diff --git a/src/main/java/ats/plugin/OHLCCandlestickCalc.java b/src/main/java/ats/plugin/OHLCCandlestickCalc.java new file mode 100644 index 0000000..b679083 --- /dev/null +++ b/src/main/java/ats/plugin/OHLCCandlestickCalc.java @@ -0,0 +1,56 @@ +package ats.plugin; + +/** + * OHLCCandlestickCalc calculates values for OHLC. + */ +class OHLCCandlestickCalc implements CandlestickCalc { + private Double open; + private Double close; + private Double high; + private Double low; + + + /** + * Reset the calculation for the beginning of an interval. + */ + @Override + public void reset() { + open = null; + close = null; + high = null; + low = null; + } + + /** + * Accept a current tick value to apply to the current + * calculations. + */ + @Override + public void applyValue(double value) { + if (open == null) { + open = value; + } + + close = value; + + if (low == null) { + low = value; + } else if (low.compareTo(value) > 0) { + low = value; + } + + if (high == null) { + high = value; + } else if (high.compareTo(value) < 0) { + high = value; + } + } + + /** + * Return our calculated values up to now. + */ + @Override + public OHLCValues getValues() { + return new OHLCValues(open, high, low, close); + } +} diff --git a/src/main/java/ats/plugin/OHLCEvent.java b/src/main/java/ats/plugin/OHLCEvent.java index 99eaf17..99ed6f8 100644 --- a/src/main/java/ats/plugin/OHLCEvent.java +++ b/src/main/java/ats/plugin/OHLCEvent.java @@ -13,6 +13,10 @@ public class OHLCEvent { private double close; + public OHLCEvent(DateTime time, OHLCValues values) { + this(time, values.open, values.high, values.low, values.close); + } + public OHLCEvent(DateTime time, double open, double high, double low, double close) diff --git a/src/main/java/ats/plugin/OHLCPlugInView.java b/src/main/java/ats/plugin/OHLCPlugInView.java index 8f55cae..df288c2 100644 --- a/src/main/java/ats/plugin/OHLCPlugInView.java +++ b/src/main/java/ats/plugin/OHLCPlugInView.java @@ -1,92 +1,46 @@ package ats.plugin; +import java.util.Iterator; + import com.espertech.esper.client.EventBean; import com.espertech.esper.client.EventType; import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; -import com.espertech.esper.core.service.EPStatementHandleCallback; -import com.espertech.esper.core.service.EngineLevelExtensionServicesContext; +import com.espertech.esper.epl.expression.core.ExprEvaluator; import com.espertech.esper.epl.expression.core.ExprNode; import com.espertech.esper.event.EventAdapterService; -import com.espertech.esper.schedule.ScheduleHandleCallback; import com.espertech.esper.view.ViewSupport; -import java.util.Calendar; -import java.util.GregorianCalendar; -import java.util.Iterator; + +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.joda.time.Period; -import org.joda.time.format.PeriodFormatter; -import org.joda.time.format.PeriodFormatterBuilder; -import com.espertech.esper.epl.expression.core.ExprEvaluator; -import com.espertech.esper.schedule.SchedulingService; -import org.joda.time.Duration; -import org.joda.time.format.ISODateTimeFormat; -import java.util.TimeZone; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.Instant; -import org.joda.time.PeriodType; /** * OHLCPlugInView computes OHLC bars for a given time interval. */ public class OHLCPlugInView extends ViewSupport { private static final Logger log = LoggerFactory.getLogger(OHLCPlugInView.class); - private static final int LATE_EVENT_SLACK_SECONDS = 5; - private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder() - .appendDays().appendSuffix("d") - .appendHours().appendSuffix("h") - .appendMinutes().appendSuffix("m") - .appendSeconds().appendSuffix("s") - .toFormatter(); - - private final AgentInstanceViewFactoryChainContext agentContext; - private final long scheduleSlot; - private EPStatementHandleCallback handle; - - private final Duration interval; + private final AgentInstanceViewFactoryChainContext context; private final ExprNode timestampExpression; private final ExprNode valueExpression; - - private DateTime windowStartTime; - private DateTime windowEndTime; - private Double open; - private Double close; - private Double high; - private Double low; + private DurationWindow window; + private EventAdapterService service; private EventBean[] lastData; public OHLCPlugInView(AgentInstanceViewFactoryChainContext context, + ExprNode calcTypeExpression, + ExprNode timeTypeExpression, ExprNode intervalExpression, ExprNode timestampExpression, ExprNode valueExpression) { - agentContext = context; - scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot(); - - interval = parseInterval(intervalExpression); - // log.info("Interval is {}", interval); - + this.context = context; this.timestampExpression = timestampExpression; this.valueExpression = valueExpression; - } + service = context.getStatementContext().getEventAdapterService(); - /** - * Return the time period specified by the given expression value. - */ - private Duration parseInterval(ExprNode interval) { - ExprEvaluator evaluator = interval.getForge().getExprEvaluator(); - String intervalStr = (String)evaluator.evaluate(null, true, agentContext); - return parseInterval(intervalStr); - } - - /** - * Return the time period specified by the given string. - */ - public static Duration parseInterval(String interval) { - interval = interval.replaceAll("\\s+",""); - return periodFormatter.parsePeriod(interval).toStandardDuration(); + CandlestickCalc calc = new OHLCCandlestickCalc(); + window = new DurationWindow(this, calc, context, intervalExpression); } /** @@ -94,15 +48,7 @@ public class OHLCPlugInView extends ViewSupport { */ private DateTime getTimestamp(EventBean event) { ExprEvaluator evaluator = timestampExpression.getForge().getExprEvaluator(); - return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, agentContext); - } - - /** - * Convert a bare long value to a proper DateTime entity. Assumes - * UTC time zone. - */ - public static DateTime toDateTime(long l) { - return new DateTime(l, DateTimeZone.UTC); + return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, context); } /** @@ -110,7 +56,7 @@ public class OHLCPlugInView extends ViewSupport { */ private double getValue(EventBean event) { ExprEvaluator evaluator = valueExpression.getForge().getExprEvaluator(); - return (double)evaluator.evaluate(new EventBean[] {event}, true, agentContext); + return (double)evaluator.evaluate(new EventBean[] {event}, true, context); } /** @@ -123,150 +69,17 @@ public class OHLCPlugInView extends ViewSupport { for (EventBean event : newData) { DateTime timestamp = getTimestamp(event); double value = getValue(event); - - ensureWindow(timestamp); - applyValue(value); + window.update(timestamp, value); } } /** - * Make sure our window times are set up and current. + * Send an event to all plugin listeners. */ - private void ensureWindow(DateTime timestamp) { - if (timestamp == null) return; - - if (windowStartTime == null) { - // create open window - windowStartTime = makeWindowStartTime(timestamp, interval); - windowEndTime = makeWindowEndTime(windowStartTime, interval); - scheduleCallback(windowEndTime); - } - - if (!inWindow(timestamp)) { - // past current window. - // post and create a new one. - postData(); - - windowStartTime = makeWindowStartTime(timestamp, interval); - windowEndTime = makeWindowEndTime(windowStartTime, interval); - scheduleCallback(windowEndTime); - } - } - - public static DateTime makeWindowStartTime(DateTime timestamp, - Duration interval) - { - DateTime today = timestamp.withTimeAtStartOfDay(); - // log.info("Timestamp is {}", timestamp); - // log.info("Day start is {}", today); - - Duration intoToday = new Duration(today, timestamp); - - // calc how far into the current window we are - long intoPeriod = intoToday.getMillis() % interval.getMillis(); - - return timestamp.minus(intoPeriod); - } - - private static DateTime makeWindowEndTime(DateTime startTime, - Duration interval) - { - if (startTime == null || interval == null) return null; - - return startTime.plus(interval.getMillis()); - } - - /** - * Return true if the timestamp is within the current time window. - */ - private boolean inWindow(DateTime timestamp) { - if (timestamp == null) return false; - - return timestamp.compareTo(windowStartTime) >= 0 && - timestamp.compareTo(windowEndTime) < 0; - } - - private void applyValue(double value) { - if (open == null) { - open = value; - } - - close = value; - - if (low == null) { - low = value; - } else if (low.compareTo(value) > 0) { - low = value; - } - - if (high == null) { - high = value; - } else if (high.compareTo(value) < 0) { - high = value; - } - } - - /** - * Set up a callback to post an event when our time window expires. - */ - private void scheduleCallback(DateTime endTime) { - SchedulingService sched = agentContext.getStatementContext().getSchedulingService(); - if (handle != null) { - // remove old schedule - // log.info("Removing old callback"); - sched.remove(handle, scheduleSlot); - handle = null; - } - - DateTime currentTime = toDateTime(sched.getTime()); - DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS); - long callbackTime = targetTime.getMillis() - currentTime.getMillis(); - - ScheduleHandleCallback callback = new ScheduleHandleCallback() { - public void scheduledTrigger(EngineLevelExtensionServicesContext esc) { - handle = null; // clear out schedule handle - // log.info("Callback running"); - OHLCPlugInView.this.postData(); - } - }; - - handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback); - sched.add(callbackTime, handle, scheduleSlot); - // log.info("Scheduled callback for {}", callbackTime); - } - - DateTime lastStartTime; - - /** - * Update listeners with our new value. - */ - private void postData() { - if (open == null) { - // log.info("No data to post"); - return; - } - - if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) { - log.warn("DUP START TIME"); - } - - // log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread()); - - OHLCEvent value = new OHLCEvent(windowStartTime, open, high, low, close); - - // send - EventAdapterService service = agentContext.getStatementContext().getEventAdapterService(); - EventBean[] newBeans = new EventBean[] {service.adapterForBean(value)}; - updateChildren(newBeans, lastData); - - // reset for next post - lastData = newBeans; - open = null; - close = null; - high = null; - low = null; - - lastStartTime = windowStartTime; + public void postEvent(OHLCEvent event) { + EventBean[] newData = new EventBean[] {service.adapterForBean(event)}; + updateChildren(newData, lastData); + lastData = newData; } // @@ -279,11 +92,10 @@ public class OHLCPlugInView extends ViewSupport { */ @Override public EventType getEventType() { - return getEventType(agentContext.getStatementContext().getEventAdapterService()); + return getEventType(service); } - protected static EventType getEventType(EventAdapterService service) - { + protected static EventType getEventType(EventAdapterService service) { return service.addBeanType(OHLCEvent.class.getName(), OHLCEvent.class, false, false, false); diff --git a/src/main/java/ats/plugin/OHLCPlugInViewFactory.java b/src/main/java/ats/plugin/OHLCPlugInViewFactory.java index 30c5c6e..8aa3001 100644 --- a/src/main/java/ats/plugin/OHLCPlugInViewFactory.java +++ b/src/main/java/ats/plugin/OHLCPlugInViewFactory.java @@ -1,5 +1,8 @@ package ats.plugin; +import java.math.BigDecimal; +import java.util.List; + import com.espertech.esper.client.EventType; import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; import com.espertech.esper.core.service.StatementContext; @@ -10,9 +13,7 @@ import com.espertech.esper.view.ViewFactory; import com.espertech.esper.view.ViewFactoryContext; import com.espertech.esper.view.ViewFactorySupport; import com.espertech.esper.view.ViewParameterException; -import java.math.BigDecimal; -import java.util.Date; -import java.util.List; + import org.joda.time.DateTime; /** @@ -21,6 +22,8 @@ import org.joda.time.DateTime; public class OHLCPlugInViewFactory extends ViewFactorySupport { private EventAdapterService eventAdapterService; private List params; + private ExprNode calcType; + private ExprNode timeType; private ExprNode interval; private ExprNode timestamp; private ExprNode value; @@ -30,13 +33,13 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { * Pass in EPL query view params. */ public void setViewParameters(ViewFactoryContext context, - List params) + List params) throws ViewParameterException { eventAdapterService = context.getEventAdapterService(); - if (params.size() != 3) { - throw new ViewParameterException("OHLC view takes three parameters: time interval, timestamp expression, and mid price expression."); + if (params.size() != 5) { + throw new ViewParameterException("OHLC view takes five parameters: calulation type (\"OHLC\" or \"Heikin-Ashi\"), batching type (\"time\" or \"ticks\"), time interval or tick count, timestamp expression, and mid price expression."); } this.params = params; } @@ -47,9 +50,9 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { * type for resulting views. */ public void attach(EventType parentEventType, - StatementContext statementContext, - ViewFactory optionalParentFactory, - List parentViewFactories) + StatementContext statementContext, + ViewFactory optionalParentFactory, + List parentViewFactories) throws ViewParameterException { ExprNode[] validatedNodes = ViewFactorySupport.validate(getViewName(), @@ -57,20 +60,34 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { statementContext, params, true); - interval = validatedNodes[0]; - timestamp = validatedNodes[1]; - value = validatedNodes[2]; + calcType = validatedNodes[0]; + timeType = validatedNodes[1]; + interval = validatedNodes[2]; + timestamp = validatedNodes[3]; + value = validatedNodes[4]; + + Class calcTypeClass = calcType.getForge().getEvaluationType(); + if ((calcTypeClass != String.class)) + { + throw new ViewParameterException("OHLC view needs a String-typed calc type value for parameter 1"); + } + + Class timeTypeClass = timeType.getForge().getEvaluationType(); + if ((timeTypeClass != String.class)) + { + throw new ViewParameterException("OHLC view needs a String-typed time type value for parameter 2"); + } Class intervalClass = interval.getForge().getEvaluationType(); if ((intervalClass != String.class)) { - throw new ViewParameterException("OHLC view needs String-typed interval value for parameter 1."); + throw new ViewParameterException("OHLC view needs String-typed interval value for parameter 3"); } Class timestampClass = timestamp.getForge().getEvaluationType(); if ((timestampClass != DateTime.class)) { - throw new ViewParameterException("OHLC view needs DateTime typed timestamp values for parameter 2"); + throw new ViewParameterException("OHLC view needs DateTime typed timestamp values for parameter 4"); } Class valueClass = value.getForge().getEvaluationType(); @@ -78,7 +95,7 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { (valueClass != Double.class) && (valueClass != BigDecimal.class)) { - throw new ViewParameterException("OHLC view needs double or BigDecimal values for parameter 3"); + throw new ViewParameterException("OHLC view needs double or BigDecimal values for parameter 5"); } } @@ -86,7 +103,8 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { * Create a new view using already-passed context and params. */ public View makeView(AgentInstanceViewFactoryChainContext agentContext) { - return new OHLCPlugInView(agentContext, interval, timestamp, value); + return new OHLCPlugInView(agentContext, calcType, timeType, + interval, timestamp, value); } /** diff --git a/src/main/java/ats/plugin/OHLCValues.java b/src/main/java/ats/plugin/OHLCValues.java new file mode 100644 index 0000000..5a4a629 --- /dev/null +++ b/src/main/java/ats/plugin/OHLCValues.java @@ -0,0 +1,22 @@ +package ats.plugin; + +/** + * OHLCValues is a struct for storing open, high, low, and close + * values. + */ +public class OHLCValues { + public double open; + public double high; + public double low; + public double close; + + + public OHLCValues(double open, double high, + double low, double close) + { + this.open = open; + this.high = high; + this.low = low; + this.close = close; + } +}