From 518cf23e6d31fd59dd78dc274c959073ab955ee2 Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Tue, 30 Oct 2018 17:56:39 -0700 Subject: [PATCH 01/17] Move time and OHLC value calc into separate files --- epl/trading_system_1.epl | 2 +- src/main/java/ats/plugin/CandlestickCalc.java | 24 ++ src/main/java/ats/plugin/DurationWindow.java | 194 ++++++++++++++ .../java/ats/plugin/OHLCCandlestickCalc.java | 56 +++++ src/main/java/ats/plugin/OHLCEvent.java | 4 + src/main/java/ats/plugin/OHLCPlugInView.java | 236 ++---------------- .../ats/plugin/OHLCPlugInViewFactory.java | 50 ++-- src/main/java/ats/plugin/OHLCValues.java | 22 ++ 8 files changed, 359 insertions(+), 229 deletions(-) create mode 100644 src/main/java/ats/plugin/CandlestickCalc.java create mode 100644 src/main/java/ats/plugin/DurationWindow.java create mode 100644 src/main/java/ats/plugin/OHLCCandlestickCalc.java create mode 100644 src/main/java/ats/plugin/OHLCValues.java diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index 7cdd7a5..01ff8b8 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -69,7 +69,7 @@ create variant schema OHLCStream as OHLCEvent -- TickEvent.time ("time") as the source of the timestamp, and uses -- TickEvent.midDouble() as the value to use in the OHLC calculation. insert into OHLCStream - select * from TickEvent#OHLC(OHLCInterval, time, midDouble) + select * from TickEvent#OHLC("OHLC", "time", OHLCInterval, time, midDouble) -- diff --git a/src/main/java/ats/plugin/CandlestickCalc.java b/src/main/java/ats/plugin/CandlestickCalc.java new file mode 100644 index 0000000..e9ee70d --- /dev/null +++ b/src/main/java/ats/plugin/CandlestickCalc.java @@ -0,0 +1,24 @@ +package ats.plugin; + +/** + * CandlestickCalc embodies a method of calculating candlestick + * values. For example, OHLC or Heikin-Ashi. + */ +interface CandlestickCalc { + + /** + * Reset the calculation for the beginning of an interval. + */ + void reset(); + + /** + * Accept a current tick value to apply to the current + * calculations. + */ + void applyValue(double value); + + /** + * Return the values calculated since the last reset point. + */ + OHLCValues getValues(); +} diff --git a/src/main/java/ats/plugin/DurationWindow.java b/src/main/java/ats/plugin/DurationWindow.java new file mode 100644 index 0000000..b399929 --- /dev/null +++ b/src/main/java/ats/plugin/DurationWindow.java @@ -0,0 +1,194 @@ +package ats.plugin; + +import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; +import com.espertech.esper.core.service.EPStatementHandleCallback; +import com.espertech.esper.core.service.EngineLevelExtensionServicesContext; +import com.espertech.esper.epl.expression.core.ExprEvaluator; +import com.espertech.esper.epl.expression.core.ExprNode; +import com.espertech.esper.schedule.ScheduleHandleCallback; +import com.espertech.esper.schedule.SchedulingService; + +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.format.PeriodFormatter; +import org.joda.time.format.PeriodFormatterBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DurationWindow TODO + */ +class DurationWindow { + private static final Logger log = LoggerFactory.getLogger(DurationWindow.class); + private static final int LATE_EVENT_SLACK_SECONDS = 5; + private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder() + .appendDays().appendSuffix("d") + .appendHours().appendSuffix("h") + .appendMinutes().appendSuffix("m") + .appendSeconds().appendSuffix("s") + .toFormatter(); + private final AgentInstanceViewFactoryChainContext agentContext; + private final long scheduleSlot; + private EPStatementHandleCallback handle; + private final Duration interval; + private OHLCPlugInView plugin; + private CandlestickCalc calc; + private DateTime windowStartTime; + private DateTime windowEndTime; + private DateTime lastStartTime; + + + public DurationWindow(OHLCPlugInView plugin, + CandlestickCalc calc, + AgentInstanceViewFactoryChainContext context, + ExprNode intervalExpression) + { + this.plugin = plugin; + this.calc = calc; + agentContext = context; + scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot(); + interval = parseInterval(intervalExpression); + } + + /** + * Process a tick with the given time and value. + */ + public void update(DateTime timestamp, double value) { + ensureWindow(timestamp); + calc.applyValue(value); + } + + /** + * Return the time period specified by the given expression value. + */ + private Duration parseInterval(ExprNode interval) { + ExprEvaluator evaluator = interval.getForge().getExprEvaluator(); + String intervalStr = (String)evaluator.evaluate(null, true, agentContext); + return parseInterval(intervalStr); + } + + /** + * Return the time period specified by the given string. + */ + public static Duration parseInterval(String interval) { + interval = interval.replaceAll("\\s+",""); + return periodFormatter.parsePeriod(interval).toStandardDuration(); + } + + /** + * Make sure our window times are set up and current. + */ + public void ensureWindow(DateTime timestamp) { + if (timestamp == null) return; + + if (windowStartTime == null) { + // create open window + windowStartTime = makeWindowStartTime(timestamp, interval); + windowEndTime = makeWindowEndTime(windowStartTime, interval); + scheduleCallback(windowEndTime); + } + + if (!inWindow(timestamp)) { + // past current window. + // post and create a new one. + postData(); + + windowStartTime = makeWindowStartTime(timestamp, interval); + windowEndTime = makeWindowEndTime(windowStartTime, interval); + scheduleCallback(windowEndTime); + } + } + + public static DateTime makeWindowStartTime(DateTime timestamp, + Duration interval) + { + DateTime today = timestamp.withTimeAtStartOfDay(); + // log.info("Timestamp is {}", timestamp); + // log.info("Day start is {}", today); + + Duration intoToday = new Duration(today, timestamp); + + // calc how far into the current window we are + long intoPeriod = intoToday.getMillis() % interval.getMillis(); + + return timestamp.minus(intoPeriod); + } + + private static DateTime makeWindowEndTime(DateTime startTime, + Duration interval) + { + if (startTime == null || interval == null) return null; + + return startTime.plus(interval.getMillis()); + } + + /** + * Return true if the timestamp is within the current time window. + */ + private boolean inWindow(DateTime timestamp) { + if (timestamp == null) return false; + + return timestamp.compareTo(windowStartTime) >= 0 && + timestamp.compareTo(windowEndTime) < 0; + } + + /** + * Convert a bare long value to a proper DateTime entity. Assumes + * UTC time zone. + */ + public static DateTime toDateTime(long l) { + return new DateTime(l, DateTimeZone.UTC); + } + + /** + * Set up a callback to post an event when our time window expires. + */ + private void scheduleCallback(DateTime endTime) { + SchedulingService sched = agentContext.getStatementContext().getSchedulingService(); + if (handle != null) { + // remove old schedule + // log.info("Removing old callback"); + sched.remove(handle, scheduleSlot); + handle = null; + } + + DateTime currentTime = toDateTime(sched.getTime()); + DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS); + long callbackTime = targetTime.getMillis() - currentTime.getMillis(); + + ScheduleHandleCallback callback = new ScheduleHandleCallback() { + public void scheduledTrigger(EngineLevelExtensionServicesContext esc) { + handle = null; // clear out schedule handle + // log.info("Callback running"); + DurationWindow.this.postData(); + } + }; + + handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback); + sched.add(callbackTime, handle, scheduleSlot); + // log.info("Scheduled callback for {}", callbackTime); + } + + /** + * Update listeners with our new value. + */ + private void postData() { + if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) { + log.warn("DUP START TIME"); + } + + // log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread()); + + OHLCValues values = calc.getValues(); + if (values == null) { + // log.info("No data to post"); + return; + } + + plugin.postEvent(new OHLCEvent(windowStartTime, values)); + + calc.reset(); + lastStartTime = windowStartTime; + } +} diff --git a/src/main/java/ats/plugin/OHLCCandlestickCalc.java b/src/main/java/ats/plugin/OHLCCandlestickCalc.java new file mode 100644 index 0000000..b679083 --- /dev/null +++ b/src/main/java/ats/plugin/OHLCCandlestickCalc.java @@ -0,0 +1,56 @@ +package ats.plugin; + +/** + * OHLCCandlestickCalc calculates values for OHLC. + */ +class OHLCCandlestickCalc implements CandlestickCalc { + private Double open; + private Double close; + private Double high; + private Double low; + + + /** + * Reset the calculation for the beginning of an interval. + */ + @Override + public void reset() { + open = null; + close = null; + high = null; + low = null; + } + + /** + * Accept a current tick value to apply to the current + * calculations. + */ + @Override + public void applyValue(double value) { + if (open == null) { + open = value; + } + + close = value; + + if (low == null) { + low = value; + } else if (low.compareTo(value) > 0) { + low = value; + } + + if (high == null) { + high = value; + } else if (high.compareTo(value) < 0) { + high = value; + } + } + + /** + * Return our calculated values up to now. + */ + @Override + public OHLCValues getValues() { + return new OHLCValues(open, high, low, close); + } +} diff --git a/src/main/java/ats/plugin/OHLCEvent.java b/src/main/java/ats/plugin/OHLCEvent.java index 99eaf17..99ed6f8 100644 --- a/src/main/java/ats/plugin/OHLCEvent.java +++ b/src/main/java/ats/plugin/OHLCEvent.java @@ -13,6 +13,10 @@ public class OHLCEvent { private double close; + public OHLCEvent(DateTime time, OHLCValues values) { + this(time, values.open, values.high, values.low, values.close); + } + public OHLCEvent(DateTime time, double open, double high, double low, double close) diff --git a/src/main/java/ats/plugin/OHLCPlugInView.java b/src/main/java/ats/plugin/OHLCPlugInView.java index 8f55cae..df288c2 100644 --- a/src/main/java/ats/plugin/OHLCPlugInView.java +++ b/src/main/java/ats/plugin/OHLCPlugInView.java @@ -1,92 +1,46 @@ package ats.plugin; +import java.util.Iterator; + import com.espertech.esper.client.EventBean; import com.espertech.esper.client.EventType; import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; -import com.espertech.esper.core.service.EPStatementHandleCallback; -import com.espertech.esper.core.service.EngineLevelExtensionServicesContext; +import com.espertech.esper.epl.expression.core.ExprEvaluator; import com.espertech.esper.epl.expression.core.ExprNode; import com.espertech.esper.event.EventAdapterService; -import com.espertech.esper.schedule.ScheduleHandleCallback; import com.espertech.esper.view.ViewSupport; -import java.util.Calendar; -import java.util.GregorianCalendar; -import java.util.Iterator; + +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.joda.time.Period; -import org.joda.time.format.PeriodFormatter; -import org.joda.time.format.PeriodFormatterBuilder; -import com.espertech.esper.epl.expression.core.ExprEvaluator; -import com.espertech.esper.schedule.SchedulingService; -import org.joda.time.Duration; -import org.joda.time.format.ISODateTimeFormat; -import java.util.TimeZone; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.Instant; -import org.joda.time.PeriodType; /** * OHLCPlugInView computes OHLC bars for a given time interval. */ public class OHLCPlugInView extends ViewSupport { private static final Logger log = LoggerFactory.getLogger(OHLCPlugInView.class); - private static final int LATE_EVENT_SLACK_SECONDS = 5; - private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder() - .appendDays().appendSuffix("d") - .appendHours().appendSuffix("h") - .appendMinutes().appendSuffix("m") - .appendSeconds().appendSuffix("s") - .toFormatter(); - - private final AgentInstanceViewFactoryChainContext agentContext; - private final long scheduleSlot; - private EPStatementHandleCallback handle; - - private final Duration interval; + private final AgentInstanceViewFactoryChainContext context; private final ExprNode timestampExpression; private final ExprNode valueExpression; - - private DateTime windowStartTime; - private DateTime windowEndTime; - private Double open; - private Double close; - private Double high; - private Double low; + private DurationWindow window; + private EventAdapterService service; private EventBean[] lastData; public OHLCPlugInView(AgentInstanceViewFactoryChainContext context, + ExprNode calcTypeExpression, + ExprNode timeTypeExpression, ExprNode intervalExpression, ExprNode timestampExpression, ExprNode valueExpression) { - agentContext = context; - scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot(); - - interval = parseInterval(intervalExpression); - // log.info("Interval is {}", interval); - + this.context = context; this.timestampExpression = timestampExpression; this.valueExpression = valueExpression; - } + service = context.getStatementContext().getEventAdapterService(); - /** - * Return the time period specified by the given expression value. - */ - private Duration parseInterval(ExprNode interval) { - ExprEvaluator evaluator = interval.getForge().getExprEvaluator(); - String intervalStr = (String)evaluator.evaluate(null, true, agentContext); - return parseInterval(intervalStr); - } - - /** - * Return the time period specified by the given string. - */ - public static Duration parseInterval(String interval) { - interval = interval.replaceAll("\\s+",""); - return periodFormatter.parsePeriod(interval).toStandardDuration(); + CandlestickCalc calc = new OHLCCandlestickCalc(); + window = new DurationWindow(this, calc, context, intervalExpression); } /** @@ -94,15 +48,7 @@ public class OHLCPlugInView extends ViewSupport { */ private DateTime getTimestamp(EventBean event) { ExprEvaluator evaluator = timestampExpression.getForge().getExprEvaluator(); - return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, agentContext); - } - - /** - * Convert a bare long value to a proper DateTime entity. Assumes - * UTC time zone. - */ - public static DateTime toDateTime(long l) { - return new DateTime(l, DateTimeZone.UTC); + return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, context); } /** @@ -110,7 +56,7 @@ public class OHLCPlugInView extends ViewSupport { */ private double getValue(EventBean event) { ExprEvaluator evaluator = valueExpression.getForge().getExprEvaluator(); - return (double)evaluator.evaluate(new EventBean[] {event}, true, agentContext); + return (double)evaluator.evaluate(new EventBean[] {event}, true, context); } /** @@ -123,150 +69,17 @@ public class OHLCPlugInView extends ViewSupport { for (EventBean event : newData) { DateTime timestamp = getTimestamp(event); double value = getValue(event); - - ensureWindow(timestamp); - applyValue(value); + window.update(timestamp, value); } } /** - * Make sure our window times are set up and current. + * Send an event to all plugin listeners. */ - private void ensureWindow(DateTime timestamp) { - if (timestamp == null) return; - - if (windowStartTime == null) { - // create open window - windowStartTime = makeWindowStartTime(timestamp, interval); - windowEndTime = makeWindowEndTime(windowStartTime, interval); - scheduleCallback(windowEndTime); - } - - if (!inWindow(timestamp)) { - // past current window. - // post and create a new one. - postData(); - - windowStartTime = makeWindowStartTime(timestamp, interval); - windowEndTime = makeWindowEndTime(windowStartTime, interval); - scheduleCallback(windowEndTime); - } - } - - public static DateTime makeWindowStartTime(DateTime timestamp, - Duration interval) - { - DateTime today = timestamp.withTimeAtStartOfDay(); - // log.info("Timestamp is {}", timestamp); - // log.info("Day start is {}", today); - - Duration intoToday = new Duration(today, timestamp); - - // calc how far into the current window we are - long intoPeriod = intoToday.getMillis() % interval.getMillis(); - - return timestamp.minus(intoPeriod); - } - - private static DateTime makeWindowEndTime(DateTime startTime, - Duration interval) - { - if (startTime == null || interval == null) return null; - - return startTime.plus(interval.getMillis()); - } - - /** - * Return true if the timestamp is within the current time window. - */ - private boolean inWindow(DateTime timestamp) { - if (timestamp == null) return false; - - return timestamp.compareTo(windowStartTime) >= 0 && - timestamp.compareTo(windowEndTime) < 0; - } - - private void applyValue(double value) { - if (open == null) { - open = value; - } - - close = value; - - if (low == null) { - low = value; - } else if (low.compareTo(value) > 0) { - low = value; - } - - if (high == null) { - high = value; - } else if (high.compareTo(value) < 0) { - high = value; - } - } - - /** - * Set up a callback to post an event when our time window expires. - */ - private void scheduleCallback(DateTime endTime) { - SchedulingService sched = agentContext.getStatementContext().getSchedulingService(); - if (handle != null) { - // remove old schedule - // log.info("Removing old callback"); - sched.remove(handle, scheduleSlot); - handle = null; - } - - DateTime currentTime = toDateTime(sched.getTime()); - DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS); - long callbackTime = targetTime.getMillis() - currentTime.getMillis(); - - ScheduleHandleCallback callback = new ScheduleHandleCallback() { - public void scheduledTrigger(EngineLevelExtensionServicesContext esc) { - handle = null; // clear out schedule handle - // log.info("Callback running"); - OHLCPlugInView.this.postData(); - } - }; - - handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback); - sched.add(callbackTime, handle, scheduleSlot); - // log.info("Scheduled callback for {}", callbackTime); - } - - DateTime lastStartTime; - - /** - * Update listeners with our new value. - */ - private void postData() { - if (open == null) { - // log.info("No data to post"); - return; - } - - if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) { - log.warn("DUP START TIME"); - } - - // log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread()); - - OHLCEvent value = new OHLCEvent(windowStartTime, open, high, low, close); - - // send - EventAdapterService service = agentContext.getStatementContext().getEventAdapterService(); - EventBean[] newBeans = new EventBean[] {service.adapterForBean(value)}; - updateChildren(newBeans, lastData); - - // reset for next post - lastData = newBeans; - open = null; - close = null; - high = null; - low = null; - - lastStartTime = windowStartTime; + public void postEvent(OHLCEvent event) { + EventBean[] newData = new EventBean[] {service.adapterForBean(event)}; + updateChildren(newData, lastData); + lastData = newData; } // @@ -279,11 +92,10 @@ public class OHLCPlugInView extends ViewSupport { */ @Override public EventType getEventType() { - return getEventType(agentContext.getStatementContext().getEventAdapterService()); + return getEventType(service); } - protected static EventType getEventType(EventAdapterService service) - { + protected static EventType getEventType(EventAdapterService service) { return service.addBeanType(OHLCEvent.class.getName(), OHLCEvent.class, false, false, false); diff --git a/src/main/java/ats/plugin/OHLCPlugInViewFactory.java b/src/main/java/ats/plugin/OHLCPlugInViewFactory.java index 30c5c6e..8aa3001 100644 --- a/src/main/java/ats/plugin/OHLCPlugInViewFactory.java +++ b/src/main/java/ats/plugin/OHLCPlugInViewFactory.java @@ -1,5 +1,8 @@ package ats.plugin; +import java.math.BigDecimal; +import java.util.List; + import com.espertech.esper.client.EventType; import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; import com.espertech.esper.core.service.StatementContext; @@ -10,9 +13,7 @@ import com.espertech.esper.view.ViewFactory; import com.espertech.esper.view.ViewFactoryContext; import com.espertech.esper.view.ViewFactorySupport; import com.espertech.esper.view.ViewParameterException; -import java.math.BigDecimal; -import java.util.Date; -import java.util.List; + import org.joda.time.DateTime; /** @@ -21,6 +22,8 @@ import org.joda.time.DateTime; public class OHLCPlugInViewFactory extends ViewFactorySupport { private EventAdapterService eventAdapterService; private List params; + private ExprNode calcType; + private ExprNode timeType; private ExprNode interval; private ExprNode timestamp; private ExprNode value; @@ -30,13 +33,13 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { * Pass in EPL query view params. */ public void setViewParameters(ViewFactoryContext context, - List params) + List params) throws ViewParameterException { eventAdapterService = context.getEventAdapterService(); - if (params.size() != 3) { - throw new ViewParameterException("OHLC view takes three parameters: time interval, timestamp expression, and mid price expression."); + if (params.size() != 5) { + throw new ViewParameterException("OHLC view takes five parameters: calulation type (\"OHLC\" or \"Heikin-Ashi\"), batching type (\"time\" or \"ticks\"), time interval or tick count, timestamp expression, and mid price expression."); } this.params = params; } @@ -47,9 +50,9 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { * type for resulting views. */ public void attach(EventType parentEventType, - StatementContext statementContext, - ViewFactory optionalParentFactory, - List parentViewFactories) + StatementContext statementContext, + ViewFactory optionalParentFactory, + List parentViewFactories) throws ViewParameterException { ExprNode[] validatedNodes = ViewFactorySupport.validate(getViewName(), @@ -57,20 +60,34 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { statementContext, params, true); - interval = validatedNodes[0]; - timestamp = validatedNodes[1]; - value = validatedNodes[2]; + calcType = validatedNodes[0]; + timeType = validatedNodes[1]; + interval = validatedNodes[2]; + timestamp = validatedNodes[3]; + value = validatedNodes[4]; + + Class calcTypeClass = calcType.getForge().getEvaluationType(); + if ((calcTypeClass != String.class)) + { + throw new ViewParameterException("OHLC view needs a String-typed calc type value for parameter 1"); + } + + Class timeTypeClass = timeType.getForge().getEvaluationType(); + if ((timeTypeClass != String.class)) + { + throw new ViewParameterException("OHLC view needs a String-typed time type value for parameter 2"); + } Class intervalClass = interval.getForge().getEvaluationType(); if ((intervalClass != String.class)) { - throw new ViewParameterException("OHLC view needs String-typed interval value for parameter 1."); + throw new ViewParameterException("OHLC view needs String-typed interval value for parameter 3"); } Class timestampClass = timestamp.getForge().getEvaluationType(); if ((timestampClass != DateTime.class)) { - throw new ViewParameterException("OHLC view needs DateTime typed timestamp values for parameter 2"); + throw new ViewParameterException("OHLC view needs DateTime typed timestamp values for parameter 4"); } Class valueClass = value.getForge().getEvaluationType(); @@ -78,7 +95,7 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { (valueClass != Double.class) && (valueClass != BigDecimal.class)) { - throw new ViewParameterException("OHLC view needs double or BigDecimal values for parameter 3"); + throw new ViewParameterException("OHLC view needs double or BigDecimal values for parameter 5"); } } @@ -86,7 +103,8 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { * Create a new view using already-passed context and params. */ public View makeView(AgentInstanceViewFactoryChainContext agentContext) { - return new OHLCPlugInView(agentContext, interval, timestamp, value); + return new OHLCPlugInView(agentContext, calcType, timeType, + interval, timestamp, value); } /** diff --git a/src/main/java/ats/plugin/OHLCValues.java b/src/main/java/ats/plugin/OHLCValues.java new file mode 100644 index 0000000..5a4a629 --- /dev/null +++ b/src/main/java/ats/plugin/OHLCValues.java @@ -0,0 +1,22 @@ +package ats.plugin; + +/** + * OHLCValues is a struct for storing open, high, low, and close + * values. + */ +public class OHLCValues { + public double open; + public double high; + public double low; + public double close; + + + public OHLCValues(double open, double high, + double low, double close) + { + this.open = open; + this.high = high; + this.low = low; + this.close = close; + } +} From 0d8938420554c17938e4a660d01734d6f2fa2af0 Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Tue, 30 Oct 2018 18:03:08 -0700 Subject: [PATCH 02/17] DurationWindow: rename agentContext for consistency --- src/main/java/ats/plugin/DurationWindow.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/ats/plugin/DurationWindow.java b/src/main/java/ats/plugin/DurationWindow.java index b399929..523b241 100644 --- a/src/main/java/ats/plugin/DurationWindow.java +++ b/src/main/java/ats/plugin/DurationWindow.java @@ -28,7 +28,7 @@ class DurationWindow { .appendMinutes().appendSuffix("m") .appendSeconds().appendSuffix("s") .toFormatter(); - private final AgentInstanceViewFactoryChainContext agentContext; + private final AgentInstanceViewFactoryChainContext context; private final long scheduleSlot; private EPStatementHandleCallback handle; private final Duration interval; @@ -46,7 +46,7 @@ class DurationWindow { { this.plugin = plugin; this.calc = calc; - agentContext = context; + this.context = context; scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot(); interval = parseInterval(intervalExpression); } @@ -64,7 +64,7 @@ class DurationWindow { */ private Duration parseInterval(ExprNode interval) { ExprEvaluator evaluator = interval.getForge().getExprEvaluator(); - String intervalStr = (String)evaluator.evaluate(null, true, agentContext); + String intervalStr = (String)evaluator.evaluate(null, true, context); return parseInterval(intervalStr); } @@ -145,7 +145,7 @@ class DurationWindow { * Set up a callback to post an event when our time window expires. */ private void scheduleCallback(DateTime endTime) { - SchedulingService sched = agentContext.getStatementContext().getSchedulingService(); + SchedulingService sched = context.getStatementContext().getSchedulingService(); if (handle != null) { // remove old schedule // log.info("Removing old callback"); @@ -165,7 +165,7 @@ class DurationWindow { } }; - handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback); + handle = new EPStatementHandleCallback(context.getEpStatementAgentInstanceHandle(), callback); sched.add(callbackTime, handle, scheduleSlot); // log.info("Scheduled callback for {}", callbackTime); } From 36985bbfb098c6771bd08492a421f70f800a1dff Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Fri, 2 Nov 2018 11:48:33 -0700 Subject: [PATCH 03/17] OHLCPlugInViewFactory.java: allow int arg for interval --- src/main/java/ats/plugin/OHLCPlugInViewFactory.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/ats/plugin/OHLCPlugInViewFactory.java b/src/main/java/ats/plugin/OHLCPlugInViewFactory.java index 8aa3001..004a831 100644 --- a/src/main/java/ats/plugin/OHLCPlugInViewFactory.java +++ b/src/main/java/ats/plugin/OHLCPlugInViewFactory.java @@ -67,21 +67,23 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { value = validatedNodes[4]; Class calcTypeClass = calcType.getForge().getEvaluationType(); - if ((calcTypeClass != String.class)) + if (calcTypeClass != String.class) { throw new ViewParameterException("OHLC view needs a String-typed calc type value for parameter 1"); } Class timeTypeClass = timeType.getForge().getEvaluationType(); - if ((timeTypeClass != String.class)) + if (timeTypeClass != String.class) { throw new ViewParameterException("OHLC view needs a String-typed time type value for parameter 2"); } Class intervalClass = interval.getForge().getEvaluationType(); - if ((intervalClass != String.class)) + if ((intervalClass != String.class) && + (intervalClass != Integer.class) && + (intervalClass != int.class)) { - throw new ViewParameterException("OHLC view needs String-typed interval value for parameter 3"); + throw new ViewParameterException("OHLC view needs String- or integer-typed interval value for parameter 3 - got " + intervalClass); } Class timestampClass = timestamp.getForge().getEvaluationType(); From d1225c27d17f490aa082ae9f1ec181102fc33789 Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Fri, 2 Nov 2018 11:56:10 -0700 Subject: [PATCH 04/17] Make candlestick calc configurable --- src/main/java/ats/plugin/CandlestickCalc.java | 1 + .../ats/plugin/HeikinAshiCandlestickCalc.java | 57 +++++++++++++++++++ src/main/java/ats/plugin/OHLCPlugInView.java | 29 +++++++++- 3 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java diff --git a/src/main/java/ats/plugin/CandlestickCalc.java b/src/main/java/ats/plugin/CandlestickCalc.java index e9ee70d..2a539ce 100644 --- a/src/main/java/ats/plugin/CandlestickCalc.java +++ b/src/main/java/ats/plugin/CandlestickCalc.java @@ -5,6 +5,7 @@ package ats.plugin; * values. For example, OHLC or Heikin-Ashi. */ interface CandlestickCalc { + public enum Type { OHLC, HeikinAshi }; /** * Reset the calculation for the beginning of an interval. diff --git a/src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java b/src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java new file mode 100644 index 0000000..c832c4e --- /dev/null +++ b/src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java @@ -0,0 +1,57 @@ +package ats.plugin; + +/** + * HeikinAshiCandlestickCalc calculates values using the Heikin-Ashi + * technique. + */ +class HeikinAshiCandlestickCalc implements CandlestickCalc { + private Double open; + private Double close; + private Double high; + private Double low; + + + /** + * Reset the calculation for the beginning of an interval. + */ + @Override + public void reset() { + open = null; + close = null; + high = null; + low = null; + } + + /** + * Accept a current tick value to apply to the current + * calculations. + */ + @Override + public void applyValue(double value) { + if (open == null) { + open = value; + } + + close = value; + + if (low == null) { + low = value; + } else if (low.compareTo(value) > 0) { + low = value; + } + + if (high == null) { + high = value; + } else if (high.compareTo(value) < 0) { + high = value; + } + } + + /** + * Return our calculated values up to now. + */ + @Override + public OHLCValues getValues() { + return new OHLCValues(open, high, low, close); + } +} diff --git a/src/main/java/ats/plugin/OHLCPlugInView.java b/src/main/java/ats/plugin/OHLCPlugInView.java index df288c2..3a3a2fe 100644 --- a/src/main/java/ats/plugin/OHLCPlugInView.java +++ b/src/main/java/ats/plugin/OHLCPlugInView.java @@ -27,6 +27,9 @@ public class OHLCPlugInView extends ViewSupport { private EventBean[] lastData; + /** + * Create a new plugin. + */ public OHLCPlugInView(AgentInstanceViewFactoryChainContext context, ExprNode calcTypeExpression, ExprNode timeTypeExpression, @@ -39,8 +42,32 @@ public class OHLCPlugInView extends ViewSupport { this.valueExpression = valueExpression; service = context.getStatementContext().getEventAdapterService(); - CandlestickCalc calc = new OHLCCandlestickCalc(); window = new DurationWindow(this, calc, context, intervalExpression); + CandlestickCalc calc = chooseCalc(calcTypeExpression); + } + + /** + * Return the proper CandlestickCalc according to the value of the + * calc type parameter. + */ + private CandlestickCalc chooseCalc(ExprNode calcTypeExpression) { + String calcType = exprNodeValue(calcTypeExpression); + + if (CandlestickCalc.Type.valueOf(calcType) == CandlestickCalc.Type.HeikinAshi) { + log.info("Using Heikin-Ashi calc type"); + return new HeikinAshiCandlestickCalc(); + } + + log.info("Using OHLC calc type"); + return new OHLCCandlestickCalc(); + } + + /** + * Return the string value of a node. + */ + private String exprNodeValue(ExprNode node) { + ExprEvaluator evaluator = node.getForge().getExprEvaluator(); + return (String)evaluator.evaluate(null, true, context); } /** From cc9f25c6b2421aa3d93c7e28d4bcfa840ae0ca4f Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Fri, 2 Nov 2018 12:01:29 -0700 Subject: [PATCH 05/17] Add configurable candlestick window types --- .../java/ats/plugin/CandlestickWindow.java | 17 ++++ src/main/java/ats/plugin/OHLCPlugInView.java | 27 +++++- .../ats/plugin/TicksCandlestickWindow.java | 87 +++++++++++++++++++ ...Window.java => TimeCandlestickWindow.java} | 20 +++-- 4 files changed, 140 insertions(+), 11 deletions(-) create mode 100644 src/main/java/ats/plugin/CandlestickWindow.java create mode 100644 src/main/java/ats/plugin/TicksCandlestickWindow.java rename src/main/java/ats/plugin/{DurationWindow.java => TimeCandlestickWindow.java} (91%) diff --git a/src/main/java/ats/plugin/CandlestickWindow.java b/src/main/java/ats/plugin/CandlestickWindow.java new file mode 100644 index 0000000..ee2daf6 --- /dev/null +++ b/src/main/java/ats/plugin/CandlestickWindow.java @@ -0,0 +1,17 @@ +package ats.plugin; + +import org.joda.time.DateTime; + +/** + * A CandlestickWindow runs a CandlestickCalc across a number of + * ticks and sends + */ +interface CandlestickWindow { + public enum Type { time, ticks }; + + /** + * Process a tick with the given time and value. + */ + public void update(DateTime timestamp, double value); + +} diff --git a/src/main/java/ats/plugin/OHLCPlugInView.java b/src/main/java/ats/plugin/OHLCPlugInView.java index 3a3a2fe..3c7548c 100644 --- a/src/main/java/ats/plugin/OHLCPlugInView.java +++ b/src/main/java/ats/plugin/OHLCPlugInView.java @@ -22,8 +22,8 @@ public class OHLCPlugInView extends ViewSupport { private final AgentInstanceViewFactoryChainContext context; private final ExprNode timestampExpression; private final ExprNode valueExpression; - private DurationWindow window; - private EventAdapterService service; + private CandlestickWindow window; + private EventAdapterService service; private EventBean[] lastData; @@ -42,8 +42,8 @@ public class OHLCPlugInView extends ViewSupport { this.valueExpression = valueExpression; service = context.getStatementContext().getEventAdapterService(); - window = new DurationWindow(this, calc, context, intervalExpression); CandlestickCalc calc = chooseCalc(calcTypeExpression); + window = chooseWindow(context, timeTypeExpression, intervalExpression, calc); } /** @@ -62,6 +62,27 @@ public class OHLCPlugInView extends ViewSupport { return new OHLCCandlestickCalc(); } + /** + * Return the proper window according to the value of the + * calc type parameter. + * @param intervalExpression + */ + private CandlestickWindow chooseWindow(AgentInstanceViewFactoryChainContext context, + ExprNode timeTypeExpression, + ExprNode intervalExpression, + CandlestickCalc calc) + { + String timeType = exprNodeValue(timeTypeExpression); + + if (CandlestickWindow.Type.valueOf(timeType) == CandlestickWindow.Type.time) { + log.info("Using time based window type"); + return new TimeCandlestickWindow(this, calc, context, intervalExpression); + } + + log.info("Using ticks based window type"); + return new TicksCandlestickWindow(this, calc, context, intervalExpression); + } + /** * Return the string value of a node. */ diff --git a/src/main/java/ats/plugin/TicksCandlestickWindow.java b/src/main/java/ats/plugin/TicksCandlestickWindow.java new file mode 100644 index 0000000..d2f603c --- /dev/null +++ b/src/main/java/ats/plugin/TicksCandlestickWindow.java @@ -0,0 +1,87 @@ +package ats.plugin; + +import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; +import com.espertech.esper.epl.expression.core.ExprEvaluator; +import com.espertech.esper.epl.expression.core.ExprNode; + +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TicksCandlestickWindow runs a CandlestickCalc across a set number + * of ticks for each window. + */ +class TicksCandlestickWindow implements CandlestickWindow { + private static final Logger log = LoggerFactory.getLogger(TicksCandlestickWindow.class); + private final AgentInstanceViewFactoryChainContext context; + /** How many ticks in each window? */ + private final long windowTicks; + private OHLCPlugInView plugin; + private CandlestickCalc calc; + /** How many ticks into the window we are. */ + private long currentTick = 0; + /** The time the window started. */ + private DateTime windowStartTime; + + + /** + * Create a new window that calculates across a set number of + * ticks. + */ + public TicksCandlestickWindow(OHLCPlugInView plugin, + CandlestickCalc calc, + AgentInstanceViewFactoryChainContext context, + ExprNode intervalExpression) + { + this.plugin = plugin; + this.calc = calc; + this.context = context; + windowTicks = parseTickCount(intervalExpression); + } + + /** + * Return the time period specified by the given expression value. + */ + private long parseTickCount(ExprNode intervalExpression) { + ExprEvaluator evaluator = intervalExpression.getForge().getExprEvaluator(); + Object o = evaluator.evaluate(null, true, context); + log.warn("OBJECT " + o.getClass()); + return new Long((Integer)o).longValue(); + } + + /** + * Process a tick with the given time and value. + */ + public void update(DateTime timestamp, double value) { + ensureWindow(timestamp); + calc.applyValue(value); + } + + /** + * Make sure our window times are set up and current. + * @param timestamp + */ + public void ensureWindow(DateTime timestamp) { + if (windowStartTime == null) + windowStartTime = timestamp; + + currentTick++; + log.info("tick ", currentTick); + + if (currentTick < windowTicks) return; + + OHLCValues values = calc.getValues(); + if (values == null) { + log.info("No data to post"); + return; + } + + plugin.postEvent(new OHLCEvent(windowStartTime, values)); + + // reset for next window + calc.reset(); + currentTick = 0; + windowStartTime = null; + } +} diff --git a/src/main/java/ats/plugin/DurationWindow.java b/src/main/java/ats/plugin/TimeCandlestickWindow.java similarity index 91% rename from src/main/java/ats/plugin/DurationWindow.java rename to src/main/java/ats/plugin/TimeCandlestickWindow.java index 523b241..23ef0e8 100644 --- a/src/main/java/ats/plugin/DurationWindow.java +++ b/src/main/java/ats/plugin/TimeCandlestickWindow.java @@ -17,10 +17,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * DurationWindow TODO + * TimeCandlestickWindow runs a CandlestickCalc across ticks for a set + * duration each window. */ -class DurationWindow { - private static final Logger log = LoggerFactory.getLogger(DurationWindow.class); +class TimeCandlestickWindow implements CandlestickWindow { + private static final Logger log = LoggerFactory.getLogger(TimeCandlestickWindow.class); private static final int LATE_EVENT_SLACK_SECONDS = 5; private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder() .appendDays().appendSuffix("d") @@ -39,10 +40,13 @@ class DurationWindow { private DateTime lastStartTime; - public DurationWindow(OHLCPlugInView plugin, - CandlestickCalc calc, - AgentInstanceViewFactoryChainContext context, - ExprNode intervalExpression) + /** + * Create a window over the given duration. + */ + public TimeCandlestickWindow(OHLCPlugInView plugin, + CandlestickCalc calc, + AgentInstanceViewFactoryChainContext context, + ExprNode intervalExpression) { this.plugin = plugin; this.calc = calc; @@ -161,7 +165,7 @@ class DurationWindow { public void scheduledTrigger(EngineLevelExtensionServicesContext esc) { handle = null; // clear out schedule handle // log.info("Callback running"); - DurationWindow.this.postData(); + TimeCandlestickWindow.this.postData(); } }; From ed88eca0c77a5c17d3cf3ea90b91303f9525d5be Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Tue, 13 Nov 2018 09:24:40 -0800 Subject: [PATCH 06/17] Use correct HA calc --- .../ats/plugin/HeikinAshiCandlestickCalc.java | 87 ++++++++++++++----- .../ats/plugin/TicksCandlestickWindow.java | 2 - 2 files changed, 66 insertions(+), 23 deletions(-) diff --git a/src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java b/src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java index c832c4e..f6897c5 100644 --- a/src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java +++ b/src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java @@ -3,23 +3,30 @@ package ats.plugin; /** * HeikinAshiCandlestickCalc calculates values using the Heikin-Ashi * technique. + * + * @see Heikin-Ashi description */ class HeikinAshiCandlestickCalc implements CandlestickCalc { - private Double open; - private Double close; - private Double high; - private Double low; + private Double lastOpen; + private Double lastClose; + private Double currentOpen; + private Double currentClose; + private Double currentHigh; + private Double currentLow; + - /** * Reset the calculation for the beginning of an interval. */ @Override public void reset() { - open = null; - close = null; - high = null; - low = null; + lastOpen = currentOpen; + lastClose = currentClose; + + currentOpen = null; + currentClose = null; + currentHigh = null; + currentLow = null; } /** @@ -28,30 +35,68 @@ class HeikinAshiCandlestickCalc implements CandlestickCalc { */ @Override public void applyValue(double value) { - if (open == null) { - open = value; + if (currentOpen == null) { + currentOpen = value; } - close = value; + currentClose = value; - if (low == null) { - low = value; - } else if (low.compareTo(value) > 0) { - low = value; + if (currentLow == null) { + currentLow = value; + } else if (currentLow.compareTo(value) > 0) { + currentLow = value; } - if (high == null) { - high = value; - } else if (high.compareTo(value) < 0) { - high = value; + if (currentHigh == null) { + currentHigh = value; + } else if (currentHigh.compareTo(value) < 0) { + currentHigh = value; } } + /** + * HA-Open = (HA-Open(-1) + HA-Close(-1)) / 2 + */ + private double calcOpen() { + double sum = 0; + + sum += lastOpen != null ? lastOpen : currentOpen; + sum += lastClose != null ? lastClose : currentClose; + + return sum / 2d; + } + + /** + * HA-Close = (Open(0) + High(0) + Low(0) + Close(0)) / 4 + */ + private double calcClose() { + return (currentOpen + currentHigh + currentLow + currentClose) / 4d; + } + + /** + * HA-High = Maximum of the High(0), HA-Open(0) or HA-Close(0) + */ + private double calcHigh(double haOpen, double haClose) { + return Math.max(currentHigh, Math.max(haOpen, haClose)); + } + + /** + * HA-Low = Minimum of the Low(0), HA-Open(0) or HA-Close(0) + */ + private double calcLow(double haOpen, double haClose) { + return Math.min(currentLow, Math.min(haOpen, haClose)); + } + /** * Return our calculated values up to now. */ @Override public OHLCValues getValues() { - return new OHLCValues(open, high, low, close); + double o = calcOpen(); + double c = calcClose(); + double h = calcHigh(o, c); + double l = calcLow(o, c); + + return new OHLCValues(o, h, l, c); } } diff --git a/src/main/java/ats/plugin/TicksCandlestickWindow.java b/src/main/java/ats/plugin/TicksCandlestickWindow.java index d2f603c..94801ed 100644 --- a/src/main/java/ats/plugin/TicksCandlestickWindow.java +++ b/src/main/java/ats/plugin/TicksCandlestickWindow.java @@ -46,7 +46,6 @@ class TicksCandlestickWindow implements CandlestickWindow { private long parseTickCount(ExprNode intervalExpression) { ExprEvaluator evaluator = intervalExpression.getForge().getExprEvaluator(); Object o = evaluator.evaluate(null, true, context); - log.warn("OBJECT " + o.getClass()); return new Long((Integer)o).longValue(); } @@ -67,7 +66,6 @@ class TicksCandlestickWindow implements CandlestickWindow { windowStartTime = timestamp; currentTick++; - log.info("tick ", currentTick); if (currentTick < windowTicks) return; From 22a4a76561d01371f1476e9a4d92b2a3331b98ea Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 18:28:18 -0800 Subject: [PATCH 07/17] Improve trading window - can specify times as "hh:mm" --- epl/trading_system_1.epl | 15 +++++++-------- src/main/java/EPLHelpers.java | 30 +++++++++++++++++++++++------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index 01ff8b8..743d7c8 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -9,11 +9,11 @@ -- The time the trading logic will begin to enter trades. -- Exiting trades is 24/7. -create constant variable int StartTimeHour = 9 +create constant variable DateTime TradingStartTime = EPLHelpers.parseTime("00:00") --- The time the trading logic will begin to enter trades. +-- The time the trading logic will no longer enter trades. -- Exiting trades is 24/7. -create constant variable int EndTimeHour = 17 +create constant variable DateTime TradingEndTime = EPLHelpers.parseTime("23:59") -- The time frame for OHLC calculation. -- Example values: '5s', '1m', '1h 30m', '2h', '12h', '1d'. @@ -47,14 +47,13 @@ insert into CurrentTickWindow select * from TickEvent -- -- InTradingHours will be set to true when the current time is between --- StartTime and EndTime. +-- TradingStartTime and TradingEndTime. create variable bool InTradingHours = false -- Update on each tick --- NOTE: see "timer:within" pattern for possible alternate formulation -on TickEvent as t set InTradingHours = - (EPLHelpers.getHour(t.time) >= StartTimeHour and - EPLHelpers.getHour(t.time) < EndTimeHour) +on TickEvent as t + set InTradingHours = EPLHelpers.inTimeRange(t.time, TradingStartTime, TradingEndTime) + -- diff --git a/src/main/java/EPLHelpers.java b/src/main/java/EPLHelpers.java index ac40a71..f40af94 100644 --- a/src/main/java/EPLHelpers.java +++ b/src/main/java/EPLHelpers.java @@ -1,27 +1,43 @@ import org.joda.time.DateTime; +import org.joda.time.DateTimeComparator; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.joda.time.DateTimeComparator; - +/** + * EPLHelpers contains small helper routines used within epl files. + */ public class EPLHelpers { final static Logger log = LoggerFactory.getLogger(EPLHelpers.class); + private static DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("HH:mm"); + private static DateTimeComparator timeComparator = DateTimeComparator.getTimeOnlyInstance(); - /** Return the hour of the day for the given date. */ - public static int getHour(DateTime date) { - return date.getHourOfDay(); + /** + * Return a DateTime object for the time. Should be specified in + * 24-hour "hh:mm" format. + */ + public static DateTime parseTime(String time) { + return timeFormatter.parseDateTime(time); } /** A simple toString() wrapper for use in epl. */ public static String str(Object o) { return o.toString(); } + /** + * Return true if the time portion of 'now' is between the time + * portion of 'start' and 'end'. + */ + public static boolean inTimeRange(DateTime now, DateTime start, DateTime end) { + return laterThan(now, start) && earlierThan(now, end); + } /** * Compare two times and return true if the first is earlier than * the second. */ public static boolean earlierThan(DateTime a, DateTime b) { - return DateTimeComparator.getInstance().compare(a, b) < 0; + return timeComparator.compare(a, b) < 0; } /** @@ -29,6 +45,6 @@ public class EPLHelpers { * the second. */ public static boolean laterThan(DateTime a, DateTime b) { - return DateTimeComparator.getInstance().compare(a, b) > 0; + return timeComparator.compare(a, b) > 0; } } From 36cefa32e6f39cc536217c01d1b9d7c1f6a7861e Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 18:29:25 -0800 Subject: [PATCH 08/17] EPLHelpers.str() now takes varargs --- src/main/java/EPLHelpers.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/EPLHelpers.java b/src/main/java/EPLHelpers.java index f40af94..628879d 100644 --- a/src/main/java/EPLHelpers.java +++ b/src/main/java/EPLHelpers.java @@ -1,3 +1,5 @@ +import java.util.StringJoiner; + import org.joda.time.DateTime; import org.joda.time.DateTimeComparator; import org.joda.time.format.DateTimeFormat; @@ -14,6 +16,17 @@ public class EPLHelpers { private static DateTimeComparator timeComparator = DateTimeComparator.getTimeOnlyInstance(); + /** + * A simple toString() wrapper. + */ + public static String str(Object... objects) { + StringJoiner sj = new StringJoiner(", "); + for (Object o : objects) { + sj.add(o != null ? o.toString() : "null"); + } + return sj.toString(); + } + /** * Return a DateTime object for the time. Should be specified in * 24-hour "hh:mm" format. @@ -22,8 +35,6 @@ public class EPLHelpers { return timeFormatter.parseDateTime(time); } - /** A simple toString() wrapper for use in epl. */ - public static String str(Object o) { return o.toString(); } /** * Return true if the time portion of 'now' is between the time * portion of 'start' and 'end'. From 104eb216b51e2afe66258f4cfd183eda66d547bc Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 18:31:53 -0800 Subject: [PATCH 09/17] Make DateTime available without package prefix in epl --- epl/trading_system_1.epl | 20 ++++++++++---------- src/main/java/EsperProcessor.java | 2 ++ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index 743d7c8..0273897 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -97,7 +97,7 @@ insert into SMACloseStream -- A stream that feeds B1 and B2. Each event contains a double -- precision floating point value "low", and a timestamp called -- "time". -create schema BStream as (low double, time org.joda.time.DateTime) +create schema BStream as (low double, time DateTime) -- Listen to the last "RefSize" number of SMACloseStream events. Look -- for an OHLC bar with a lower average close than its neighbors. Add @@ -111,7 +111,7 @@ insert into BStream -- Define B1 to contain the same fields as BStream -create schema B1 (low double, time org.joda.time.DateTime) +create schema B1 (low double, time DateTime) -- B1 contains the most recent low value and time from BStream. -- This is the last time an average close was lower @@ -122,14 +122,14 @@ insert into B1 select prev(0, low) as low, prev(0, time) as time -- B2 contains the *second* most recent occurrence in BStream, but is -- otherwise the same as B1. -create schema B2 (low double, time org.joda.time.DateTime) +create schema B2 (low double, time DateTime) insert into B2 select prev(1, low) as low, prev(1, time) as time from BStream#length(RefSize) -- A stream that feeds P1 and P2. -create schema PStream as (low double, time org.joda.time.DateTime) +create schema PStream as (low double, time DateTime) -- Find an OHLC bar with a higher average close than its neighbors. -- Add that low value and its timestamp to the stream. @@ -142,13 +142,13 @@ insert into PStream -- This is the last time an average close was higher -- than the ones before and after. -- Since the time is included in the event, no separate PT1 is needed. -create schema P1 (low double, time org.joda.time.DateTime) +create schema P1 (low double, time DateTime) insert into P1 select prev(0, low) as low, prev(0, time) as time from PStream#length(RefSize) -- P2 contains the second most recent occurrence in PStream. -create schema P2 (low double, time org.joda.time.DateTime) +create schema P2 (low double, time DateTime) insert into P2 select prev(1, low) as low, prev(1, time) as time from PStream#length(RefSize) @@ -167,9 +167,9 @@ insert into MaxHigh3Window select max(high) as high from OHLCStream#length(3) --- Long entry events contain the current tick's midpoint value and --- timestamp. -create schema LongEntryStream as (current BigDecimal, time org.joda.time.DateTime, instrument String) +-- Long entry events contain the current tick's midpoint value, +-- timestamp, and instrument name. +create schema LongEntryStream as (current BigDecimal, time DateTime, instrument String) -- The long entry calc below is translated from this entry in the -- spreadsheet: @@ -202,7 +202,7 @@ insert into LongEntryStream -- -- LongEntryDistinct filters out duplicate LongEntryStream events, -- leaving a maximum of one event per tick. -create schema LongEntryDistinct as (current BigDecimal, time org.joda.time.DateTime, +create schema LongEntryDistinct as (current BigDecimal, time DateTime, instrument String, units int) insert into LongEntryDistinct diff --git a/src/main/java/EsperProcessor.java b/src/main/java/EsperProcessor.java index 755ae71..eb6e384 100644 --- a/src/main/java/EsperProcessor.java +++ b/src/main/java/EsperProcessor.java @@ -9,6 +9,7 @@ import com.espertech.esper.client.StatementAwareUpdateListener; import com.espertech.esper.client.UpdateListener; import com.espertech.esper.client.time.CurrentTimeEvent; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,7 @@ public class EsperProcessor implements TickProcessor { config.addEventType(TickEvent.class); config.addEventType(OHLCEvent.class); config.addEventType(OHLCValueEvent.class); + config.addEventType(DateTime.class); // add OHLC plugin config.addPlugInView("ATS", "OHLC", OHLCPlugInViewFactory.class.getName()); From 9a743b89198e6840990a0f2fcce547c14d7412ea Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 18:33:35 -0800 Subject: [PATCH 10/17] Rename CurrentTickWindow -> CurrentTick --- epl/trading_system_1.epl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index 0273897..4ff6487 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -35,11 +35,11 @@ create constant variable int RefSize = 5 -- Define the window as length 1, using the structure of the TickEvent -- java class to describe what the window contains. -create window CurrentTickWindow#length(1) as TickEvent +create window CurrentTick#length(1) as TickEvent -- Describe how events get added to the window. This runs every time -- a new TickEvent is posted. -insert into CurrentTickWindow select * from TickEvent +insert into CurrentTick select * from TickEvent -- @@ -183,7 +183,7 @@ create schema LongEntryStream as (current BigDecimal, time DateTime, instrument insert into LongEntryStream select C.mid as current, C.time as time, C.instrument as instrument - from CurrentTickWindow as C, + from CurrentTick as C, MaxHigh3Window as T, B1#lastevent, B2#lastevent, P1#lastevent, P2#lastevent @@ -194,7 +194,7 @@ insert into LongEntryStream and EPLHelpers.laterThan(B2.time, P2.time) and EPLHelpers.laterThan(P1.time, B2.time) --- Because multiple streams feed LongEntryStream (CurrentTickWindow, +-- Because multiple streams feed LongEntryStream (CurrentTick, -- MaxHigh3Window, B1, B2...), an event on any of those streams causes -- the LongEntryStream logic above to be triggered. This often causes -- multiple LongEntryStream events to be generated for a single tick From a9f9e446515cca85a0ca0000a7419a21f1313e19 Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 18:34:43 -0800 Subject: [PATCH 11/17] trading_system_1.epl: only LE within trading hours --- epl/trading_system_1.epl | 1 + 1 file changed, 1 insertion(+) diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index 4ff6487..659face 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -193,6 +193,7 @@ insert into LongEntryStream and EPLHelpers.laterThan(B1.time, P1.time) and EPLHelpers.laterThan(B2.time, P2.time) and EPLHelpers.laterThan(P1.time, B2.time) + and InTradingHours -- Because multiple streams feed LongEntryStream (CurrentTick, -- MaxHigh3Window, B1, B2...), an event on any of those streams causes From 92c3636d8ff1a762520d3959cafc29954154fcf2 Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 20:41:28 -0800 Subject: [PATCH 12/17] Add HKStream, TBStream, and TBHKStream --- epl/trading_system_1.epl | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index 659face..b0fc9cb 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -19,6 +19,9 @@ create constant variable DateTime TradingEndTime = EPLHelpers.parseTime("23:59") -- Example values: '5s', '1m', '1h 30m', '2h', '12h', '1d'. create constant variable string OHLCInterval = '10s' +-- The number of ticks for OHLC TB calculation. +create constant variable int OHLCTicks = 100 + -- Amount to be traded, measured in units. create constant variable int TradeSize = 10 @@ -60,16 +63,46 @@ on TickEvent as t -- A stream of OHLC values calculated from TickEvents -- --- Create the stream to contain OHLCEvents -create variant schema OHLCStream as OHLCEvent - -- Send every TickEvent to the OHLC plugin. The plugin will post an -- OHLCEvent to OHLCStream every OHLCInterval amount of time. It uses -- TickEvent.time ("time") as the source of the timestamp, and uses -- TickEvent.midDouble() as the value to use in the OHLC calculation. + +create variant schema OHLCStream as OHLCEvent + insert into OHLCStream select * from TickEvent#OHLC("OHLC", "time", OHLCInterval, time, midDouble) +-- Send every TickEvent to the OHLC plugin. The plugin will post an +-- OHLCEvent using Heikin-Ashi calculations to HKStream every +-- OHLCInterval amount of time. It uses TickEvent.midDouble() as the +-- value to use in the OHLC calculation. + +create variant schema HKStream as OHLCEvent + +insert into HKStream + select * from TickEvent#OHLC("HeikinAshi", "time", OHLCInterval, time, midDouble) + +-- Send every TickEvent to the OHLC plugin. The plugin will post an +-- OHLCEvent to TBStream every OHLCTicks ticks. It uses +-- TickEvent.time ("time") as the source of the timestamp, and uses +-- TickEvent.midDouble() as the value to use in the OHLC calculation. + +create variant schema TBStream as OHLCEvent + +insert into TBStream + select * from TickEvent#OHLC("HeikinAshi", "ticks", OHLCTicks, time, midDouble) + +-- Send every TickEvent to the OHLC plugin. The plugin will post an +-- OHLCEvent using Heikin-Ashi calculations to TBHKStream every +-- OHLCTicks ticks. It uses TickEvent.midDouble() as the value to use +-- in the OHLC calculation. + +create variant schema TBHKStream as OHLCEvent + +insert into TBHKStream + select * from TickEvent#OHLC("HeikinAshi", "ticks", OHLCTicks, time, midDouble) + -- -- Simple moving average streams From 8cf12a27fb80fb0bd2854be6ac2f8a42e75968ac Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 20:53:09 -0800 Subject: [PATCH 13/17] Replace OHLCValueEvent --- epl/trading_system_1.epl | 27 +++++++------ src/main/java/EsperProcessor.java | 2 - src/main/java/ats/plugin/OHLCValueEvent.java | 42 -------------------- 3 files changed, 14 insertions(+), 57 deletions(-) delete mode 100644 src/main/java/ats/plugin/OHLCValueEvent.java diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index b0fc9cb..78dc401 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -108,18 +108,21 @@ insert into TBHKStream -- Simple moving average streams -- --- SMACloseStream contains OHLCValueEvents. These are like --- OHLCEvents, but add an extra field for an arbitrary value. In this --- stream, that extra value will contain the average of OHLC close --- values. -create schema SMACloseStream as ats.plugin.OHLCValueEvent +-- SMACloseStream bundles a simple moving average of the close values +-- with the values of the OHLCStream. +create schema SMACloseStream as (time DateTime, + open double, + high double, + low double, + close double, + averageClose double) -- Average the most recent OHLC close values from OHLCStream and -- post an event that contains open, high, low, close, and -- SMA(close). The number of OHLC events used in the SMA calc is set -- by the SMASize variable. insert into SMACloseStream - select new ats.plugin.OHLCValueEvent(time, open, high, low, close, Avg(close)) + select time, open, high, low, close, Avg(close) as averageClose from OHLCStream#length(SMASize) @@ -134,13 +137,11 @@ create schema BStream as (low double, time DateTime) -- Listen to the last "RefSize" number of SMACloseStream events. Look -- for an OHLC bar with a lower average close than its neighbors. Add --- that bar's low value and its timestamp to the stream. As described --- in SMACloseStream, "value" in the query below represents --- SMA(close). +-- that bar's low value and its timestamp to the stream. insert into BStream select prev(1, low) as low, prev(1, time) as time from SMACloseStream#length(RefSize) - where prev(0, value) > prev(1, value) - and prev(1, value) < prev(2, value) + where prev(0, averageClose) > prev(1, averageClose) + and prev(1, averageClose) < prev(2, averageClose) -- Define B1 to contain the same fields as BStream @@ -168,8 +169,8 @@ create schema PStream as (low double, time DateTime) -- Add that low value and its timestamp to the stream. insert into PStream select prev(1, low) as low, prev(1, time) as time from SMACloseStream#length(RefSize) - where prev(0, value) < prev(1, value) - and prev(1, value) > prev(2, value) + where prev(0, averageClose) < prev(1, averageClose) + and prev(1, averageClose) > prev(2, averageClose) -- P1 contains the most recent low value and time from PStream. -- This is the last time an average close was higher diff --git a/src/main/java/EsperProcessor.java b/src/main/java/EsperProcessor.java index eb6e384..c07fa6b 100644 --- a/src/main/java/EsperProcessor.java +++ b/src/main/java/EsperProcessor.java @@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory; import ats.orders.MarketOrderRequest; import ats.plugin.OHLCEvent; import ats.plugin.OHLCPlugInViewFactory; -import ats.plugin.OHLCValueEvent; public class EsperProcessor implements TickProcessor { @@ -36,7 +35,6 @@ public class EsperProcessor implements TickProcessor { // register event types defined in java classes config.addEventType(TickEvent.class); config.addEventType(OHLCEvent.class); - config.addEventType(OHLCValueEvent.class); config.addEventType(DateTime.class); // add OHLC plugin diff --git a/src/main/java/ats/plugin/OHLCValueEvent.java b/src/main/java/ats/plugin/OHLCValueEvent.java deleted file mode 100644 index ed12b74..0000000 --- a/src/main/java/ats/plugin/OHLCValueEvent.java +++ /dev/null @@ -1,42 +0,0 @@ -package ats.plugin; - -import org.joda.time.DateTime; - -/** - * OHLCValueEvent stores one bar of OHLC info. - */ -public class OHLCValueEvent extends OHLCEvent { - private double value; - - - public OHLCValueEvent() { - this(null, 0, 0, 0, 0, 0); - } - - public OHLCValueEvent(DateTime time, - double open, double high, - double low, double close, - double value) - { - super(time, open, high, low, close); - this.value = value; - } - - public static OHLCValueEvent make(DateTime time, - double open, double high, - double low, double close, - double value) - { - return new OHLCValueEvent(time, open, high, low, close, value); - } - - public Double getValue() { return value; } - - /** - * Return a human readable representation of this event. - */ - public String toString() { - return String.format("OHLCValueEvent[%s, open=%.3f, high=%.3f, low=%.3f, close=%.3f, value=%.3f]", - getTime(), getOpen(), getHigh(), getLow(), getClose(), value); - } -} From b0f254b0063b96074786f9bb646eba784fec0568 Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 21:06:51 -0800 Subject: [PATCH 14/17] Prevent LE when we're already in a LE --- epl/trading_system_1.epl | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index 78dc401..d455a3d 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -57,6 +57,12 @@ create variable bool InTradingHours = false on TickEvent as t set InTradingHours = EPLHelpers.inTimeRange(t.time, TradingStartTime, TradingEndTime) + +-- +-- In long position. Set later. +-- + +create variable bool InLongEntry = false -- @@ -228,6 +234,7 @@ insert into LongEntryStream and EPLHelpers.laterThan(B2.time, P2.time) and EPLHelpers.laterThan(P1.time, B2.time) and InTradingHours + and not InLongEntry -- Because multiple streams feed LongEntryStream (CurrentTick, -- MaxHigh3Window, B1, B2...), an event on any of those streams causes @@ -246,6 +253,14 @@ insert into LongEntryDistinct from pattern [every-distinct(le.time) le=LongEntryStream] +-- +-- Long entry derived variables +-- + +-- Register if We're in a long entry when we get the LongEntryDistinct +-- event. (InLongEntry is declared near top of file) +on LongEntryDistinct as le set InLongEntry = true + -- -- Event logging -- From 7680ed81a278ab1b8ba2059f98c70f820b3db07b Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 21:08:36 -0800 Subject: [PATCH 15/17] Add LE derived values --- epl/trading_system_1.epl | 51 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index d455a3d..f64aa54 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -244,8 +244,11 @@ insert into LongEntryStream -- -- LongEntryDistinct filters out duplicate LongEntryStream events, -- leaving a maximum of one event per tick. -create schema LongEntryDistinct as (current BigDecimal, time DateTime, - instrument String, units int) +create schema LongEntryDistinct + as (current BigDecimal, + time DateTime, + instrument String, + units int) insert into LongEntryDistinct select le.current as current, le.time as time, @@ -254,13 +257,55 @@ insert into LongEntryDistinct -- --- Long entry derived variables +-- Long entry derived values -- -- Register if We're in a long entry when we get the LongEntryDistinct -- event. (InLongEntry is declared near top of file) on LongEntryDistinct as le set InLongEntry = true +-- LongEntryTime contains the timestamp of the last long entry, +-- or null if none has happened yet. +create variable DateTime LongEntryTime = null + +on LongEntryDistinct as le set LongEntryTime = le.time + +-- LongEntryStopBarCount contains the number of OHLCStream bars that +-- have occurred since the most recent long entry. +create variable int LongEntryStopBarCount = 0 + +-- Reset LongEntryStopBarCount when we get a new LE. +on LongEntryDistinct as le set LongEntryStopBarCount = 0 + +-- Increment LongEntryStopBarCount on every bar. +on OHLCStream as ohlc set LongEntryStopBarCount = LongEntryStopBarCount + 1 + +-- LongEntryPrice contains the price of the last long entry, +-- or null if none has happened yet. +create variable BigDecimal LongEntryPrice = null + +on LongEntryDistinct as le set LongEntryPrice = le.current + +-- LongEntryPreEntryBar is a stream that keeps bars just prior to the +-- last long entry +create schema LongEntryPreEntryBar + as (time DateTime, + open double, + high double, + low double, + close double) + +-- Add bars so long as we're not in a long position +insert into LongEntryPreEntryBar + select time, open, high, low, close from OHLCStream + where InLongEntry is false + +-- Contains the second-most-recent LongEntryPreEntryBar's "low" value +create schema LongEntryPreEntryBarPrevLow (low double) + +insert into LongEntryPreEntryBarPrevLow select prev(1, low) as low + from LongEntryPreEntryBar#length(2) + -- -- Event logging -- From d83b1da903c935a4d639bb93b8027e0ca0cbf81e Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 21:14:31 -0800 Subject: [PATCH 16/17] Add long exit (LX) code --- epl/trading_system_1.epl | 37 +++++++++++++++++++++++++++++++ src/main/java/EsperProcessor.java | 14 ++++++++++++ 2 files changed, 51 insertions(+) diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index f64aa54..ec9039f 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -25,6 +25,9 @@ create constant variable int OHLCTicks = 100 -- Amount to be traded, measured in units. create constant variable int TradeSize = 10 +-- How large is a pip? +create constant variable BigDecimal PipSize = new BigDecimal(0.0001) + -- How many events to use for simple moving average calculation create constant variable int SMASize = 5 @@ -306,6 +309,40 @@ create schema LongEntryPreEntryBarPrevLow (low double) insert into LongEntryPreEntryBarPrevLow select prev(1, low) as low from LongEntryPreEntryBar#length(2) + +-- +-- Long exit +-- + +-- Long exit event definition +create schema LongExitStream + as (current BigDecimal, + time DateTime, + instrument String, + units int) + +-- The long exit calc below is translated from this entry in the +-- spreadsheet: +-- +-- LX = (LESBC <= 60 and C < MIN(PreEntrybar(Low,1), (LongEntryPrice - 10 pips))) +-- or +-- (LESBC>60 and C<(EntryPrice + 2 pips) + +insert into LongExitStream + select C.mid as current, + C.time as time, + C.instrument as instrument, + 0 - TradeSize as units -- negative for "sell" + from CurrentTick as C, + LongEntryPreEntryBarPrevLow#lastevent as L + where ((LongEntryStopBarCount <= 60 and C.mid < min(L.low, LongEntryPrice - (10 * PipSize))) or + (LongEntryStopBarCount > 60 and C.mid < LongEntryPrice + (2 * PipSize))) + and InLongEntry + +-- Register the long position as closed +on LongExitStream as lx set InLongEntry = false + + -- -- Event logging -- diff --git a/src/main/java/EsperProcessor.java b/src/main/java/EsperProcessor.java index c07fa6b..5a2c186 100644 --- a/src/main/java/EsperProcessor.java +++ b/src/main/java/EsperProcessor.java @@ -61,6 +61,20 @@ public class EsperProcessor implements TickProcessor { newData[0].get("current"), newData[0].get("time")); }); + + // respond to long exit events + addStatement("select * from LongExitStream", + (newData, oldData) -> { + String instrument = (String)newData[0].get("instrument"); + Integer units = (Integer)newData[0].get("units"); + trader.placeOrder(new MarketOrderRequest(instrument, units)); + + log.debug("Long exit triggered: {} of {} at price {} at time {}", + units, + instrument, + newData[0].get("current"), + newData[0].get("time")); + }); } /** From 3d65a8e91e6cad251aedac4e3e6669ecd40686a2 Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Wed, 14 Nov 2018 21:15:22 -0800 Subject: [PATCH 17/17] Add many epl log definitions --- epl/trading_system_1.epl | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index ec9039f..3d25e63 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -355,27 +355,51 @@ create schema LogStream as (stream string, event string) -- of these can be either helpful or too noisy. Comment/uncomment as -- you see fit. -insert into LogStream select 'TickEvent' as stream, EPLHelpers.str(*) as event from TickEvent +-- insert into LogStream select 'TickEvent' as stream, EPLHelpers.str(*) as event from TickEvent -insert into LogStream select 'OHLCStream' as stream, EPLHelpers.str(*) as event from OHLCStream +-- insert into LogStream select 'OHLCStream' as stream, EPLHelpers.str(*) as event from OHLCStream + +-- insert into LogStream select 'HKStream' as stream, EPLHelpers.str(*) as event from HKStream + +-- insert into LogStream select 'TBStream' as stream, EPLHelpers.str(*) as event from TBStream + +-- insert into LogStream select 'TBHKStream' as stream, EPLHelpers.str(*) as event from TBHKStream + +-- insert into LogStream select 'InTradingHours' as stream, EPLHelpers.str(time, InTradingHours) as event from TickEvent -- insert into LogStream select 'BStream' as stream, EPLHelpers.str(*) as event from BStream -- insert into LogStream select 'PStream' as stream, EPLHelpers.str(*) as event from PStream -insert into LogStream select 'B1' as stream, EPLHelpers.str(*) as event from B1 +-- insert into LogStream select 'B1' as stream, EPLHelpers.str(*) as event from B1 -insert into LogStream select 'B2' as stream, EPLHelpers.str(*) as event from B2 +-- insert into LogStream select 'B2' as stream, EPLHelpers.str(*) as event from B2 -insert into LogStream select 'P1' as stream, EPLHelpers.str(*) as event from P1 +-- insert into LogStream select 'P1' as stream, EPLHelpers.str(*) as event from P1 -insert into LogStream select 'P2' as stream, EPLHelpers.str(*) as event from P2 +-- insert into LogStream select 'P2' as stream, EPLHelpers.str(*) as event from P2 -- insert into LogStream select 'MaxHigh3Window' as stream, EPLHelpers.str(*) as event from MaxHigh3Window -- insert into LogStream select 'LongEntryStream' as stream, EPLHelpers.str(*) as event from LongEntryStream --- insert into LogStream select 'LongEntryDistinct' as stream, EPLHelpers.str(*) as event from LongEntryDistinct +insert into LogStream select 'LongEntryDistinct' as stream, EPLHelpers.str(*) as event from LongEntryDistinct +-- insert into LogStream select 'InLongEntry' as stream, EPLHelpers.str(InLongEntry) as event from OHLCStream + +-- insert into LogStream select 'LongEntryTime' as stream, EPLHelpers.str(LongEntryTime) as event from TickEvent + +-- insert into LogStream select 'LongEntryPrice' as stream, EPLHelpers.str(LongEntryPrice) as event from OHLCStream + +-- insert into LogStream select 'LongEntryStopBarCount' as stream, EPLHelpers.str(LongEntryStopBarCount) as event from OHLCStream where InLongEntry + +-- insert into LogStream select 'LongEntryPreEntryBar' as stream, EPLHelpers.str(*) as event from LongEntryPreEntryBar + +-- insert into LogStream select 'LongEntryPreEntryBarPrevLow' as stream, EPLHelpers.str(*) as event from LongEntryPreEntryBarPrevLow + +-- insert into LogStream select 'LXPrice' as stream, EPLHelpers.str(mid, LongEntryPrice, LongEntryPrice - (10 * PipSize)) as event from TickEvent +-- where InLongEntry and mid < LongEntryPrice + +insert into LogStream select 'LongExitStream' as stream, EPLHelpers.str(*) as event from LongExitStream -- TODO (for Seth): look into LogSink http://esper.espertech.com/release-7.1.0/esper-reference/html/dataflow.html#dataflow-reference-logsink