292 lines
9.5 KiB
Java
292 lines
9.5 KiB
Java
package ats.plugin;
|
|
|
|
import com.espertech.esper.client.EventBean;
|
|
import com.espertech.esper.client.EventType;
|
|
import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext;
|
|
import com.espertech.esper.core.service.EPStatementHandleCallback;
|
|
import com.espertech.esper.core.service.EngineLevelExtensionServicesContext;
|
|
import com.espertech.esper.epl.expression.core.ExprNode;
|
|
import com.espertech.esper.event.EventAdapterService;
|
|
import com.espertech.esper.schedule.ScheduleHandleCallback;
|
|
import com.espertech.esper.view.ViewSupport;
|
|
import java.util.Calendar;
|
|
import java.util.GregorianCalendar;
|
|
import java.util.Iterator;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.joda.time.Period;
|
|
import org.joda.time.format.PeriodFormatter;
|
|
import org.joda.time.format.PeriodFormatterBuilder;
|
|
import com.espertech.esper.epl.expression.core.ExprEvaluator;
|
|
import com.espertech.esper.schedule.SchedulingService;
|
|
import org.joda.time.Duration;
|
|
import org.joda.time.format.ISODateTimeFormat;
|
|
import java.util.TimeZone;
|
|
import org.joda.time.DateTime;
|
|
import org.joda.time.DateTimeZone;
|
|
import org.joda.time.Instant;
|
|
import org.joda.time.PeriodType;
|
|
|
|
/**
|
|
* OHLCPlugInView computes OHLC bars for a given time interval.
|
|
*/
|
|
public class OHLCPlugInView extends ViewSupport {
|
|
private static final Logger log = LoggerFactory.getLogger(OHLCPlugInView.class);
|
|
private static final int LATE_EVENT_SLACK_SECONDS = 5;
|
|
private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder()
|
|
.appendDays().appendSuffix("d ")
|
|
.appendHours().appendSuffix("h ")
|
|
.appendMinutes().appendSuffix("m ")
|
|
.appendSeconds().appendSuffix("s")
|
|
.toFormatter();
|
|
|
|
private final AgentInstanceViewFactoryChainContext agentContext;
|
|
private final long scheduleSlot;
|
|
private EPStatementHandleCallback handle;
|
|
|
|
private final Duration interval;
|
|
private final ExprNode timestampExpression;
|
|
private final ExprNode valueExpression;
|
|
|
|
private DateTime windowStartTime;
|
|
private DateTime windowEndTime;
|
|
private Double open;
|
|
private Double close;
|
|
private Double high;
|
|
private Double low;
|
|
private EventBean[] lastData;
|
|
|
|
|
|
public OHLCPlugInView(AgentInstanceViewFactoryChainContext context,
|
|
ExprNode intervalExpression,
|
|
ExprNode timestampExpression,
|
|
ExprNode valueExpression)
|
|
{
|
|
agentContext = context;
|
|
scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot();
|
|
|
|
interval = parseInterval(intervalExpression);
|
|
// log.info("Interval is {}", interval);
|
|
|
|
this.timestampExpression = timestampExpression;
|
|
this.valueExpression = valueExpression;
|
|
}
|
|
|
|
/**
|
|
* Return the time period specified by the given expression value.
|
|
*/
|
|
private Duration parseInterval(ExprNode interval) {
|
|
ExprEvaluator evaluator = interval.getForge().getExprEvaluator();
|
|
String intervalStr = (String)evaluator.evaluate(null, true, agentContext);
|
|
return periodFormatter.parsePeriod(intervalStr).toStandardDuration();
|
|
}
|
|
|
|
/**
|
|
* Return the timestamp for the event.
|
|
*/
|
|
private DateTime getTimestamp(EventBean event) {
|
|
ExprEvaluator evaluator = timestampExpression.getForge().getExprEvaluator();
|
|
return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, agentContext);
|
|
}
|
|
|
|
/**
|
|
* Convert a bare long value to a proper DateTime entity. Assumes
|
|
* UTC time zone.
|
|
*/
|
|
public static DateTime toDateTime(long l) {
|
|
return new DateTime(l, DateTimeZone.UTC);
|
|
}
|
|
|
|
/**
|
|
* Return the value for the event.
|
|
*/
|
|
private double getValue(EventBean event) {
|
|
ExprEvaluator evaluator = valueExpression.getForge().getExprEvaluator();
|
|
return (double)evaluator.evaluate(new EventBean[] {event}, true, agentContext);
|
|
}
|
|
|
|
/**
|
|
* Notify that data has been added or removed from the Viewable parent.
|
|
*/
|
|
@Override
|
|
public void update(EventBean[] newData, EventBean[] oldData) {
|
|
if (newData == null) return;
|
|
|
|
for (EventBean event : newData) {
|
|
DateTime timestamp = getTimestamp(event);
|
|
double value = getValue(event);
|
|
|
|
ensureWindow(timestamp);
|
|
applyValue(value);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Make sure our window times are set up and current.
|
|
*/
|
|
private void ensureWindow(DateTime timestamp) {
|
|
if (timestamp == null) return;
|
|
|
|
if (windowStartTime == null) {
|
|
// create open window
|
|
windowStartTime = makeWindowStartTime(timestamp, interval);
|
|
windowEndTime = makeWindowEndTime(windowStartTime, interval);
|
|
scheduleCallback(windowEndTime);
|
|
}
|
|
|
|
if (!inWindow(timestamp)) {
|
|
// past current window.
|
|
// post and create a new one.
|
|
postData();
|
|
|
|
windowStartTime = makeWindowStartTime(timestamp, interval);
|
|
windowEndTime = makeWindowEndTime(windowStartTime, interval);
|
|
scheduleCallback(windowEndTime);
|
|
}
|
|
}
|
|
|
|
public static DateTime makeWindowStartTime(DateTime timestamp,
|
|
Duration interval)
|
|
{
|
|
DateTime today = timestamp.withTimeAtStartOfDay();
|
|
// log.info("Timestamp is {}", timestamp);
|
|
// log.info("Day start is {}", today);
|
|
|
|
Duration intoToday = new Duration(today, timestamp);
|
|
|
|
// calc how far into the current window we are
|
|
long intoPeriod = intoToday.getMillis() % interval.getMillis();
|
|
|
|
return timestamp.minus(intoPeriod);
|
|
}
|
|
|
|
private static DateTime makeWindowEndTime(DateTime startTime,
|
|
Duration interval)
|
|
{
|
|
if (startTime == null || interval == null) return null;
|
|
|
|
return startTime.plus(interval.getMillis());
|
|
}
|
|
|
|
/**
|
|
* Return true if the timestamp is within the current time window.
|
|
*/
|
|
private boolean inWindow(DateTime timestamp) {
|
|
if (timestamp == null) return false;
|
|
|
|
return timestamp.compareTo(windowStartTime) >= 0 &&
|
|
timestamp.compareTo(windowEndTime) < 0;
|
|
}
|
|
|
|
private void applyValue(double value) {
|
|
if (open == null) {
|
|
open = value;
|
|
}
|
|
|
|
close = value;
|
|
|
|
if (low == null) {
|
|
low = value;
|
|
} else if (low.compareTo(value) > 0) {
|
|
low = value;
|
|
}
|
|
|
|
if (high == null) {
|
|
high = value;
|
|
} else if (high.compareTo(value) < 0) {
|
|
high = value;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Set up a callback to post an event when our time window expires.
|
|
*/
|
|
private void scheduleCallback(DateTime endTime) {
|
|
SchedulingService sched = agentContext.getStatementContext().getSchedulingService();
|
|
if (handle != null) {
|
|
// remove old schedule
|
|
// log.info("Removing old callback");
|
|
sched.remove(handle, scheduleSlot);
|
|
handle = null;
|
|
}
|
|
|
|
DateTime currentTime = toDateTime(sched.getTime());
|
|
DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS);
|
|
long callbackTime = targetTime.getMillis() - currentTime.getMillis();
|
|
|
|
ScheduleHandleCallback callback = new ScheduleHandleCallback() {
|
|
public void scheduledTrigger(EngineLevelExtensionServicesContext esc) {
|
|
handle = null; // clear out schedule handle
|
|
// log.info("Callback running");
|
|
OHLCPlugInView.this.postData();
|
|
}
|
|
};
|
|
|
|
handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback);
|
|
sched.add(callbackTime, handle, scheduleSlot);
|
|
// log.info("Scheduled callback for {}", callbackTime);
|
|
}
|
|
|
|
DateTime lastStartTime;
|
|
|
|
/**
|
|
* Update listeners with our new value.
|
|
*/
|
|
private void postData() {
|
|
if (open == null) {
|
|
// log.info("No data to post");
|
|
return;
|
|
}
|
|
|
|
if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) {
|
|
log.warn("DUP START TIME");
|
|
}
|
|
|
|
// log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread());
|
|
|
|
OHLCEvent value = new OHLCEvent(windowStartTime, open, high, low, close);
|
|
|
|
// send
|
|
EventAdapterService service = agentContext.getStatementContext().getEventAdapterService();
|
|
EventBean[] newBeans = new EventBean[] {service.adapterForBean(value)};
|
|
updateChildren(newBeans, lastData);
|
|
|
|
// reset for next post
|
|
lastData = newBeans;
|
|
open = null;
|
|
close = null;
|
|
high = null;
|
|
low = null;
|
|
|
|
lastStartTime = windowStartTime;
|
|
}
|
|
|
|
//
|
|
// for the EventCollection interface
|
|
//
|
|
|
|
/**
|
|
* Provides metadata information about the type of object the
|
|
* event collection contains.
|
|
*/
|
|
@Override
|
|
public EventType getEventType() {
|
|
return getEventType(agentContext.getStatementContext().getEventAdapterService());
|
|
}
|
|
|
|
protected static EventType getEventType(EventAdapterService service)
|
|
{
|
|
return service.addBeanType(OHLCEvent.class.getName(),
|
|
OHLCEvent.class,
|
|
false, false, false);
|
|
}
|
|
|
|
/**
|
|
* (Don't) allow iteration through all elements in this event collection.
|
|
*/
|
|
@Override
|
|
public Iterator<EventBean> iterator() {
|
|
throw new UnsupportedOperationException("Can't iterate.");
|
|
}
|
|
}
|