add OHLC plugin
This commit is contained in:
275
src/main/java/ats/plugin/OHLCPlugInView.java
Normal file
275
src/main/java/ats/plugin/OHLCPlugInView.java
Normal file
@ -0,0 +1,275 @@
|
||||
package ats.plugin;
|
||||
|
||||
import com.espertech.esper.client.EventBean;
|
||||
import com.espertech.esper.client.EventType;
|
||||
import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext;
|
||||
import com.espertech.esper.core.service.EPStatementHandleCallback;
|
||||
import com.espertech.esper.core.service.EngineLevelExtensionServicesContext;
|
||||
import com.espertech.esper.epl.expression.core.ExprNode;
|
||||
import com.espertech.esper.event.EventAdapterService;
|
||||
import com.espertech.esper.schedule.ScheduleHandleCallback;
|
||||
import com.espertech.esper.view.ViewSupport;
|
||||
import java.util.Calendar;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.Iterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.joda.time.Period;
|
||||
import org.joda.time.format.PeriodFormatter;
|
||||
import org.joda.time.format.PeriodFormatterBuilder;
|
||||
import com.espertech.esper.epl.expression.core.ExprEvaluator;
|
||||
import com.espertech.esper.schedule.SchedulingService;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.format.ISODateTimeFormat;
|
||||
import java.util.TimeZone;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.Instant;
|
||||
import org.joda.time.PeriodType;
|
||||
|
||||
/**
|
||||
* OHLCPlugInView computes OHLC bars for a given time interval.
|
||||
*/
|
||||
public class OHLCPlugInView extends ViewSupport {
|
||||
private static final Logger log = LoggerFactory.getLogger(OHLCPlugInView.class);
|
||||
private static final int LATE_EVENT_SLACK_SECONDS = 5;
|
||||
private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder()
|
||||
.appendDays().appendSuffix("d ")
|
||||
.appendHours().appendSuffix("h ")
|
||||
.appendMinutes().appendSuffix("m ")
|
||||
.appendSeconds().appendSuffix("s")
|
||||
.toFormatter();
|
||||
|
||||
private final AgentInstanceViewFactoryChainContext agentContext;
|
||||
private final long scheduleSlot;
|
||||
private EPStatementHandleCallback handle;
|
||||
|
||||
private final Duration interval;
|
||||
private final ExprNode timestampExpression;
|
||||
private final ExprNode valueExpression;
|
||||
|
||||
private DateTime windowStartTime;
|
||||
private DateTime windowEndTime;
|
||||
private Double open;
|
||||
private Double close;
|
||||
private Double high;
|
||||
private Double low;
|
||||
private EventBean[] lastData;
|
||||
|
||||
|
||||
public OHLCPlugInView(AgentInstanceViewFactoryChainContext context,
|
||||
ExprNode intervalExpression,
|
||||
ExprNode timestampExpression,
|
||||
ExprNode valueExpression)
|
||||
{
|
||||
agentContext = context;
|
||||
scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot();
|
||||
|
||||
interval = parseInterval(intervalExpression);
|
||||
log.info("Interval is {}", interval);
|
||||
|
||||
this.timestampExpression = timestampExpression;
|
||||
this.valueExpression = valueExpression;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the time period specified by the given expression value.
|
||||
*/
|
||||
private Duration parseInterval(ExprNode interval) {
|
||||
ExprEvaluator evaluator = interval.getForge().getExprEvaluator();
|
||||
String intervalStr = (String)evaluator.evaluate(null, true, agentContext);
|
||||
return periodFormatter.parsePeriod(intervalStr).toStandardDuration();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the timestamp for the event.
|
||||
*/
|
||||
private DateTime getTimestamp(EventBean event) {
|
||||
ExprEvaluator evaluator = timestampExpression.getForge().getExprEvaluator();
|
||||
//Long l = (Long)evaluator.evaluate(new EventBean[] {event}, true, agentContext);
|
||||
return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, agentContext);
|
||||
//return toDateTime(l);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a bare long value to a proper DateTime entity. Assumes
|
||||
* UTC time zone.
|
||||
*/
|
||||
public static DateTime toDateTime(long l) {
|
||||
return new DateTime(l, DateTimeZone.UTC);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the value for the event.
|
||||
*/
|
||||
private double getValue(EventBean event) {
|
||||
ExprEvaluator evaluator = valueExpression.getForge().getExprEvaluator();
|
||||
return (double)evaluator.evaluate(new EventBean[] {event}, true, agentContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify that data has been added or removed from the Viewable parent.
|
||||
*/
|
||||
@Override
|
||||
public void update(EventBean[] newData, EventBean[] oldData) {
|
||||
if (newData == null) return;
|
||||
|
||||
for (EventBean event : newData) {
|
||||
DateTime timestamp = getTimestamp(event);
|
||||
double value = getValue(event);
|
||||
|
||||
ensureWindow(timestamp);
|
||||
applyValue(value);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure our window times are set up and current.
|
||||
*/
|
||||
private void ensureWindow(DateTime timestamp) {
|
||||
if (timestamp == null) return;
|
||||
|
||||
if (windowStartTime == null) {
|
||||
// create open window
|
||||
windowStartTime = makeWindowStartTime(timestamp, interval);
|
||||
windowEndTime = makeWindowEndTime(windowStartTime, interval);
|
||||
}
|
||||
|
||||
if (!inWindow(timestamp)) {
|
||||
// past current window.
|
||||
// post and create a new one.
|
||||
postData();
|
||||
scheduleCallback();
|
||||
|
||||
windowStartTime = makeWindowStartTime(timestamp, interval);
|
||||
windowEndTime = makeWindowEndTime(windowStartTime, interval);
|
||||
}
|
||||
}
|
||||
|
||||
public static DateTime makeWindowStartTime(DateTime timestamp,
|
||||
Duration interval)
|
||||
{
|
||||
DateTime today = timestamp.withTimeAtStartOfDay();
|
||||
// log.info("Timestamp is {}", timestamp);
|
||||
// log.info("Day start is {}", today);
|
||||
|
||||
Duration intoToday = new Duration(today, timestamp);
|
||||
|
||||
// then modulo timestamp to find how far into the current window we'd be in
|
||||
long intoPeriod = intoToday.getMillis() % interval.getMillis();
|
||||
|
||||
// then subtract modulo from current time to get window start
|
||||
return timestamp.minus(intoPeriod);
|
||||
}
|
||||
|
||||
private static DateTime makeWindowEndTime(DateTime startTime, Duration interval) {
|
||||
if (startTime == null || interval == null) return null;
|
||||
|
||||
return startTime.plus(interval.getMillis());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the timestamp is within the current time window.
|
||||
*/
|
||||
private boolean inWindow(DateTime timestamp) {
|
||||
if (timestamp == null) return false;
|
||||
|
||||
return timestamp.compareTo(windowStartTime) >= 0 &&
|
||||
timestamp.compareTo(windowEndTime) < 0;
|
||||
}
|
||||
|
||||
private void applyValue(double value) {
|
||||
if (open == null) {
|
||||
open = value;
|
||||
}
|
||||
|
||||
close = value;
|
||||
|
||||
if (low == null) {
|
||||
low = value;
|
||||
} else if (low.compareTo(value) > 0) {
|
||||
low = value;
|
||||
}
|
||||
|
||||
if (high == null) {
|
||||
high = value;
|
||||
} else if (high.compareTo(value) < 0) {
|
||||
high = value;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up a callback to post an event when our time window expires.
|
||||
*/
|
||||
private void scheduleCallback() {
|
||||
SchedulingService sched = agentContext.getStatementContext().getSchedulingService();
|
||||
if (handle != null) {
|
||||
// remove old schedule
|
||||
sched.remove(handle, scheduleSlot);
|
||||
handle = null;
|
||||
}
|
||||
|
||||
DateTime currentTime = toDateTime(sched.getTime());
|
||||
DateTime targetTime = windowEndTime.plusSeconds(LATE_EVENT_SLACK_SECONDS);
|
||||
long callbackTime = targetTime.getMillis() - currentTime.getMillis();
|
||||
|
||||
ScheduleHandleCallback callback = new ScheduleHandleCallback() {
|
||||
public void scheduledTrigger(EngineLevelExtensionServicesContext esc) {
|
||||
handle = null; // clear out schedule handle
|
||||
OHLCPlugInView.this.postData();
|
||||
}
|
||||
};
|
||||
|
||||
handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback);
|
||||
sched.add(callbackTime, handle, scheduleSlot);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update listeners with our new value.
|
||||
*/
|
||||
private void postData() {
|
||||
if (open == null) return;
|
||||
|
||||
OHLCValue value = new OHLCValue(windowStartTime, open, high, low, close);
|
||||
|
||||
// send
|
||||
EventAdapterService service = agentContext.getStatementContext().getEventAdapterService();
|
||||
EventBean[] newBeans = new EventBean[] {service.adapterForBean(value)};
|
||||
updateChildren(newBeans, lastData);
|
||||
|
||||
// reset for next post
|
||||
lastData = newBeans;
|
||||
open = null;
|
||||
close = null;
|
||||
high = null;
|
||||
low = null;
|
||||
}
|
||||
|
||||
//
|
||||
// for the EventCollection interface
|
||||
//
|
||||
|
||||
/**
|
||||
* Provides metadata information about the type of object the
|
||||
* event collection contains.
|
||||
*/
|
||||
@Override
|
||||
public EventType getEventType() {
|
||||
return getEventType(agentContext.getStatementContext().getEventAdapterService());
|
||||
}
|
||||
|
||||
protected static EventType getEventType(EventAdapterService service)
|
||||
{
|
||||
return service.addBeanType(OHLCValue.class.getName(),
|
||||
OHLCValue.class,
|
||||
false, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* (Don't) allow iteration through all elements in this event collection.
|
||||
*/
|
||||
@Override
|
||||
public Iterator<EventBean> iterator() {
|
||||
throw new UnsupportedOperationException("Can't iterate.");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user