package ats.plugin; import com.espertech.esper.client.EventBean; import com.espertech.esper.client.EventType; import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; import com.espertech.esper.core.service.EPStatementHandleCallback; import com.espertech.esper.core.service.EngineLevelExtensionServicesContext; import com.espertech.esper.epl.expression.core.ExprNode; import com.espertech.esper.event.EventAdapterService; import com.espertech.esper.schedule.ScheduleHandleCallback; import com.espertech.esper.view.ViewSupport; import java.util.Calendar; import java.util.GregorianCalendar; import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.joda.time.Period; import org.joda.time.format.PeriodFormatter; import org.joda.time.format.PeriodFormatterBuilder; import com.espertech.esper.epl.expression.core.ExprEvaluator; import com.espertech.esper.schedule.SchedulingService; import org.joda.time.Duration; import org.joda.time.format.ISODateTimeFormat; import java.util.TimeZone; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Instant; import org.joda.time.PeriodType; /** * OHLCPlugInView computes OHLC bars for a given time interval. */ public class OHLCPlugInView extends ViewSupport { private static final Logger log = LoggerFactory.getLogger(OHLCPlugInView.class); private static final int LATE_EVENT_SLACK_SECONDS = 5; private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder() .appendDays().appendSuffix("d ") .appendHours().appendSuffix("h ") .appendMinutes().appendSuffix("m ") .appendSeconds().appendSuffix("s") .toFormatter(); private final AgentInstanceViewFactoryChainContext agentContext; private final long scheduleSlot; private EPStatementHandleCallback handle; private final Duration interval; private final ExprNode timestampExpression; private final ExprNode valueExpression; private DateTime windowStartTime; private DateTime windowEndTime; private Double open; private Double close; private Double high; private Double low; private EventBean[] lastData; public OHLCPlugInView(AgentInstanceViewFactoryChainContext context, ExprNode intervalExpression, ExprNode timestampExpression, ExprNode valueExpression) { agentContext = context; scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot(); interval = parseInterval(intervalExpression); log.info("Interval is {}", interval); this.timestampExpression = timestampExpression; this.valueExpression = valueExpression; } /** * Return the time period specified by the given expression value. */ private Duration parseInterval(ExprNode interval) { ExprEvaluator evaluator = interval.getForge().getExprEvaluator(); String intervalStr = (String)evaluator.evaluate(null, true, agentContext); return periodFormatter.parsePeriod(intervalStr).toStandardDuration(); } /** * Return the timestamp for the event. */ private DateTime getTimestamp(EventBean event) { ExprEvaluator evaluator = timestampExpression.getForge().getExprEvaluator(); return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, agentContext); } /** * Convert a bare long value to a proper DateTime entity. Assumes * UTC time zone. */ public static DateTime toDateTime(long l) { return new DateTime(l, DateTimeZone.UTC); } /** * Return the value for the event. */ private double getValue(EventBean event) { ExprEvaluator evaluator = valueExpression.getForge().getExprEvaluator(); return (double)evaluator.evaluate(new EventBean[] {event}, true, agentContext); } /** * Notify that data has been added or removed from the Viewable parent. */ @Override public void update(EventBean[] newData, EventBean[] oldData) { if (newData == null) return; for (EventBean event : newData) { DateTime timestamp = getTimestamp(event); double value = getValue(event); ensureWindow(timestamp); applyValue(value); } } /** * Make sure our window times are set up and current. */ private void ensureWindow(DateTime timestamp) { if (timestamp == null) return; if (windowStartTime == null) { // create open window windowStartTime = makeWindowStartTime(timestamp, interval); windowEndTime = makeWindowEndTime(windowStartTime, interval); scheduleCallback(windowEndTime); } if (!inWindow(timestamp)) { // past current window. // post and create a new one. postData(); windowStartTime = makeWindowStartTime(timestamp, interval); windowEndTime = makeWindowEndTime(windowStartTime, interval); scheduleCallback(windowEndTime); } } public static DateTime makeWindowStartTime(DateTime timestamp, Duration interval) { DateTime today = timestamp.withTimeAtStartOfDay(); // log.info("Timestamp is {}", timestamp); // log.info("Day start is {}", today); Duration intoToday = new Duration(today, timestamp); // then modulo timestamp to find how far into the current window we'd be in long intoPeriod = intoToday.getMillis() % interval.getMillis(); // then subtract modulo from current time to get window start return timestamp.minus(intoPeriod); } private static DateTime makeWindowEndTime(DateTime startTime, Duration interval) { if (startTime == null || interval == null) return null; return startTime.plus(interval.getMillis()); } /** * Return true if the timestamp is within the current time window. */ private boolean inWindow(DateTime timestamp) { if (timestamp == null) return false; return timestamp.compareTo(windowStartTime) >= 0 && timestamp.compareTo(windowEndTime) < 0; } private void applyValue(double value) { if (open == null) { open = value; } close = value; if (low == null) { low = value; } else if (low.compareTo(value) > 0) { low = value; } if (high == null) { high = value; } else if (high.compareTo(value) < 0) { high = value; } } /** * Set up a callback to post an event when our time window expires. */ private void scheduleCallback(DateTime endTime) { SchedulingService sched = agentContext.getStatementContext().getSchedulingService(); if (handle != null) { // remove old schedule sched.remove(handle, scheduleSlot); handle = null; } DateTime currentTime = toDateTime(sched.getTime()); DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS); long callbackTime = targetTime.getMillis() - currentTime.getMillis(); ScheduleHandleCallback callback = new ScheduleHandleCallback() { public void scheduledTrigger(EngineLevelExtensionServicesContext esc) { handle = null; // clear out schedule handle OHLCPlugInView.this.postData(); } }; handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback); sched.add(callbackTime, handle, scheduleSlot); } /** * Update listeners with our new value. */ private void postData() { if (open == null) return; OHLCEvent value = new OHLCEvent(windowStartTime, open, high, low, close); // send EventAdapterService service = agentContext.getStatementContext().getEventAdapterService(); EventBean[] newBeans = new EventBean[] {service.adapterForBean(value)}; updateChildren(newBeans, lastData); // reset for next post lastData = newBeans; open = null; close = null; high = null; low = null; } // // for the EventCollection interface // /** * Provides metadata information about the type of object the * event collection contains. */ @Override public EventType getEventType() { return getEventType(agentContext.getStatementContext().getEventAdapterService()); } protected static EventType getEventType(EventAdapterService service) { return service.addBeanType(OHLCEvent.class.getName(), OHLCEvent.class, false, false, false); } /** * (Don't) allow iteration through all elements in this event collection. */ @Override public Iterator iterator() { throw new UnsupportedOperationException("Can't iterate."); } }