Move time and OHLC value calc into separate files
This commit is contained in:
@ -69,7 +69,7 @@ create variant schema OHLCStream as OHLCEvent
|
|||||||
-- TickEvent.time ("time") as the source of the timestamp, and uses
|
-- TickEvent.time ("time") as the source of the timestamp, and uses
|
||||||
-- TickEvent.midDouble() as the value to use in the OHLC calculation.
|
-- TickEvent.midDouble() as the value to use in the OHLC calculation.
|
||||||
insert into OHLCStream
|
insert into OHLCStream
|
||||||
select * from TickEvent#OHLC(OHLCInterval, time, midDouble)
|
select * from TickEvent#OHLC("OHLC", "time", OHLCInterval, time, midDouble)
|
||||||
|
|
||||||
|
|
||||||
--
|
--
|
||||||
|
|||||||
24
src/main/java/ats/plugin/CandlestickCalc.java
Normal file
24
src/main/java/ats/plugin/CandlestickCalc.java
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
package ats.plugin;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CandlestickCalc embodies a method of calculating candlestick
|
||||||
|
* values. For example, OHLC or Heikin-Ashi.
|
||||||
|
*/
|
||||||
|
interface CandlestickCalc {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset the calculation for the beginning of an interval.
|
||||||
|
*/
|
||||||
|
void reset();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accept a current tick value to apply to the current
|
||||||
|
* calculations.
|
||||||
|
*/
|
||||||
|
void applyValue(double value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the values calculated since the last reset point.
|
||||||
|
*/
|
||||||
|
OHLCValues getValues();
|
||||||
|
}
|
||||||
194
src/main/java/ats/plugin/DurationWindow.java
Normal file
194
src/main/java/ats/plugin/DurationWindow.java
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
package ats.plugin;
|
||||||
|
|
||||||
|
import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext;
|
||||||
|
import com.espertech.esper.core.service.EPStatementHandleCallback;
|
||||||
|
import com.espertech.esper.core.service.EngineLevelExtensionServicesContext;
|
||||||
|
import com.espertech.esper.epl.expression.core.ExprEvaluator;
|
||||||
|
import com.espertech.esper.epl.expression.core.ExprNode;
|
||||||
|
import com.espertech.esper.schedule.ScheduleHandleCallback;
|
||||||
|
import com.espertech.esper.schedule.SchedulingService;
|
||||||
|
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.DateTimeZone;
|
||||||
|
import org.joda.time.Duration;
|
||||||
|
import org.joda.time.format.PeriodFormatter;
|
||||||
|
import org.joda.time.format.PeriodFormatterBuilder;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DurationWindow TODO
|
||||||
|
*/
|
||||||
|
class DurationWindow {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(DurationWindow.class);
|
||||||
|
private static final int LATE_EVENT_SLACK_SECONDS = 5;
|
||||||
|
private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder()
|
||||||
|
.appendDays().appendSuffix("d")
|
||||||
|
.appendHours().appendSuffix("h")
|
||||||
|
.appendMinutes().appendSuffix("m")
|
||||||
|
.appendSeconds().appendSuffix("s")
|
||||||
|
.toFormatter();
|
||||||
|
private final AgentInstanceViewFactoryChainContext agentContext;
|
||||||
|
private final long scheduleSlot;
|
||||||
|
private EPStatementHandleCallback handle;
|
||||||
|
private final Duration interval;
|
||||||
|
private OHLCPlugInView plugin;
|
||||||
|
private CandlestickCalc calc;
|
||||||
|
private DateTime windowStartTime;
|
||||||
|
private DateTime windowEndTime;
|
||||||
|
private DateTime lastStartTime;
|
||||||
|
|
||||||
|
|
||||||
|
public DurationWindow(OHLCPlugInView plugin,
|
||||||
|
CandlestickCalc calc,
|
||||||
|
AgentInstanceViewFactoryChainContext context,
|
||||||
|
ExprNode intervalExpression)
|
||||||
|
{
|
||||||
|
this.plugin = plugin;
|
||||||
|
this.calc = calc;
|
||||||
|
agentContext = context;
|
||||||
|
scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot();
|
||||||
|
interval = parseInterval(intervalExpression);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a tick with the given time and value.
|
||||||
|
*/
|
||||||
|
public void update(DateTime timestamp, double value) {
|
||||||
|
ensureWindow(timestamp);
|
||||||
|
calc.applyValue(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the time period specified by the given expression value.
|
||||||
|
*/
|
||||||
|
private Duration parseInterval(ExprNode interval) {
|
||||||
|
ExprEvaluator evaluator = interval.getForge().getExprEvaluator();
|
||||||
|
String intervalStr = (String)evaluator.evaluate(null, true, agentContext);
|
||||||
|
return parseInterval(intervalStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the time period specified by the given string.
|
||||||
|
*/
|
||||||
|
public static Duration parseInterval(String interval) {
|
||||||
|
interval = interval.replaceAll("\\s+","");
|
||||||
|
return periodFormatter.parsePeriod(interval).toStandardDuration();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure our window times are set up and current.
|
||||||
|
*/
|
||||||
|
public void ensureWindow(DateTime timestamp) {
|
||||||
|
if (timestamp == null) return;
|
||||||
|
|
||||||
|
if (windowStartTime == null) {
|
||||||
|
// create open window
|
||||||
|
windowStartTime = makeWindowStartTime(timestamp, interval);
|
||||||
|
windowEndTime = makeWindowEndTime(windowStartTime, interval);
|
||||||
|
scheduleCallback(windowEndTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!inWindow(timestamp)) {
|
||||||
|
// past current window.
|
||||||
|
// post and create a new one.
|
||||||
|
postData();
|
||||||
|
|
||||||
|
windowStartTime = makeWindowStartTime(timestamp, interval);
|
||||||
|
windowEndTime = makeWindowEndTime(windowStartTime, interval);
|
||||||
|
scheduleCallback(windowEndTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DateTime makeWindowStartTime(DateTime timestamp,
|
||||||
|
Duration interval)
|
||||||
|
{
|
||||||
|
DateTime today = timestamp.withTimeAtStartOfDay();
|
||||||
|
// log.info("Timestamp is {}", timestamp);
|
||||||
|
// log.info("Day start is {}", today);
|
||||||
|
|
||||||
|
Duration intoToday = new Duration(today, timestamp);
|
||||||
|
|
||||||
|
// calc how far into the current window we are
|
||||||
|
long intoPeriod = intoToday.getMillis() % interval.getMillis();
|
||||||
|
|
||||||
|
return timestamp.minus(intoPeriod);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static DateTime makeWindowEndTime(DateTime startTime,
|
||||||
|
Duration interval)
|
||||||
|
{
|
||||||
|
if (startTime == null || interval == null) return null;
|
||||||
|
|
||||||
|
return startTime.plus(interval.getMillis());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return true if the timestamp is within the current time window.
|
||||||
|
*/
|
||||||
|
private boolean inWindow(DateTime timestamp) {
|
||||||
|
if (timestamp == null) return false;
|
||||||
|
|
||||||
|
return timestamp.compareTo(windowStartTime) >= 0 &&
|
||||||
|
timestamp.compareTo(windowEndTime) < 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a bare long value to a proper DateTime entity. Assumes
|
||||||
|
* UTC time zone.
|
||||||
|
*/
|
||||||
|
public static DateTime toDateTime(long l) {
|
||||||
|
return new DateTime(l, DateTimeZone.UTC);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up a callback to post an event when our time window expires.
|
||||||
|
*/
|
||||||
|
private void scheduleCallback(DateTime endTime) {
|
||||||
|
SchedulingService sched = agentContext.getStatementContext().getSchedulingService();
|
||||||
|
if (handle != null) {
|
||||||
|
// remove old schedule
|
||||||
|
// log.info("Removing old callback");
|
||||||
|
sched.remove(handle, scheduleSlot);
|
||||||
|
handle = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
DateTime currentTime = toDateTime(sched.getTime());
|
||||||
|
DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS);
|
||||||
|
long callbackTime = targetTime.getMillis() - currentTime.getMillis();
|
||||||
|
|
||||||
|
ScheduleHandleCallback callback = new ScheduleHandleCallback() {
|
||||||
|
public void scheduledTrigger(EngineLevelExtensionServicesContext esc) {
|
||||||
|
handle = null; // clear out schedule handle
|
||||||
|
// log.info("Callback running");
|
||||||
|
DurationWindow.this.postData();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback);
|
||||||
|
sched.add(callbackTime, handle, scheduleSlot);
|
||||||
|
// log.info("Scheduled callback for {}", callbackTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update listeners with our new value.
|
||||||
|
*/
|
||||||
|
private void postData() {
|
||||||
|
if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) {
|
||||||
|
log.warn("DUP START TIME");
|
||||||
|
}
|
||||||
|
|
||||||
|
// log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread());
|
||||||
|
|
||||||
|
OHLCValues values = calc.getValues();
|
||||||
|
if (values == null) {
|
||||||
|
// log.info("No data to post");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin.postEvent(new OHLCEvent(windowStartTime, values));
|
||||||
|
|
||||||
|
calc.reset();
|
||||||
|
lastStartTime = windowStartTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
56
src/main/java/ats/plugin/OHLCCandlestickCalc.java
Normal file
56
src/main/java/ats/plugin/OHLCCandlestickCalc.java
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
package ats.plugin;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OHLCCandlestickCalc calculates values for OHLC.
|
||||||
|
*/
|
||||||
|
class OHLCCandlestickCalc implements CandlestickCalc {
|
||||||
|
private Double open;
|
||||||
|
private Double close;
|
||||||
|
private Double high;
|
||||||
|
private Double low;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset the calculation for the beginning of an interval.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
open = null;
|
||||||
|
close = null;
|
||||||
|
high = null;
|
||||||
|
low = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accept a current tick value to apply to the current
|
||||||
|
* calculations.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void applyValue(double value) {
|
||||||
|
if (open == null) {
|
||||||
|
open = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
close = value;
|
||||||
|
|
||||||
|
if (low == null) {
|
||||||
|
low = value;
|
||||||
|
} else if (low.compareTo(value) > 0) {
|
||||||
|
low = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (high == null) {
|
||||||
|
high = value;
|
||||||
|
} else if (high.compareTo(value) < 0) {
|
||||||
|
high = value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return our calculated values up to now.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public OHLCValues getValues() {
|
||||||
|
return new OHLCValues(open, high, low, close);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -13,6 +13,10 @@ public class OHLCEvent {
|
|||||||
private double close;
|
private double close;
|
||||||
|
|
||||||
|
|
||||||
|
public OHLCEvent(DateTime time, OHLCValues values) {
|
||||||
|
this(time, values.open, values.high, values.low, values.close);
|
||||||
|
}
|
||||||
|
|
||||||
public OHLCEvent(DateTime time,
|
public OHLCEvent(DateTime time,
|
||||||
double open, double high,
|
double open, double high,
|
||||||
double low, double close)
|
double low, double close)
|
||||||
|
|||||||
@ -1,92 +1,46 @@
|
|||||||
package ats.plugin;
|
package ats.plugin;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
import com.espertech.esper.client.EventBean;
|
import com.espertech.esper.client.EventBean;
|
||||||
import com.espertech.esper.client.EventType;
|
import com.espertech.esper.client.EventType;
|
||||||
import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext;
|
import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext;
|
||||||
import com.espertech.esper.core.service.EPStatementHandleCallback;
|
import com.espertech.esper.epl.expression.core.ExprEvaluator;
|
||||||
import com.espertech.esper.core.service.EngineLevelExtensionServicesContext;
|
|
||||||
import com.espertech.esper.epl.expression.core.ExprNode;
|
import com.espertech.esper.epl.expression.core.ExprNode;
|
||||||
import com.espertech.esper.event.EventAdapterService;
|
import com.espertech.esper.event.EventAdapterService;
|
||||||
import com.espertech.esper.schedule.ScheduleHandleCallback;
|
|
||||||
import com.espertech.esper.view.ViewSupport;
|
import com.espertech.esper.view.ViewSupport;
|
||||||
import java.util.Calendar;
|
|
||||||
import java.util.GregorianCalendar;
|
import org.joda.time.DateTime;
|
||||||
import java.util.Iterator;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.joda.time.Period;
|
|
||||||
import org.joda.time.format.PeriodFormatter;
|
|
||||||
import org.joda.time.format.PeriodFormatterBuilder;
|
|
||||||
import com.espertech.esper.epl.expression.core.ExprEvaluator;
|
|
||||||
import com.espertech.esper.schedule.SchedulingService;
|
|
||||||
import org.joda.time.Duration;
|
|
||||||
import org.joda.time.format.ISODateTimeFormat;
|
|
||||||
import java.util.TimeZone;
|
|
||||||
import org.joda.time.DateTime;
|
|
||||||
import org.joda.time.DateTimeZone;
|
|
||||||
import org.joda.time.Instant;
|
|
||||||
import org.joda.time.PeriodType;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* OHLCPlugInView computes OHLC bars for a given time interval.
|
* OHLCPlugInView computes OHLC bars for a given time interval.
|
||||||
*/
|
*/
|
||||||
public class OHLCPlugInView extends ViewSupport {
|
public class OHLCPlugInView extends ViewSupport {
|
||||||
private static final Logger log = LoggerFactory.getLogger(OHLCPlugInView.class);
|
private static final Logger log = LoggerFactory.getLogger(OHLCPlugInView.class);
|
||||||
private static final int LATE_EVENT_SLACK_SECONDS = 5;
|
private final AgentInstanceViewFactoryChainContext context;
|
||||||
private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder()
|
|
||||||
.appendDays().appendSuffix("d")
|
|
||||||
.appendHours().appendSuffix("h")
|
|
||||||
.appendMinutes().appendSuffix("m")
|
|
||||||
.appendSeconds().appendSuffix("s")
|
|
||||||
.toFormatter();
|
|
||||||
|
|
||||||
private final AgentInstanceViewFactoryChainContext agentContext;
|
|
||||||
private final long scheduleSlot;
|
|
||||||
private EPStatementHandleCallback handle;
|
|
||||||
|
|
||||||
private final Duration interval;
|
|
||||||
private final ExprNode timestampExpression;
|
private final ExprNode timestampExpression;
|
||||||
private final ExprNode valueExpression;
|
private final ExprNode valueExpression;
|
||||||
|
private DurationWindow window;
|
||||||
private DateTime windowStartTime;
|
private EventAdapterService service;
|
||||||
private DateTime windowEndTime;
|
|
||||||
private Double open;
|
|
||||||
private Double close;
|
|
||||||
private Double high;
|
|
||||||
private Double low;
|
|
||||||
private EventBean[] lastData;
|
private EventBean[] lastData;
|
||||||
|
|
||||||
|
|
||||||
public OHLCPlugInView(AgentInstanceViewFactoryChainContext context,
|
public OHLCPlugInView(AgentInstanceViewFactoryChainContext context,
|
||||||
|
ExprNode calcTypeExpression,
|
||||||
|
ExprNode timeTypeExpression,
|
||||||
ExprNode intervalExpression,
|
ExprNode intervalExpression,
|
||||||
ExprNode timestampExpression,
|
ExprNode timestampExpression,
|
||||||
ExprNode valueExpression)
|
ExprNode valueExpression)
|
||||||
{
|
{
|
||||||
agentContext = context;
|
this.context = context;
|
||||||
scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot();
|
|
||||||
|
|
||||||
interval = parseInterval(intervalExpression);
|
|
||||||
// log.info("Interval is {}", interval);
|
|
||||||
|
|
||||||
this.timestampExpression = timestampExpression;
|
this.timestampExpression = timestampExpression;
|
||||||
this.valueExpression = valueExpression;
|
this.valueExpression = valueExpression;
|
||||||
}
|
service = context.getStatementContext().getEventAdapterService();
|
||||||
|
|
||||||
/**
|
CandlestickCalc calc = new OHLCCandlestickCalc();
|
||||||
* Return the time period specified by the given expression value.
|
window = new DurationWindow(this, calc, context, intervalExpression);
|
||||||
*/
|
|
||||||
private Duration parseInterval(ExprNode interval) {
|
|
||||||
ExprEvaluator evaluator = interval.getForge().getExprEvaluator();
|
|
||||||
String intervalStr = (String)evaluator.evaluate(null, true, agentContext);
|
|
||||||
return parseInterval(intervalStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the time period specified by the given string.
|
|
||||||
*/
|
|
||||||
public static Duration parseInterval(String interval) {
|
|
||||||
interval = interval.replaceAll("\\s+","");
|
|
||||||
return periodFormatter.parsePeriod(interval).toStandardDuration();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -94,15 +48,7 @@ public class OHLCPlugInView extends ViewSupport {
|
|||||||
*/
|
*/
|
||||||
private DateTime getTimestamp(EventBean event) {
|
private DateTime getTimestamp(EventBean event) {
|
||||||
ExprEvaluator evaluator = timestampExpression.getForge().getExprEvaluator();
|
ExprEvaluator evaluator = timestampExpression.getForge().getExprEvaluator();
|
||||||
return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, agentContext);
|
return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, context);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert a bare long value to a proper DateTime entity. Assumes
|
|
||||||
* UTC time zone.
|
|
||||||
*/
|
|
||||||
public static DateTime toDateTime(long l) {
|
|
||||||
return new DateTime(l, DateTimeZone.UTC);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -110,7 +56,7 @@ public class OHLCPlugInView extends ViewSupport {
|
|||||||
*/
|
*/
|
||||||
private double getValue(EventBean event) {
|
private double getValue(EventBean event) {
|
||||||
ExprEvaluator evaluator = valueExpression.getForge().getExprEvaluator();
|
ExprEvaluator evaluator = valueExpression.getForge().getExprEvaluator();
|
||||||
return (double)evaluator.evaluate(new EventBean[] {event}, true, agentContext);
|
return (double)evaluator.evaluate(new EventBean[] {event}, true, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -123,150 +69,17 @@ public class OHLCPlugInView extends ViewSupport {
|
|||||||
for (EventBean event : newData) {
|
for (EventBean event : newData) {
|
||||||
DateTime timestamp = getTimestamp(event);
|
DateTime timestamp = getTimestamp(event);
|
||||||
double value = getValue(event);
|
double value = getValue(event);
|
||||||
|
window.update(timestamp, value);
|
||||||
ensureWindow(timestamp);
|
|
||||||
applyValue(value);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make sure our window times are set up and current.
|
* Send an event to all plugin listeners.
|
||||||
*/
|
*/
|
||||||
private void ensureWindow(DateTime timestamp) {
|
public void postEvent(OHLCEvent event) {
|
||||||
if (timestamp == null) return;
|
EventBean[] newData = new EventBean[] {service.adapterForBean(event)};
|
||||||
|
updateChildren(newData, lastData);
|
||||||
if (windowStartTime == null) {
|
lastData = newData;
|
||||||
// create open window
|
|
||||||
windowStartTime = makeWindowStartTime(timestamp, interval);
|
|
||||||
windowEndTime = makeWindowEndTime(windowStartTime, interval);
|
|
||||||
scheduleCallback(windowEndTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!inWindow(timestamp)) {
|
|
||||||
// past current window.
|
|
||||||
// post and create a new one.
|
|
||||||
postData();
|
|
||||||
|
|
||||||
windowStartTime = makeWindowStartTime(timestamp, interval);
|
|
||||||
windowEndTime = makeWindowEndTime(windowStartTime, interval);
|
|
||||||
scheduleCallback(windowEndTime);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static DateTime makeWindowStartTime(DateTime timestamp,
|
|
||||||
Duration interval)
|
|
||||||
{
|
|
||||||
DateTime today = timestamp.withTimeAtStartOfDay();
|
|
||||||
// log.info("Timestamp is {}", timestamp);
|
|
||||||
// log.info("Day start is {}", today);
|
|
||||||
|
|
||||||
Duration intoToday = new Duration(today, timestamp);
|
|
||||||
|
|
||||||
// calc how far into the current window we are
|
|
||||||
long intoPeriod = intoToday.getMillis() % interval.getMillis();
|
|
||||||
|
|
||||||
return timestamp.minus(intoPeriod);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static DateTime makeWindowEndTime(DateTime startTime,
|
|
||||||
Duration interval)
|
|
||||||
{
|
|
||||||
if (startTime == null || interval == null) return null;
|
|
||||||
|
|
||||||
return startTime.plus(interval.getMillis());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return true if the timestamp is within the current time window.
|
|
||||||
*/
|
|
||||||
private boolean inWindow(DateTime timestamp) {
|
|
||||||
if (timestamp == null) return false;
|
|
||||||
|
|
||||||
return timestamp.compareTo(windowStartTime) >= 0 &&
|
|
||||||
timestamp.compareTo(windowEndTime) < 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void applyValue(double value) {
|
|
||||||
if (open == null) {
|
|
||||||
open = value;
|
|
||||||
}
|
|
||||||
|
|
||||||
close = value;
|
|
||||||
|
|
||||||
if (low == null) {
|
|
||||||
low = value;
|
|
||||||
} else if (low.compareTo(value) > 0) {
|
|
||||||
low = value;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (high == null) {
|
|
||||||
high = value;
|
|
||||||
} else if (high.compareTo(value) < 0) {
|
|
||||||
high = value;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set up a callback to post an event when our time window expires.
|
|
||||||
*/
|
|
||||||
private void scheduleCallback(DateTime endTime) {
|
|
||||||
SchedulingService sched = agentContext.getStatementContext().getSchedulingService();
|
|
||||||
if (handle != null) {
|
|
||||||
// remove old schedule
|
|
||||||
// log.info("Removing old callback");
|
|
||||||
sched.remove(handle, scheduleSlot);
|
|
||||||
handle = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
DateTime currentTime = toDateTime(sched.getTime());
|
|
||||||
DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS);
|
|
||||||
long callbackTime = targetTime.getMillis() - currentTime.getMillis();
|
|
||||||
|
|
||||||
ScheduleHandleCallback callback = new ScheduleHandleCallback() {
|
|
||||||
public void scheduledTrigger(EngineLevelExtensionServicesContext esc) {
|
|
||||||
handle = null; // clear out schedule handle
|
|
||||||
// log.info("Callback running");
|
|
||||||
OHLCPlugInView.this.postData();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback);
|
|
||||||
sched.add(callbackTime, handle, scheduleSlot);
|
|
||||||
// log.info("Scheduled callback for {}", callbackTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
DateTime lastStartTime;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Update listeners with our new value.
|
|
||||||
*/
|
|
||||||
private void postData() {
|
|
||||||
if (open == null) {
|
|
||||||
// log.info("No data to post");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) {
|
|
||||||
log.warn("DUP START TIME");
|
|
||||||
}
|
|
||||||
|
|
||||||
// log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread());
|
|
||||||
|
|
||||||
OHLCEvent value = new OHLCEvent(windowStartTime, open, high, low, close);
|
|
||||||
|
|
||||||
// send
|
|
||||||
EventAdapterService service = agentContext.getStatementContext().getEventAdapterService();
|
|
||||||
EventBean[] newBeans = new EventBean[] {service.adapterForBean(value)};
|
|
||||||
updateChildren(newBeans, lastData);
|
|
||||||
|
|
||||||
// reset for next post
|
|
||||||
lastData = newBeans;
|
|
||||||
open = null;
|
|
||||||
close = null;
|
|
||||||
high = null;
|
|
||||||
low = null;
|
|
||||||
|
|
||||||
lastStartTime = windowStartTime;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
@ -279,11 +92,10 @@ public class OHLCPlugInView extends ViewSupport {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public EventType getEventType() {
|
public EventType getEventType() {
|
||||||
return getEventType(agentContext.getStatementContext().getEventAdapterService());
|
return getEventType(service);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static EventType getEventType(EventAdapterService service)
|
protected static EventType getEventType(EventAdapterService service) {
|
||||||
{
|
|
||||||
return service.addBeanType(OHLCEvent.class.getName(),
|
return service.addBeanType(OHLCEvent.class.getName(),
|
||||||
OHLCEvent.class,
|
OHLCEvent.class,
|
||||||
false, false, false);
|
false, false, false);
|
||||||
|
|||||||
@ -1,5 +1,8 @@
|
|||||||
package ats.plugin;
|
package ats.plugin;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import com.espertech.esper.client.EventType;
|
import com.espertech.esper.client.EventType;
|
||||||
import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext;
|
import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext;
|
||||||
import com.espertech.esper.core.service.StatementContext;
|
import com.espertech.esper.core.service.StatementContext;
|
||||||
@ -10,9 +13,7 @@ import com.espertech.esper.view.ViewFactory;
|
|||||||
import com.espertech.esper.view.ViewFactoryContext;
|
import com.espertech.esper.view.ViewFactoryContext;
|
||||||
import com.espertech.esper.view.ViewFactorySupport;
|
import com.espertech.esper.view.ViewFactorySupport;
|
||||||
import com.espertech.esper.view.ViewParameterException;
|
import com.espertech.esper.view.ViewParameterException;
|
||||||
import java.math.BigDecimal;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.List;
|
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -21,6 +22,8 @@ import org.joda.time.DateTime;
|
|||||||
public class OHLCPlugInViewFactory extends ViewFactorySupport {
|
public class OHLCPlugInViewFactory extends ViewFactorySupport {
|
||||||
private EventAdapterService eventAdapterService;
|
private EventAdapterService eventAdapterService;
|
||||||
private List<ExprNode> params;
|
private List<ExprNode> params;
|
||||||
|
private ExprNode calcType;
|
||||||
|
private ExprNode timeType;
|
||||||
private ExprNode interval;
|
private ExprNode interval;
|
||||||
private ExprNode timestamp;
|
private ExprNode timestamp;
|
||||||
private ExprNode value;
|
private ExprNode value;
|
||||||
@ -30,13 +33,13 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport {
|
|||||||
* Pass in EPL query view params.
|
* Pass in EPL query view params.
|
||||||
*/
|
*/
|
||||||
public void setViewParameters(ViewFactoryContext context,
|
public void setViewParameters(ViewFactoryContext context,
|
||||||
List<ExprNode> params)
|
List<ExprNode> params)
|
||||||
throws ViewParameterException
|
throws ViewParameterException
|
||||||
{
|
{
|
||||||
eventAdapterService = context.getEventAdapterService();
|
eventAdapterService = context.getEventAdapterService();
|
||||||
|
|
||||||
if (params.size() != 3) {
|
if (params.size() != 5) {
|
||||||
throw new ViewParameterException("OHLC view takes three parameters: time interval, timestamp expression, and mid price expression.");
|
throw new ViewParameterException("OHLC view takes five parameters: calulation type (\"OHLC\" or \"Heikin-Ashi\"), batching type (\"time\" or \"ticks\"), time interval or tick count, timestamp expression, and mid price expression.");
|
||||||
}
|
}
|
||||||
this.params = params;
|
this.params = params;
|
||||||
}
|
}
|
||||||
@ -47,9 +50,9 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport {
|
|||||||
* type for resulting views.
|
* type for resulting views.
|
||||||
*/
|
*/
|
||||||
public void attach(EventType parentEventType,
|
public void attach(EventType parentEventType,
|
||||||
StatementContext statementContext,
|
StatementContext statementContext,
|
||||||
ViewFactory optionalParentFactory,
|
ViewFactory optionalParentFactory,
|
||||||
List<ViewFactory> parentViewFactories)
|
List<ViewFactory> parentViewFactories)
|
||||||
throws ViewParameterException
|
throws ViewParameterException
|
||||||
{
|
{
|
||||||
ExprNode[] validatedNodes = ViewFactorySupport.validate(getViewName(),
|
ExprNode[] validatedNodes = ViewFactorySupport.validate(getViewName(),
|
||||||
@ -57,20 +60,34 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport {
|
|||||||
statementContext,
|
statementContext,
|
||||||
params, true);
|
params, true);
|
||||||
|
|
||||||
interval = validatedNodes[0];
|
calcType = validatedNodes[0];
|
||||||
timestamp = validatedNodes[1];
|
timeType = validatedNodes[1];
|
||||||
value = validatedNodes[2];
|
interval = validatedNodes[2];
|
||||||
|
timestamp = validatedNodes[3];
|
||||||
|
value = validatedNodes[4];
|
||||||
|
|
||||||
|
Class calcTypeClass = calcType.getForge().getEvaluationType();
|
||||||
|
if ((calcTypeClass != String.class))
|
||||||
|
{
|
||||||
|
throw new ViewParameterException("OHLC view needs a String-typed calc type value for parameter 1");
|
||||||
|
}
|
||||||
|
|
||||||
|
Class timeTypeClass = timeType.getForge().getEvaluationType();
|
||||||
|
if ((timeTypeClass != String.class))
|
||||||
|
{
|
||||||
|
throw new ViewParameterException("OHLC view needs a String-typed time type value for parameter 2");
|
||||||
|
}
|
||||||
|
|
||||||
Class intervalClass = interval.getForge().getEvaluationType();
|
Class intervalClass = interval.getForge().getEvaluationType();
|
||||||
if ((intervalClass != String.class))
|
if ((intervalClass != String.class))
|
||||||
{
|
{
|
||||||
throw new ViewParameterException("OHLC view needs String-typed interval value for parameter 1.");
|
throw new ViewParameterException("OHLC view needs String-typed interval value for parameter 3");
|
||||||
}
|
}
|
||||||
|
|
||||||
Class timestampClass = timestamp.getForge().getEvaluationType();
|
Class timestampClass = timestamp.getForge().getEvaluationType();
|
||||||
if ((timestampClass != DateTime.class))
|
if ((timestampClass != DateTime.class))
|
||||||
{
|
{
|
||||||
throw new ViewParameterException("OHLC view needs DateTime typed timestamp values for parameter 2");
|
throw new ViewParameterException("OHLC view needs DateTime typed timestamp values for parameter 4");
|
||||||
}
|
}
|
||||||
|
|
||||||
Class valueClass = value.getForge().getEvaluationType();
|
Class valueClass = value.getForge().getEvaluationType();
|
||||||
@ -78,7 +95,7 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport {
|
|||||||
(valueClass != Double.class) &&
|
(valueClass != Double.class) &&
|
||||||
(valueClass != BigDecimal.class))
|
(valueClass != BigDecimal.class))
|
||||||
{
|
{
|
||||||
throw new ViewParameterException("OHLC view needs double or BigDecimal values for parameter 3");
|
throw new ViewParameterException("OHLC view needs double or BigDecimal values for parameter 5");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,7 +103,8 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport {
|
|||||||
* Create a new view using already-passed context and params.
|
* Create a new view using already-passed context and params.
|
||||||
*/
|
*/
|
||||||
public View makeView(AgentInstanceViewFactoryChainContext agentContext) {
|
public View makeView(AgentInstanceViewFactoryChainContext agentContext) {
|
||||||
return new OHLCPlugInView(agentContext, interval, timestamp, value);
|
return new OHLCPlugInView(agentContext, calcType, timeType,
|
||||||
|
interval, timestamp, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
22
src/main/java/ats/plugin/OHLCValues.java
Normal file
22
src/main/java/ats/plugin/OHLCValues.java
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
package ats.plugin;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OHLCValues is a struct for storing open, high, low, and close
|
||||||
|
* values.
|
||||||
|
*/
|
||||||
|
public class OHLCValues {
|
||||||
|
public double open;
|
||||||
|
public double high;
|
||||||
|
public double low;
|
||||||
|
public double close;
|
||||||
|
|
||||||
|
|
||||||
|
public OHLCValues(double open, double high,
|
||||||
|
double low, double close)
|
||||||
|
{
|
||||||
|
this.open = open;
|
||||||
|
this.high = high;
|
||||||
|
this.low = low;
|
||||||
|
this.close = close;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user