diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index 7cdd7a5..3d25e63 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -9,19 +9,25 @@ -- The time the trading logic will begin to enter trades. -- Exiting trades is 24/7. -create constant variable int StartTimeHour = 9 +create constant variable DateTime TradingStartTime = EPLHelpers.parseTime("00:00") --- The time the trading logic will begin to enter trades. +-- The time the trading logic will no longer enter trades. -- Exiting trades is 24/7. -create constant variable int EndTimeHour = 17 +create constant variable DateTime TradingEndTime = EPLHelpers.parseTime("23:59") -- The time frame for OHLC calculation. -- Example values: '5s', '1m', '1h 30m', '2h', '12h', '1d'. create constant variable string OHLCInterval = '10s' +-- The number of ticks for OHLC TB calculation. +create constant variable int OHLCTicks = 100 + -- Amount to be traded, measured in units. create constant variable int TradeSize = 10 +-- How large is a pip? +create constant variable BigDecimal PipSize = new BigDecimal(0.0001) + -- How many events to use for simple moving average calculation create constant variable int SMASize = 5 @@ -35,11 +41,11 @@ create constant variable int RefSize = 5 -- Define the window as length 1, using the structure of the TickEvent -- java class to describe what the window contains. -create window CurrentTickWindow#length(1) as TickEvent +create window CurrentTick#length(1) as TickEvent -- Describe how events get added to the window. This runs every time -- a new TickEvent is posted. -insert into CurrentTickWindow select * from TickEvent +insert into CurrentTick select * from TickEvent -- @@ -47,47 +53,85 @@ insert into CurrentTickWindow select * from TickEvent -- -- InTradingHours will be set to true when the current time is between --- StartTime and EndTime. +-- TradingStartTime and TradingEndTime. create variable bool InTradingHours = false -- Update on each tick --- NOTE: see "timer:within" pattern for possible alternate formulation -on TickEvent as t set InTradingHours = - (EPLHelpers.getHour(t.time) >= StartTimeHour and - EPLHelpers.getHour(t.time) < EndTimeHour) +on TickEvent as t + set InTradingHours = EPLHelpers.inTimeRange(t.time, TradingStartTime, TradingEndTime) + + +-- +-- In long position. Set later. +-- + +create variable bool InLongEntry = false -- -- A stream of OHLC values calculated from TickEvents -- --- Create the stream to contain OHLCEvents -create variant schema OHLCStream as OHLCEvent - -- Send every TickEvent to the OHLC plugin. The plugin will post an -- OHLCEvent to OHLCStream every OHLCInterval amount of time. It uses -- TickEvent.time ("time") as the source of the timestamp, and uses -- TickEvent.midDouble() as the value to use in the OHLC calculation. + +create variant schema OHLCStream as OHLCEvent + insert into OHLCStream - select * from TickEvent#OHLC(OHLCInterval, time, midDouble) + select * from TickEvent#OHLC("OHLC", "time", OHLCInterval, time, midDouble) + +-- Send every TickEvent to the OHLC plugin. The plugin will post an +-- OHLCEvent using Heikin-Ashi calculations to HKStream every +-- OHLCInterval amount of time. It uses TickEvent.midDouble() as the +-- value to use in the OHLC calculation. + +create variant schema HKStream as OHLCEvent + +insert into HKStream + select * from TickEvent#OHLC("HeikinAshi", "time", OHLCInterval, time, midDouble) + +-- Send every TickEvent to the OHLC plugin. The plugin will post an +-- OHLCEvent to TBStream every OHLCTicks ticks. It uses +-- TickEvent.time ("time") as the source of the timestamp, and uses +-- TickEvent.midDouble() as the value to use in the OHLC calculation. + +create variant schema TBStream as OHLCEvent + +insert into TBStream + select * from TickEvent#OHLC("HeikinAshi", "ticks", OHLCTicks, time, midDouble) + +-- Send every TickEvent to the OHLC plugin. The plugin will post an +-- OHLCEvent using Heikin-Ashi calculations to TBHKStream every +-- OHLCTicks ticks. It uses TickEvent.midDouble() as the value to use +-- in the OHLC calculation. + +create variant schema TBHKStream as OHLCEvent + +insert into TBHKStream + select * from TickEvent#OHLC("HeikinAshi", "ticks", OHLCTicks, time, midDouble) -- -- Simple moving average streams -- --- SMACloseStream contains OHLCValueEvents. These are like --- OHLCEvents, but add an extra field for an arbitrary value. In this --- stream, that extra value will contain the average of OHLC close --- values. -create schema SMACloseStream as ats.plugin.OHLCValueEvent +-- SMACloseStream bundles a simple moving average of the close values +-- with the values of the OHLCStream. +create schema SMACloseStream as (time DateTime, + open double, + high double, + low double, + close double, + averageClose double) -- Average the most recent OHLC close values from OHLCStream and -- post an event that contains open, high, low, close, and -- SMA(close). The number of OHLC events used in the SMA calc is set -- by the SMASize variable. insert into SMACloseStream - select new ats.plugin.OHLCValueEvent(time, open, high, low, close, Avg(close)) + select time, open, high, low, close, Avg(close) as averageClose from OHLCStream#length(SMASize) @@ -98,21 +142,19 @@ insert into SMACloseStream -- A stream that feeds B1 and B2. Each event contains a double -- precision floating point value "low", and a timestamp called -- "time". -create schema BStream as (low double, time org.joda.time.DateTime) +create schema BStream as (low double, time DateTime) -- Listen to the last "RefSize" number of SMACloseStream events. Look -- for an OHLC bar with a lower average close than its neighbors. Add --- that bar's low value and its timestamp to the stream. As described --- in SMACloseStream, "value" in the query below represents --- SMA(close). +-- that bar's low value and its timestamp to the stream. insert into BStream select prev(1, low) as low, prev(1, time) as time from SMACloseStream#length(RefSize) - where prev(0, value) > prev(1, value) - and prev(1, value) < prev(2, value) + where prev(0, averageClose) > prev(1, averageClose) + and prev(1, averageClose) < prev(2, averageClose) -- Define B1 to contain the same fields as BStream -create schema B1 (low double, time org.joda.time.DateTime) +create schema B1 (low double, time DateTime) -- B1 contains the most recent low value and time from BStream. -- This is the last time an average close was lower @@ -123,33 +165,33 @@ insert into B1 select prev(0, low) as low, prev(0, time) as time -- B2 contains the *second* most recent occurrence in BStream, but is -- otherwise the same as B1. -create schema B2 (low double, time org.joda.time.DateTime) +create schema B2 (low double, time DateTime) insert into B2 select prev(1, low) as low, prev(1, time) as time from BStream#length(RefSize) -- A stream that feeds P1 and P2. -create schema PStream as (low double, time org.joda.time.DateTime) +create schema PStream as (low double, time DateTime) -- Find an OHLC bar with a higher average close than its neighbors. -- Add that low value and its timestamp to the stream. insert into PStream select prev(1, low) as low, prev(1, time) as time from SMACloseStream#length(RefSize) - where prev(0, value) < prev(1, value) - and prev(1, value) > prev(2, value) + where prev(0, averageClose) < prev(1, averageClose) + and prev(1, averageClose) > prev(2, averageClose) -- P1 contains the most recent low value and time from PStream. -- This is the last time an average close was higher -- than the ones before and after. -- Since the time is included in the event, no separate PT1 is needed. -create schema P1 (low double, time org.joda.time.DateTime) +create schema P1 (low double, time DateTime) insert into P1 select prev(0, low) as low, prev(0, time) as time from PStream#length(RefSize) -- P2 contains the second most recent occurrence in PStream. -create schema P2 (low double, time org.joda.time.DateTime) +create schema P2 (low double, time DateTime) insert into P2 select prev(1, low) as low, prev(1, time) as time from PStream#length(RefSize) @@ -168,9 +210,9 @@ insert into MaxHigh3Window select max(high) as high from OHLCStream#length(3) --- Long entry events contain the current tick's midpoint value and --- timestamp. -create schema LongEntryStream as (current BigDecimal, time org.joda.time.DateTime, instrument String) +-- Long entry events contain the current tick's midpoint value, +-- timestamp, and instrument name. +create schema LongEntryStream as (current BigDecimal, time DateTime, instrument String) -- The long entry calc below is translated from this entry in the -- spreadsheet: @@ -184,7 +226,7 @@ create schema LongEntryStream as (current BigDecimal, time org.joda.time.DateTim insert into LongEntryStream select C.mid as current, C.time as time, C.instrument as instrument - from CurrentTickWindow as C, + from CurrentTick as C, MaxHigh3Window as T, B1#lastevent, B2#lastevent, P1#lastevent, P2#lastevent @@ -194,8 +236,10 @@ insert into LongEntryStream and EPLHelpers.laterThan(B1.time, P1.time) and EPLHelpers.laterThan(B2.time, P2.time) and EPLHelpers.laterThan(P1.time, B2.time) + and InTradingHours + and not InLongEntry --- Because multiple streams feed LongEntryStream (CurrentTickWindow, +-- Because multiple streams feed LongEntryStream (CurrentTick, -- MaxHigh3Window, B1, B2...), an event on any of those streams causes -- the LongEntryStream logic above to be triggered. This often causes -- multiple LongEntryStream events to be generated for a single tick @@ -203,14 +247,101 @@ insert into LongEntryStream -- -- LongEntryDistinct filters out duplicate LongEntryStream events, -- leaving a maximum of one event per tick. -create schema LongEntryDistinct as (current BigDecimal, time org.joda.time.DateTime, - instrument String, units int) +create schema LongEntryDistinct + as (current BigDecimal, + time DateTime, + instrument String, + units int) insert into LongEntryDistinct select le.current as current, le.time as time, le.instrument as instrument, TradeSize as units from pattern [every-distinct(le.time) le=LongEntryStream] + +-- +-- Long entry derived values +-- + +-- Register if We're in a long entry when we get the LongEntryDistinct +-- event. (InLongEntry is declared near top of file) +on LongEntryDistinct as le set InLongEntry = true + +-- LongEntryTime contains the timestamp of the last long entry, +-- or null if none has happened yet. +create variable DateTime LongEntryTime = null + +on LongEntryDistinct as le set LongEntryTime = le.time + +-- LongEntryStopBarCount contains the number of OHLCStream bars that +-- have occurred since the most recent long entry. +create variable int LongEntryStopBarCount = 0 + +-- Reset LongEntryStopBarCount when we get a new LE. +on LongEntryDistinct as le set LongEntryStopBarCount = 0 + +-- Increment LongEntryStopBarCount on every bar. +on OHLCStream as ohlc set LongEntryStopBarCount = LongEntryStopBarCount + 1 + +-- LongEntryPrice contains the price of the last long entry, +-- or null if none has happened yet. +create variable BigDecimal LongEntryPrice = null + +on LongEntryDistinct as le set LongEntryPrice = le.current + +-- LongEntryPreEntryBar is a stream that keeps bars just prior to the +-- last long entry +create schema LongEntryPreEntryBar + as (time DateTime, + open double, + high double, + low double, + close double) + +-- Add bars so long as we're not in a long position +insert into LongEntryPreEntryBar + select time, open, high, low, close from OHLCStream + where InLongEntry is false + +-- Contains the second-most-recent LongEntryPreEntryBar's "low" value +create schema LongEntryPreEntryBarPrevLow (low double) + +insert into LongEntryPreEntryBarPrevLow select prev(1, low) as low + from LongEntryPreEntryBar#length(2) + + +-- +-- Long exit +-- + +-- Long exit event definition +create schema LongExitStream + as (current BigDecimal, + time DateTime, + instrument String, + units int) + +-- The long exit calc below is translated from this entry in the +-- spreadsheet: +-- +-- LX = (LESBC <= 60 and C < MIN(PreEntrybar(Low,1), (LongEntryPrice - 10 pips))) +-- or +-- (LESBC>60 and C<(EntryPrice + 2 pips) + +insert into LongExitStream + select C.mid as current, + C.time as time, + C.instrument as instrument, + 0 - TradeSize as units -- negative for "sell" + from CurrentTick as C, + LongEntryPreEntryBarPrevLow#lastevent as L + where ((LongEntryStopBarCount <= 60 and C.mid < min(L.low, LongEntryPrice - (10 * PipSize))) or + (LongEntryStopBarCount > 60 and C.mid < LongEntryPrice + (2 * PipSize))) + and InLongEntry + +-- Register the long position as closed +on LongExitStream as lx set InLongEntry = false + -- -- Event logging @@ -224,27 +355,51 @@ create schema LogStream as (stream string, event string) -- of these can be either helpful or too noisy. Comment/uncomment as -- you see fit. -insert into LogStream select 'TickEvent' as stream, EPLHelpers.str(*) as event from TickEvent +-- insert into LogStream select 'TickEvent' as stream, EPLHelpers.str(*) as event from TickEvent -insert into LogStream select 'OHLCStream' as stream, EPLHelpers.str(*) as event from OHLCStream +-- insert into LogStream select 'OHLCStream' as stream, EPLHelpers.str(*) as event from OHLCStream + +-- insert into LogStream select 'HKStream' as stream, EPLHelpers.str(*) as event from HKStream + +-- insert into LogStream select 'TBStream' as stream, EPLHelpers.str(*) as event from TBStream + +-- insert into LogStream select 'TBHKStream' as stream, EPLHelpers.str(*) as event from TBHKStream + +-- insert into LogStream select 'InTradingHours' as stream, EPLHelpers.str(time, InTradingHours) as event from TickEvent -- insert into LogStream select 'BStream' as stream, EPLHelpers.str(*) as event from BStream -- insert into LogStream select 'PStream' as stream, EPLHelpers.str(*) as event from PStream -insert into LogStream select 'B1' as stream, EPLHelpers.str(*) as event from B1 +-- insert into LogStream select 'B1' as stream, EPLHelpers.str(*) as event from B1 -insert into LogStream select 'B2' as stream, EPLHelpers.str(*) as event from B2 +-- insert into LogStream select 'B2' as stream, EPLHelpers.str(*) as event from B2 -insert into LogStream select 'P1' as stream, EPLHelpers.str(*) as event from P1 +-- insert into LogStream select 'P1' as stream, EPLHelpers.str(*) as event from P1 -insert into LogStream select 'P2' as stream, EPLHelpers.str(*) as event from P2 +-- insert into LogStream select 'P2' as stream, EPLHelpers.str(*) as event from P2 -- insert into LogStream select 'MaxHigh3Window' as stream, EPLHelpers.str(*) as event from MaxHigh3Window -- insert into LogStream select 'LongEntryStream' as stream, EPLHelpers.str(*) as event from LongEntryStream --- insert into LogStream select 'LongEntryDistinct' as stream, EPLHelpers.str(*) as event from LongEntryDistinct +insert into LogStream select 'LongEntryDistinct' as stream, EPLHelpers.str(*) as event from LongEntryDistinct +-- insert into LogStream select 'InLongEntry' as stream, EPLHelpers.str(InLongEntry) as event from OHLCStream + +-- insert into LogStream select 'LongEntryTime' as stream, EPLHelpers.str(LongEntryTime) as event from TickEvent + +-- insert into LogStream select 'LongEntryPrice' as stream, EPLHelpers.str(LongEntryPrice) as event from OHLCStream + +-- insert into LogStream select 'LongEntryStopBarCount' as stream, EPLHelpers.str(LongEntryStopBarCount) as event from OHLCStream where InLongEntry + +-- insert into LogStream select 'LongEntryPreEntryBar' as stream, EPLHelpers.str(*) as event from LongEntryPreEntryBar + +-- insert into LogStream select 'LongEntryPreEntryBarPrevLow' as stream, EPLHelpers.str(*) as event from LongEntryPreEntryBarPrevLow + +-- insert into LogStream select 'LXPrice' as stream, EPLHelpers.str(mid, LongEntryPrice, LongEntryPrice - (10 * PipSize)) as event from TickEvent +-- where InLongEntry and mid < LongEntryPrice + +insert into LogStream select 'LongExitStream' as stream, EPLHelpers.str(*) as event from LongExitStream -- TODO (for Seth): look into LogSink http://esper.espertech.com/release-7.1.0/esper-reference/html/dataflow.html#dataflow-reference-logsink diff --git a/src/main/java/EPLHelpers.java b/src/main/java/EPLHelpers.java index ac40a71..628879d 100644 --- a/src/main/java/EPLHelpers.java +++ b/src/main/java/EPLHelpers.java @@ -1,27 +1,54 @@ +import java.util.StringJoiner; + import org.joda.time.DateTime; +import org.joda.time.DateTimeComparator; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.joda.time.DateTimeComparator; - +/** + * EPLHelpers contains small helper routines used within epl files. + */ public class EPLHelpers { final static Logger log = LoggerFactory.getLogger(EPLHelpers.class); + private static DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("HH:mm"); + private static DateTimeComparator timeComparator = DateTimeComparator.getTimeOnlyInstance(); - /** Return the hour of the day for the given date. */ - public static int getHour(DateTime date) { - return date.getHourOfDay(); + /** + * A simple toString() wrapper. + */ + public static String str(Object... objects) { + StringJoiner sj = new StringJoiner(", "); + for (Object o : objects) { + sj.add(o != null ? o.toString() : "null"); + } + return sj.toString(); } - /** A simple toString() wrapper for use in epl. */ - public static String str(Object o) { return o.toString(); } + /** + * Return a DateTime object for the time. Should be specified in + * 24-hour "hh:mm" format. + */ + public static DateTime parseTime(String time) { + return timeFormatter.parseDateTime(time); + } + + /** + * Return true if the time portion of 'now' is between the time + * portion of 'start' and 'end'. + */ + public static boolean inTimeRange(DateTime now, DateTime start, DateTime end) { + return laterThan(now, start) && earlierThan(now, end); + } /** * Compare two times and return true if the first is earlier than * the second. */ public static boolean earlierThan(DateTime a, DateTime b) { - return DateTimeComparator.getInstance().compare(a, b) < 0; + return timeComparator.compare(a, b) < 0; } /** @@ -29,6 +56,6 @@ public class EPLHelpers { * the second. */ public static boolean laterThan(DateTime a, DateTime b) { - return DateTimeComparator.getInstance().compare(a, b) > 0; + return timeComparator.compare(a, b) > 0; } } diff --git a/src/main/java/EsperProcessor.java b/src/main/java/EsperProcessor.java index 755ae71..5a2c186 100644 --- a/src/main/java/EsperProcessor.java +++ b/src/main/java/EsperProcessor.java @@ -9,13 +9,13 @@ import com.espertech.esper.client.StatementAwareUpdateListener; import com.espertech.esper.client.UpdateListener; import com.espertech.esper.client.time.CurrentTimeEvent; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import ats.orders.MarketOrderRequest; import ats.plugin.OHLCEvent; import ats.plugin.OHLCPlugInViewFactory; -import ats.plugin.OHLCValueEvent; public class EsperProcessor implements TickProcessor { @@ -35,7 +35,7 @@ public class EsperProcessor implements TickProcessor { // register event types defined in java classes config.addEventType(TickEvent.class); config.addEventType(OHLCEvent.class); - config.addEventType(OHLCValueEvent.class); + config.addEventType(DateTime.class); // add OHLC plugin config.addPlugInView("ATS", "OHLC", OHLCPlugInViewFactory.class.getName()); @@ -61,6 +61,20 @@ public class EsperProcessor implements TickProcessor { newData[0].get("current"), newData[0].get("time")); }); + + // respond to long exit events + addStatement("select * from LongExitStream", + (newData, oldData) -> { + String instrument = (String)newData[0].get("instrument"); + Integer units = (Integer)newData[0].get("units"); + trader.placeOrder(new MarketOrderRequest(instrument, units)); + + log.debug("Long exit triggered: {} of {} at price {} at time {}", + units, + instrument, + newData[0].get("current"), + newData[0].get("time")); + }); } /** diff --git a/src/main/java/ats/plugin/CandlestickCalc.java b/src/main/java/ats/plugin/CandlestickCalc.java new file mode 100644 index 0000000..2a539ce --- /dev/null +++ b/src/main/java/ats/plugin/CandlestickCalc.java @@ -0,0 +1,25 @@ +package ats.plugin; + +/** + * CandlestickCalc embodies a method of calculating candlestick + * values. For example, OHLC or Heikin-Ashi. + */ +interface CandlestickCalc { + public enum Type { OHLC, HeikinAshi }; + + /** + * Reset the calculation for the beginning of an interval. + */ + void reset(); + + /** + * Accept a current tick value to apply to the current + * calculations. + */ + void applyValue(double value); + + /** + * Return the values calculated since the last reset point. + */ + OHLCValues getValues(); +} diff --git a/src/main/java/ats/plugin/CandlestickWindow.java b/src/main/java/ats/plugin/CandlestickWindow.java new file mode 100644 index 0000000..ee2daf6 --- /dev/null +++ b/src/main/java/ats/plugin/CandlestickWindow.java @@ -0,0 +1,17 @@ +package ats.plugin; + +import org.joda.time.DateTime; + +/** + * A CandlestickWindow runs a CandlestickCalc across a number of + * ticks and sends + */ +interface CandlestickWindow { + public enum Type { time, ticks }; + + /** + * Process a tick with the given time and value. + */ + public void update(DateTime timestamp, double value); + +} diff --git a/src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java b/src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java new file mode 100644 index 0000000..f6897c5 --- /dev/null +++ b/src/main/java/ats/plugin/HeikinAshiCandlestickCalc.java @@ -0,0 +1,102 @@ +package ats.plugin; + +/** + * HeikinAshiCandlestickCalc calculates values using the Heikin-Ashi + * technique. + * + * @see Heikin-Ashi description + */ +class HeikinAshiCandlestickCalc implements CandlestickCalc { + private Double lastOpen; + private Double lastClose; + private Double currentOpen; + private Double currentClose; + private Double currentHigh; + private Double currentLow; + + + /** + * Reset the calculation for the beginning of an interval. + */ + @Override + public void reset() { + lastOpen = currentOpen; + lastClose = currentClose; + + currentOpen = null; + currentClose = null; + currentHigh = null; + currentLow = null; + } + + /** + * Accept a current tick value to apply to the current + * calculations. + */ + @Override + public void applyValue(double value) { + if (currentOpen == null) { + currentOpen = value; + } + + currentClose = value; + + if (currentLow == null) { + currentLow = value; + } else if (currentLow.compareTo(value) > 0) { + currentLow = value; + } + + if (currentHigh == null) { + currentHigh = value; + } else if (currentHigh.compareTo(value) < 0) { + currentHigh = value; + } + } + + /** + * HA-Open = (HA-Open(-1) + HA-Close(-1)) / 2 + */ + private double calcOpen() { + double sum = 0; + + sum += lastOpen != null ? lastOpen : currentOpen; + sum += lastClose != null ? lastClose : currentClose; + + return sum / 2d; + } + + /** + * HA-Close = (Open(0) + High(0) + Low(0) + Close(0)) / 4 + */ + private double calcClose() { + return (currentOpen + currentHigh + currentLow + currentClose) / 4d; + } + + /** + * HA-High = Maximum of the High(0), HA-Open(0) or HA-Close(0) + */ + private double calcHigh(double haOpen, double haClose) { + return Math.max(currentHigh, Math.max(haOpen, haClose)); + } + + /** + * HA-Low = Minimum of the Low(0), HA-Open(0) or HA-Close(0) + */ + private double calcLow(double haOpen, double haClose) { + return Math.min(currentLow, Math.min(haOpen, haClose)); + } + + /** + * Return our calculated values up to now. + */ + @Override + public OHLCValues getValues() { + double o = calcOpen(); + double c = calcClose(); + double h = calcHigh(o, c); + double l = calcLow(o, c); + + return new OHLCValues(o, h, l, c); + } +} diff --git a/src/main/java/ats/plugin/OHLCCandlestickCalc.java b/src/main/java/ats/plugin/OHLCCandlestickCalc.java new file mode 100644 index 0000000..b679083 --- /dev/null +++ b/src/main/java/ats/plugin/OHLCCandlestickCalc.java @@ -0,0 +1,56 @@ +package ats.plugin; + +/** + * OHLCCandlestickCalc calculates values for OHLC. + */ +class OHLCCandlestickCalc implements CandlestickCalc { + private Double open; + private Double close; + private Double high; + private Double low; + + + /** + * Reset the calculation for the beginning of an interval. + */ + @Override + public void reset() { + open = null; + close = null; + high = null; + low = null; + } + + /** + * Accept a current tick value to apply to the current + * calculations. + */ + @Override + public void applyValue(double value) { + if (open == null) { + open = value; + } + + close = value; + + if (low == null) { + low = value; + } else if (low.compareTo(value) > 0) { + low = value; + } + + if (high == null) { + high = value; + } else if (high.compareTo(value) < 0) { + high = value; + } + } + + /** + * Return our calculated values up to now. + */ + @Override + public OHLCValues getValues() { + return new OHLCValues(open, high, low, close); + } +} diff --git a/src/main/java/ats/plugin/OHLCEvent.java b/src/main/java/ats/plugin/OHLCEvent.java index 99eaf17..99ed6f8 100644 --- a/src/main/java/ats/plugin/OHLCEvent.java +++ b/src/main/java/ats/plugin/OHLCEvent.java @@ -13,6 +13,10 @@ public class OHLCEvent { private double close; + public OHLCEvent(DateTime time, OHLCValues values) { + this(time, values.open, values.high, values.low, values.close); + } + public OHLCEvent(DateTime time, double open, double high, double low, double close) diff --git a/src/main/java/ats/plugin/OHLCPlugInView.java b/src/main/java/ats/plugin/OHLCPlugInView.java index 8f55cae..3c7548c 100644 --- a/src/main/java/ats/plugin/OHLCPlugInView.java +++ b/src/main/java/ats/plugin/OHLCPlugInView.java @@ -1,92 +1,94 @@ package ats.plugin; +import java.util.Iterator; + import com.espertech.esper.client.EventBean; import com.espertech.esper.client.EventType; import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; -import com.espertech.esper.core.service.EPStatementHandleCallback; -import com.espertech.esper.core.service.EngineLevelExtensionServicesContext; +import com.espertech.esper.epl.expression.core.ExprEvaluator; import com.espertech.esper.epl.expression.core.ExprNode; import com.espertech.esper.event.EventAdapterService; -import com.espertech.esper.schedule.ScheduleHandleCallback; import com.espertech.esper.view.ViewSupport; -import java.util.Calendar; -import java.util.GregorianCalendar; -import java.util.Iterator; + +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.joda.time.Period; -import org.joda.time.format.PeriodFormatter; -import org.joda.time.format.PeriodFormatterBuilder; -import com.espertech.esper.epl.expression.core.ExprEvaluator; -import com.espertech.esper.schedule.SchedulingService; -import org.joda.time.Duration; -import org.joda.time.format.ISODateTimeFormat; -import java.util.TimeZone; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.Instant; -import org.joda.time.PeriodType; /** * OHLCPlugInView computes OHLC bars for a given time interval. */ public class OHLCPlugInView extends ViewSupport { private static final Logger log = LoggerFactory.getLogger(OHLCPlugInView.class); - private static final int LATE_EVENT_SLACK_SECONDS = 5; - private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder() - .appendDays().appendSuffix("d") - .appendHours().appendSuffix("h") - .appendMinutes().appendSuffix("m") - .appendSeconds().appendSuffix("s") - .toFormatter(); - - private final AgentInstanceViewFactoryChainContext agentContext; - private final long scheduleSlot; - private EPStatementHandleCallback handle; - - private final Duration interval; + private final AgentInstanceViewFactoryChainContext context; private final ExprNode timestampExpression; private final ExprNode valueExpression; - - private DateTime windowStartTime; - private DateTime windowEndTime; - private Double open; - private Double close; - private Double high; - private Double low; + private CandlestickWindow window; + private EventAdapterService service; private EventBean[] lastData; + /** + * Create a new plugin. + */ public OHLCPlugInView(AgentInstanceViewFactoryChainContext context, + ExprNode calcTypeExpression, + ExprNode timeTypeExpression, ExprNode intervalExpression, ExprNode timestampExpression, ExprNode valueExpression) { - agentContext = context; - scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot(); - - interval = parseInterval(intervalExpression); - // log.info("Interval is {}", interval); - + this.context = context; this.timestampExpression = timestampExpression; this.valueExpression = valueExpression; + service = context.getStatementContext().getEventAdapterService(); + + CandlestickCalc calc = chooseCalc(calcTypeExpression); + window = chooseWindow(context, timeTypeExpression, intervalExpression, calc); } /** - * Return the time period specified by the given expression value. + * Return the proper CandlestickCalc according to the value of the + * calc type parameter. */ - private Duration parseInterval(ExprNode interval) { - ExprEvaluator evaluator = interval.getForge().getExprEvaluator(); - String intervalStr = (String)evaluator.evaluate(null, true, agentContext); - return parseInterval(intervalStr); + private CandlestickCalc chooseCalc(ExprNode calcTypeExpression) { + String calcType = exprNodeValue(calcTypeExpression); + + if (CandlestickCalc.Type.valueOf(calcType) == CandlestickCalc.Type.HeikinAshi) { + log.info("Using Heikin-Ashi calc type"); + return new HeikinAshiCandlestickCalc(); + } + + log.info("Using OHLC calc type"); + return new OHLCCandlestickCalc(); } /** - * Return the time period specified by the given string. + * Return the proper window according to the value of the + * calc type parameter. + * @param intervalExpression */ - public static Duration parseInterval(String interval) { - interval = interval.replaceAll("\\s+",""); - return periodFormatter.parsePeriod(interval).toStandardDuration(); + private CandlestickWindow chooseWindow(AgentInstanceViewFactoryChainContext context, + ExprNode timeTypeExpression, + ExprNode intervalExpression, + CandlestickCalc calc) + { + String timeType = exprNodeValue(timeTypeExpression); + + if (CandlestickWindow.Type.valueOf(timeType) == CandlestickWindow.Type.time) { + log.info("Using time based window type"); + return new TimeCandlestickWindow(this, calc, context, intervalExpression); + } + + log.info("Using ticks based window type"); + return new TicksCandlestickWindow(this, calc, context, intervalExpression); + } + + /** + * Return the string value of a node. + */ + private String exprNodeValue(ExprNode node) { + ExprEvaluator evaluator = node.getForge().getExprEvaluator(); + return (String)evaluator.evaluate(null, true, context); } /** @@ -94,15 +96,7 @@ public class OHLCPlugInView extends ViewSupport { */ private DateTime getTimestamp(EventBean event) { ExprEvaluator evaluator = timestampExpression.getForge().getExprEvaluator(); - return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, agentContext); - } - - /** - * Convert a bare long value to a proper DateTime entity. Assumes - * UTC time zone. - */ - public static DateTime toDateTime(long l) { - return new DateTime(l, DateTimeZone.UTC); + return (DateTime)evaluator.evaluate(new EventBean[] {event}, true, context); } /** @@ -110,7 +104,7 @@ public class OHLCPlugInView extends ViewSupport { */ private double getValue(EventBean event) { ExprEvaluator evaluator = valueExpression.getForge().getExprEvaluator(); - return (double)evaluator.evaluate(new EventBean[] {event}, true, agentContext); + return (double)evaluator.evaluate(new EventBean[] {event}, true, context); } /** @@ -123,150 +117,17 @@ public class OHLCPlugInView extends ViewSupport { for (EventBean event : newData) { DateTime timestamp = getTimestamp(event); double value = getValue(event); - - ensureWindow(timestamp); - applyValue(value); + window.update(timestamp, value); } } /** - * Make sure our window times are set up and current. + * Send an event to all plugin listeners. */ - private void ensureWindow(DateTime timestamp) { - if (timestamp == null) return; - - if (windowStartTime == null) { - // create open window - windowStartTime = makeWindowStartTime(timestamp, interval); - windowEndTime = makeWindowEndTime(windowStartTime, interval); - scheduleCallback(windowEndTime); - } - - if (!inWindow(timestamp)) { - // past current window. - // post and create a new one. - postData(); - - windowStartTime = makeWindowStartTime(timestamp, interval); - windowEndTime = makeWindowEndTime(windowStartTime, interval); - scheduleCallback(windowEndTime); - } - } - - public static DateTime makeWindowStartTime(DateTime timestamp, - Duration interval) - { - DateTime today = timestamp.withTimeAtStartOfDay(); - // log.info("Timestamp is {}", timestamp); - // log.info("Day start is {}", today); - - Duration intoToday = new Duration(today, timestamp); - - // calc how far into the current window we are - long intoPeriod = intoToday.getMillis() % interval.getMillis(); - - return timestamp.minus(intoPeriod); - } - - private static DateTime makeWindowEndTime(DateTime startTime, - Duration interval) - { - if (startTime == null || interval == null) return null; - - return startTime.plus(interval.getMillis()); - } - - /** - * Return true if the timestamp is within the current time window. - */ - private boolean inWindow(DateTime timestamp) { - if (timestamp == null) return false; - - return timestamp.compareTo(windowStartTime) >= 0 && - timestamp.compareTo(windowEndTime) < 0; - } - - private void applyValue(double value) { - if (open == null) { - open = value; - } - - close = value; - - if (low == null) { - low = value; - } else if (low.compareTo(value) > 0) { - low = value; - } - - if (high == null) { - high = value; - } else if (high.compareTo(value) < 0) { - high = value; - } - } - - /** - * Set up a callback to post an event when our time window expires. - */ - private void scheduleCallback(DateTime endTime) { - SchedulingService sched = agentContext.getStatementContext().getSchedulingService(); - if (handle != null) { - // remove old schedule - // log.info("Removing old callback"); - sched.remove(handle, scheduleSlot); - handle = null; - } - - DateTime currentTime = toDateTime(sched.getTime()); - DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS); - long callbackTime = targetTime.getMillis() - currentTime.getMillis(); - - ScheduleHandleCallback callback = new ScheduleHandleCallback() { - public void scheduledTrigger(EngineLevelExtensionServicesContext esc) { - handle = null; // clear out schedule handle - // log.info("Callback running"); - OHLCPlugInView.this.postData(); - } - }; - - handle = new EPStatementHandleCallback(agentContext.getEpStatementAgentInstanceHandle(), callback); - sched.add(callbackTime, handle, scheduleSlot); - // log.info("Scheduled callback for {}", callbackTime); - } - - DateTime lastStartTime; - - /** - * Update listeners with our new value. - */ - private void postData() { - if (open == null) { - // log.info("No data to post"); - return; - } - - if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) { - log.warn("DUP START TIME"); - } - - // log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread()); - - OHLCEvent value = new OHLCEvent(windowStartTime, open, high, low, close); - - // send - EventAdapterService service = agentContext.getStatementContext().getEventAdapterService(); - EventBean[] newBeans = new EventBean[] {service.adapterForBean(value)}; - updateChildren(newBeans, lastData); - - // reset for next post - lastData = newBeans; - open = null; - close = null; - high = null; - low = null; - - lastStartTime = windowStartTime; + public void postEvent(OHLCEvent event) { + EventBean[] newData = new EventBean[] {service.adapterForBean(event)}; + updateChildren(newData, lastData); + lastData = newData; } // @@ -279,11 +140,10 @@ public class OHLCPlugInView extends ViewSupport { */ @Override public EventType getEventType() { - return getEventType(agentContext.getStatementContext().getEventAdapterService()); + return getEventType(service); } - protected static EventType getEventType(EventAdapterService service) - { + protected static EventType getEventType(EventAdapterService service) { return service.addBeanType(OHLCEvent.class.getName(), OHLCEvent.class, false, false, false); diff --git a/src/main/java/ats/plugin/OHLCPlugInViewFactory.java b/src/main/java/ats/plugin/OHLCPlugInViewFactory.java index 30c5c6e..004a831 100644 --- a/src/main/java/ats/plugin/OHLCPlugInViewFactory.java +++ b/src/main/java/ats/plugin/OHLCPlugInViewFactory.java @@ -1,5 +1,8 @@ package ats.plugin; +import java.math.BigDecimal; +import java.util.List; + import com.espertech.esper.client.EventType; import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; import com.espertech.esper.core.service.StatementContext; @@ -10,9 +13,7 @@ import com.espertech.esper.view.ViewFactory; import com.espertech.esper.view.ViewFactoryContext; import com.espertech.esper.view.ViewFactorySupport; import com.espertech.esper.view.ViewParameterException; -import java.math.BigDecimal; -import java.util.Date; -import java.util.List; + import org.joda.time.DateTime; /** @@ -21,6 +22,8 @@ import org.joda.time.DateTime; public class OHLCPlugInViewFactory extends ViewFactorySupport { private EventAdapterService eventAdapterService; private List params; + private ExprNode calcType; + private ExprNode timeType; private ExprNode interval; private ExprNode timestamp; private ExprNode value; @@ -30,13 +33,13 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { * Pass in EPL query view params. */ public void setViewParameters(ViewFactoryContext context, - List params) + List params) throws ViewParameterException { eventAdapterService = context.getEventAdapterService(); - if (params.size() != 3) { - throw new ViewParameterException("OHLC view takes three parameters: time interval, timestamp expression, and mid price expression."); + if (params.size() != 5) { + throw new ViewParameterException("OHLC view takes five parameters: calulation type (\"OHLC\" or \"Heikin-Ashi\"), batching type (\"time\" or \"ticks\"), time interval or tick count, timestamp expression, and mid price expression."); } this.params = params; } @@ -47,9 +50,9 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { * type for resulting views. */ public void attach(EventType parentEventType, - StatementContext statementContext, - ViewFactory optionalParentFactory, - List parentViewFactories) + StatementContext statementContext, + ViewFactory optionalParentFactory, + List parentViewFactories) throws ViewParameterException { ExprNode[] validatedNodes = ViewFactorySupport.validate(getViewName(), @@ -57,20 +60,36 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { statementContext, params, true); - interval = validatedNodes[0]; - timestamp = validatedNodes[1]; - value = validatedNodes[2]; + calcType = validatedNodes[0]; + timeType = validatedNodes[1]; + interval = validatedNodes[2]; + timestamp = validatedNodes[3]; + value = validatedNodes[4]; + + Class calcTypeClass = calcType.getForge().getEvaluationType(); + if (calcTypeClass != String.class) + { + throw new ViewParameterException("OHLC view needs a String-typed calc type value for parameter 1"); + } + + Class timeTypeClass = timeType.getForge().getEvaluationType(); + if (timeTypeClass != String.class) + { + throw new ViewParameterException("OHLC view needs a String-typed time type value for parameter 2"); + } Class intervalClass = interval.getForge().getEvaluationType(); - if ((intervalClass != String.class)) + if ((intervalClass != String.class) && + (intervalClass != Integer.class) && + (intervalClass != int.class)) { - throw new ViewParameterException("OHLC view needs String-typed interval value for parameter 1."); + throw new ViewParameterException("OHLC view needs String- or integer-typed interval value for parameter 3 - got " + intervalClass); } Class timestampClass = timestamp.getForge().getEvaluationType(); if ((timestampClass != DateTime.class)) { - throw new ViewParameterException("OHLC view needs DateTime typed timestamp values for parameter 2"); + throw new ViewParameterException("OHLC view needs DateTime typed timestamp values for parameter 4"); } Class valueClass = value.getForge().getEvaluationType(); @@ -78,7 +97,7 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { (valueClass != Double.class) && (valueClass != BigDecimal.class)) { - throw new ViewParameterException("OHLC view needs double or BigDecimal values for parameter 3"); + throw new ViewParameterException("OHLC view needs double or BigDecimal values for parameter 5"); } } @@ -86,7 +105,8 @@ public class OHLCPlugInViewFactory extends ViewFactorySupport { * Create a new view using already-passed context and params. */ public View makeView(AgentInstanceViewFactoryChainContext agentContext) { - return new OHLCPlugInView(agentContext, interval, timestamp, value); + return new OHLCPlugInView(agentContext, calcType, timeType, + interval, timestamp, value); } /** diff --git a/src/main/java/ats/plugin/OHLCValueEvent.java b/src/main/java/ats/plugin/OHLCValueEvent.java deleted file mode 100644 index ed12b74..0000000 --- a/src/main/java/ats/plugin/OHLCValueEvent.java +++ /dev/null @@ -1,42 +0,0 @@ -package ats.plugin; - -import org.joda.time.DateTime; - -/** - * OHLCValueEvent stores one bar of OHLC info. - */ -public class OHLCValueEvent extends OHLCEvent { - private double value; - - - public OHLCValueEvent() { - this(null, 0, 0, 0, 0, 0); - } - - public OHLCValueEvent(DateTime time, - double open, double high, - double low, double close, - double value) - { - super(time, open, high, low, close); - this.value = value; - } - - public static OHLCValueEvent make(DateTime time, - double open, double high, - double low, double close, - double value) - { - return new OHLCValueEvent(time, open, high, low, close, value); - } - - public Double getValue() { return value; } - - /** - * Return a human readable representation of this event. - */ - public String toString() { - return String.format("OHLCValueEvent[%s, open=%.3f, high=%.3f, low=%.3f, close=%.3f, value=%.3f]", - getTime(), getOpen(), getHigh(), getLow(), getClose(), value); - } -} diff --git a/src/main/java/ats/plugin/OHLCValues.java b/src/main/java/ats/plugin/OHLCValues.java new file mode 100644 index 0000000..5a4a629 --- /dev/null +++ b/src/main/java/ats/plugin/OHLCValues.java @@ -0,0 +1,22 @@ +package ats.plugin; + +/** + * OHLCValues is a struct for storing open, high, low, and close + * values. + */ +public class OHLCValues { + public double open; + public double high; + public double low; + public double close; + + + public OHLCValues(double open, double high, + double low, double close) + { + this.open = open; + this.high = high; + this.low = low; + this.close = close; + } +} diff --git a/src/main/java/ats/plugin/TicksCandlestickWindow.java b/src/main/java/ats/plugin/TicksCandlestickWindow.java new file mode 100644 index 0000000..94801ed --- /dev/null +++ b/src/main/java/ats/plugin/TicksCandlestickWindow.java @@ -0,0 +1,85 @@ +package ats.plugin; + +import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; +import com.espertech.esper.epl.expression.core.ExprEvaluator; +import com.espertech.esper.epl.expression.core.ExprNode; + +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TicksCandlestickWindow runs a CandlestickCalc across a set number + * of ticks for each window. + */ +class TicksCandlestickWindow implements CandlestickWindow { + private static final Logger log = LoggerFactory.getLogger(TicksCandlestickWindow.class); + private final AgentInstanceViewFactoryChainContext context; + /** How many ticks in each window? */ + private final long windowTicks; + private OHLCPlugInView plugin; + private CandlestickCalc calc; + /** How many ticks into the window we are. */ + private long currentTick = 0; + /** The time the window started. */ + private DateTime windowStartTime; + + + /** + * Create a new window that calculates across a set number of + * ticks. + */ + public TicksCandlestickWindow(OHLCPlugInView plugin, + CandlestickCalc calc, + AgentInstanceViewFactoryChainContext context, + ExprNode intervalExpression) + { + this.plugin = plugin; + this.calc = calc; + this.context = context; + windowTicks = parseTickCount(intervalExpression); + } + + /** + * Return the time period specified by the given expression value. + */ + private long parseTickCount(ExprNode intervalExpression) { + ExprEvaluator evaluator = intervalExpression.getForge().getExprEvaluator(); + Object o = evaluator.evaluate(null, true, context); + return new Long((Integer)o).longValue(); + } + + /** + * Process a tick with the given time and value. + */ + public void update(DateTime timestamp, double value) { + ensureWindow(timestamp); + calc.applyValue(value); + } + + /** + * Make sure our window times are set up and current. + * @param timestamp + */ + public void ensureWindow(DateTime timestamp) { + if (windowStartTime == null) + windowStartTime = timestamp; + + currentTick++; + + if (currentTick < windowTicks) return; + + OHLCValues values = calc.getValues(); + if (values == null) { + log.info("No data to post"); + return; + } + + plugin.postEvent(new OHLCEvent(windowStartTime, values)); + + // reset for next window + calc.reset(); + currentTick = 0; + windowStartTime = null; + } +} diff --git a/src/main/java/ats/plugin/TimeCandlestickWindow.java b/src/main/java/ats/plugin/TimeCandlestickWindow.java new file mode 100644 index 0000000..23ef0e8 --- /dev/null +++ b/src/main/java/ats/plugin/TimeCandlestickWindow.java @@ -0,0 +1,198 @@ +package ats.plugin; + +import com.espertech.esper.core.context.util.AgentInstanceViewFactoryChainContext; +import com.espertech.esper.core.service.EPStatementHandleCallback; +import com.espertech.esper.core.service.EngineLevelExtensionServicesContext; +import com.espertech.esper.epl.expression.core.ExprEvaluator; +import com.espertech.esper.epl.expression.core.ExprNode; +import com.espertech.esper.schedule.ScheduleHandleCallback; +import com.espertech.esper.schedule.SchedulingService; + +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.format.PeriodFormatter; +import org.joda.time.format.PeriodFormatterBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TimeCandlestickWindow runs a CandlestickCalc across ticks for a set + * duration each window. + */ +class TimeCandlestickWindow implements CandlestickWindow { + private static final Logger log = LoggerFactory.getLogger(TimeCandlestickWindow.class); + private static final int LATE_EVENT_SLACK_SECONDS = 5; + private static final PeriodFormatter periodFormatter = new PeriodFormatterBuilder() + .appendDays().appendSuffix("d") + .appendHours().appendSuffix("h") + .appendMinutes().appendSuffix("m") + .appendSeconds().appendSuffix("s") + .toFormatter(); + private final AgentInstanceViewFactoryChainContext context; + private final long scheduleSlot; + private EPStatementHandleCallback handle; + private final Duration interval; + private OHLCPlugInView plugin; + private CandlestickCalc calc; + private DateTime windowStartTime; + private DateTime windowEndTime; + private DateTime lastStartTime; + + + /** + * Create a window over the given duration. + */ + public TimeCandlestickWindow(OHLCPlugInView plugin, + CandlestickCalc calc, + AgentInstanceViewFactoryChainContext context, + ExprNode intervalExpression) + { + this.plugin = plugin; + this.calc = calc; + this.context = context; + scheduleSlot = context.getStatementContext().getScheduleBucket().allocateSlot(); + interval = parseInterval(intervalExpression); + } + + /** + * Process a tick with the given time and value. + */ + public void update(DateTime timestamp, double value) { + ensureWindow(timestamp); + calc.applyValue(value); + } + + /** + * Return the time period specified by the given expression value. + */ + private Duration parseInterval(ExprNode interval) { + ExprEvaluator evaluator = interval.getForge().getExprEvaluator(); + String intervalStr = (String)evaluator.evaluate(null, true, context); + return parseInterval(intervalStr); + } + + /** + * Return the time period specified by the given string. + */ + public static Duration parseInterval(String interval) { + interval = interval.replaceAll("\\s+",""); + return periodFormatter.parsePeriod(interval).toStandardDuration(); + } + + /** + * Make sure our window times are set up and current. + */ + public void ensureWindow(DateTime timestamp) { + if (timestamp == null) return; + + if (windowStartTime == null) { + // create open window + windowStartTime = makeWindowStartTime(timestamp, interval); + windowEndTime = makeWindowEndTime(windowStartTime, interval); + scheduleCallback(windowEndTime); + } + + if (!inWindow(timestamp)) { + // past current window. + // post and create a new one. + postData(); + + windowStartTime = makeWindowStartTime(timestamp, interval); + windowEndTime = makeWindowEndTime(windowStartTime, interval); + scheduleCallback(windowEndTime); + } + } + + public static DateTime makeWindowStartTime(DateTime timestamp, + Duration interval) + { + DateTime today = timestamp.withTimeAtStartOfDay(); + // log.info("Timestamp is {}", timestamp); + // log.info("Day start is {}", today); + + Duration intoToday = new Duration(today, timestamp); + + // calc how far into the current window we are + long intoPeriod = intoToday.getMillis() % interval.getMillis(); + + return timestamp.minus(intoPeriod); + } + + private static DateTime makeWindowEndTime(DateTime startTime, + Duration interval) + { + if (startTime == null || interval == null) return null; + + return startTime.plus(interval.getMillis()); + } + + /** + * Return true if the timestamp is within the current time window. + */ + private boolean inWindow(DateTime timestamp) { + if (timestamp == null) return false; + + return timestamp.compareTo(windowStartTime) >= 0 && + timestamp.compareTo(windowEndTime) < 0; + } + + /** + * Convert a bare long value to a proper DateTime entity. Assumes + * UTC time zone. + */ + public static DateTime toDateTime(long l) { + return new DateTime(l, DateTimeZone.UTC); + } + + /** + * Set up a callback to post an event when our time window expires. + */ + private void scheduleCallback(DateTime endTime) { + SchedulingService sched = context.getStatementContext().getSchedulingService(); + if (handle != null) { + // remove old schedule + // log.info("Removing old callback"); + sched.remove(handle, scheduleSlot); + handle = null; + } + + DateTime currentTime = toDateTime(sched.getTime()); + DateTime targetTime = endTime.plusSeconds(LATE_EVENT_SLACK_SECONDS); + long callbackTime = targetTime.getMillis() - currentTime.getMillis(); + + ScheduleHandleCallback callback = new ScheduleHandleCallback() { + public void scheduledTrigger(EngineLevelExtensionServicesContext esc) { + handle = null; // clear out schedule handle + // log.info("Callback running"); + TimeCandlestickWindow.this.postData(); + } + }; + + handle = new EPStatementHandleCallback(context.getEpStatementAgentInstanceHandle(), callback); + sched.add(callbackTime, handle, scheduleSlot); + // log.info("Scheduled callback for {}", callbackTime); + } + + /** + * Update listeners with our new value. + */ + private void postData() { + if (lastStartTime != null && lastStartTime.compareTo(windowStartTime) == 0) { + log.warn("DUP START TIME"); + } + + // log.info("posting {} with {} events in {}", windowStartTime, eventCount, Thread.currentThread()); + + OHLCValues values = calc.getValues(); + if (values == null) { + // log.info("No data to post"); + return; + } + + plugin.postEvent(new OHLCEvent(windowStartTime, values)); + + calc.reset(); + lastStartTime = windowStartTime; + } +}