package ats.plugin; import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; import com.espertech.esper.core.service.EPStatementHandleCallback; import com.espertech.esper.core.service.EngineLevelExtensionServicesContext; import com.espertech.esper.epl.expression.core.ExprEvaluator; import com.espertech.esper.epl.expression.core.ExprNode; import com.espertech.esper.schedule.ScheduleHandleCallback; import com.espertech.esper.schedule.SchedulingService; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Duration; import org.joda.time.format.PeriodFormatter; import org.joda.time.format.PeriodFormatterBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * TimeCandlestickWindow runs a CandlestickCalc across ticks for a set * duration each window. */ class TimeCandlestickWindow implements CandlestickWindow { private static final Logger log = LoggerFactory.getLogger(TimeCandlestickWindow.class); private static final int LATE_EVENT_SLACK_SECONDS = 5; private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder() .appendDays().appendSuffix("d") .appendHours().appendSuffix("h") .appendMinutes().appendSuffix("m") .appendSeconds().appendSuffix("s") .toFormatter(); private final AgentInstanceViewFactoryChainContext context; private final long scheduleSlot; private EPStatementHandleCallback handle; private final Duration interval; private OHLCPlugInView plugin; private CandlestickCalc calc; private DateTime windowStartTime; private DateTime windowEndTime; private DateTime lastStartTime; /** * Create a window over the given duration. */ public TimeCandlestickWindow(OHLCPlugInView plugin, CandlestickCalc calc, AgentInstanceViewFactoryChainContext context, ExprNode intervalExpression) { this.plugin = plugin; this.calc = calc; this.context = context; scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot(); interval = parseInterval(intervalExpression); } /** * Process a tick with the given time and value. */ public void update(DateTime timestamp, double value) { ensureWindow(timestamp); calc.applyValue(value); } /** * Return the time period specified by the given expression value. */ private Duration parseInterval(ExprNode interval) { ExprEvaluator evaluator = interval.getForge().getExprEvaluator(); String intervalStr = (String)evaluator.evaluate(null, true, context); return parseInterval(intervalStr); } /** * Return the time period specified by the given string. */ public static Duration parseInterval(String interval) { interval = interval.replaceAll("\\s+",""); return periodFormatter.parsePeriod(interval).toStandardDuration(); } /** * Make sure our window times are set up and current. */ public void ensureWindow(DateTime timestamp) { if (timestamp == null) return; if (windowStartTime == null) { // create open window windowStartTime = makeWindowStartTime(timestamp, interval); windowEndTime = makeWindowEndTime(windowStartTime, interval); scheduleCallback(windowEndTime); } if (!inWindow(timestamp)) { // past current window. // post and create a new one. postData(); windowStartTime = makeWindowStartTime(timestamp, interval); windowEndTime = makeWindowEndTime(windowStartTime, interval); scheduleCallback(windowEndTime); } } public static DateTime makeWindowStartTime(DateTime timestamp, Duration interval) { DateTime today = timestamp.withTimeAtStartOfDay(); // log.info("Timestamp is {}", timestamp); // log.info("Day start is {}", today); Duration intoToday = new Duration(today, timestamp); // calc how far into the current window we are long intoPeriod = intoToday.getMillis() % interval.getMillis(); return timestamp.minus(intoPeriod); } private static DateTime makeWindowEndTime(DateTime startTime, Duration interval) { if (startTime == null || interval == null) return null; return startTime.plus(interval.getMillis()); } /** * Return true if the timestamp is within the current time window. */ private boolean inWindow(DateTime timestamp) { if (timestamp == null) return false; return timestamp.compareTo(windowStartTime) >= 0 && timestamp.compareTo(windowEndTime) < 0; } /** * Convert a bare long value to a proper DateTime entity. Assumes * UTC time zone. */ public static DateTime toDateTime(long l) { return new DateTime(l, DateTimeZone.UTC); } /** * Set up a callback to post an event when our time window expires. */ private void scheduleCallback(DateTime endTime) { SchedulingService sched = context.getStatementContext().getSchedulingService(); if (handle != null) { // remove old schedule // log.info("Removing old callback"); sched.remove(handle, scheduleSlot); handle = null; } DateTime currentTime = toDateTime(sched.getTime()); DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS); long callbackTime = targetTime.getMillis() - currentTime.getMillis(); ScheduleHandleCallback callback = new ScheduleHandleCallback() { public void scheduledTrigger(EngineLevelExtensionServicesContext esc) { handle = null; // clear out schedule handle // log.info("Callback running"); TimeCandlestickWindow.this.postData(); } }; handle = new EPStatementHandleCallback(context.getEpStatementAgentInstanceHandle(), callback); sched.add(callbackTime, handle, scheduleSlot); // log.info("Scheduled callback for {}", callbackTime); } /** * Update listeners with our new value. */ private void postData() { if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) { log.warn("DUP START TIME"); } // log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread()); OHLCValues values = calc.getValues(); if (values == null) { // log.info("No data to post"); return; } plugin.postEvent(new OHLCEvent(windowStartTime, values)); calc.reset(); lastStartTime = windowStartTime; } }