Add configurable candlestick window types

This commit is contained in:
2018-11-02 12:01:29 -07:00
parent d1225c27d1
commit cc9f25c6b2
4 changed files with 140 additions and 11 deletions

View File

@ -0,0 +1,198 @@
package ats.plugin;
import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext;
import com.espertech.esper.core.service.EPStatementHandleCallback;
import com.espertech.esper.core.service.EngineLevelExtensionServicesContext;
import com.espertech.esper.epl.expression.core.ExprEvaluator;
import com.espertech.esper.epl.expression.core.ExprNode;
import com.espertech.esper.schedule.ScheduleHandleCallback;
import com.espertech.esper.schedule.SchedulingService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TimeCandlestickWindow runs a CandlestickCalc across ticks for a set
* duration each window.
*/
class TimeCandlestickWindow implements CandlestickWindow {
private static final Logger log = LoggerFactory.getLogger(TimeCandlestickWindow.class);
private static final int LATE_EVENT_SLACK_SECONDS = 5;
private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder()
.appendDays().appendSuffix("d")
.appendHours().appendSuffix("h")
.appendMinutes().appendSuffix("m")
.appendSeconds().appendSuffix("s")
.toFormatter();
private final AgentInstanceViewFactoryChainContext context;
private final long scheduleSlot;
private EPStatementHandleCallback handle;
private final Duration interval;
private OHLCPlugInView plugin;
private CandlestickCalc calc;
private DateTime windowStartTime;
private DateTime windowEndTime;
private DateTime lastStartTime;
/**
* Create a window over the given duration.
*/
public TimeCandlestickWindow(OHLCPlugInView plugin,
CandlestickCalc calc,
AgentInstanceViewFactoryChainContext context,
ExprNode intervalExpression)
{
this.plugin = plugin;
this.calc = calc;
this.context = context;
scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot();
interval = parseInterval(intervalExpression);
}
/**
* Process a tick with the given time and value.
*/
public void update(DateTime timestamp, double value) {
ensureWindow(timestamp);
calc.applyValue(value);
}
/**
* Return the time period specified by the given expression value.
*/
private Duration parseInterval(ExprNode interval) {
ExprEvaluator evaluator = interval.getForge().getExprEvaluator();
String intervalStr = (String)evaluator.evaluate(null, true, context);
return parseInterval(intervalStr);
}
/**
* Return the time period specified by the given string.
*/
public static Duration parseInterval(String interval) {
interval = interval.replaceAll("\\s+","");
return periodFormatter.parsePeriod(interval).toStandardDuration();
}
/**
* Make sure our window times are set up and current.
*/
public void ensureWindow(DateTime timestamp) {
if (timestamp == null) return;
if (windowStartTime == null) {
// create open window
windowStartTime = makeWindowStartTime(timestamp, interval);
windowEndTime = makeWindowEndTime(windowStartTime, interval);
scheduleCallback(windowEndTime);
}
if (!inWindow(timestamp)) {
// past current window.
// post and create a new one.
postData();
windowStartTime = makeWindowStartTime(timestamp, interval);
windowEndTime = makeWindowEndTime(windowStartTime, interval);
scheduleCallback(windowEndTime);
}
}
public static DateTime makeWindowStartTime(DateTime timestamp,
Duration interval)
{
DateTime today = timestamp.withTimeAtStartOfDay();
// log.info("Timestamp is {}", timestamp);
// log.info("Day start is {}", today);
Duration intoToday = new Duration(today, timestamp);
// calc how far into the current window we are
long intoPeriod = intoToday.getMillis() % interval.getMillis();
return timestamp.minus(intoPeriod);
}
private static DateTime makeWindowEndTime(DateTime startTime,
Duration interval)
{
if (startTime == null || interval == null) return null;
return startTime.plus(interval.getMillis());
}
/**
* Return true if the timestamp is within the current time window.
*/
private boolean inWindow(DateTime timestamp) {
if (timestamp == null) return false;
return timestamp.compareTo(windowStartTime) >= 0 &&
timestamp.compareTo(windowEndTime) < 0;
}
/**
* Convert a bare long value to a proper DateTime entity. Assumes
* UTC time zone.
*/
public static DateTime toDateTime(long l) {
return new DateTime(l, DateTimeZone.UTC);
}
/**
* Set up a callback to post an event when our time window expires.
*/
private void scheduleCallback(DateTime endTime) {
SchedulingService sched = context.getStatementContext().getSchedulingService();
if (handle != null) {
// remove old schedule
// log.info("Removing old callback");
sched.remove(handle, scheduleSlot);
handle = null;
}
DateTime currentTime = toDateTime(sched.getTime());
DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS);
long callbackTime = targetTime.getMillis() - currentTime.getMillis();
ScheduleHandleCallback callback = new ScheduleHandleCallback() {
public void scheduledTrigger(EngineLevelExtensionServicesContext esc) {
handle = null; // clear out schedule handle
// log.info("Callback running");
TimeCandlestickWindow.this.postData();
}
};
handle = new EPStatementHandleCallback(context.getEpStatementAgentInstanceHandle(), callback);
sched.add(callbackTime, handle, scheduleSlot);
// log.info("Scheduled callback for {}", callbackTime);
}
/**
* Update listeners with our new value.
*/
private void postData() {
if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) {
log.warn("DUP START TIME");
}
// log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread());
OHLCValues values = calc.getValues();
if (values == null) {
// log.info("No data to post");
return;
}
plugin.postEvent(new OHLCEvent(windowStartTime, values));
calc.reset();
lastStartTime = windowStartTime;
}
}