From 583d319ef3df9128496f7219948214fe17fdf380 Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Sat, 26 May 2018 22:50:05 -0700 Subject: [PATCH] rename *Tick classes to *TickEvent --- src/main/java/CSVReader.java | 6 +- src/main/java/DebugProcessor.java | 2 +- src/main/java/EsperProcessor.java | 91 +++++++++++++++---- .../{OANDATick.java => OANDATickEvent.java} | 23 +++-- src/main/java/Tick.java | 26 ------ src/main/java/TickEvent.java | 46 ++++++++++ src/main/java/TickProcessor.java | 2 +- .../{TrueFXTick.java => TrueFXTickEvent.java} | 59 +++++++----- 8 files changed, 176 insertions(+), 79 deletions(-) rename src/main/java/{OANDATick.java => OANDATickEvent.java} (75%) delete mode 100644 src/main/java/Tick.java create mode 100644 src/main/java/TickEvent.java rename src/main/java/{TrueFXTick.java => TrueFXTickEvent.java} (66%) diff --git a/src/main/java/CSVReader.java b/src/main/java/CSVReader.java index 2e5ff3b..79f98ac 100644 --- a/src/main/java/CSVReader.java +++ b/src/main/java/CSVReader.java @@ -16,12 +16,12 @@ public class CSVReader implements TickStreamReader { public void run(TickProcessor processor) { CsvMapper mapper = new CsvMapper(); - CsvSchema schema = mapper.schemaFor(TrueFXTick.class); + CsvSchema schema = mapper.schemaFor(TrueFXTickEvent.class); try { - MappingIterator it = mapper.readerFor(TrueFXTick.class).with(schema).readValues(file); + MappingIterator it = mapper.readerFor(TrueFXTickEvent.class).with(schema).readValues(file); while (it.hasNextValue()) { - TrueFXTick value = it.nextValue(); + TrueFXTickEvent value = it.nextValue(); processor.process(value); } } catch (IOException e) { diff --git a/src/main/java/DebugProcessor.java b/src/main/java/DebugProcessor.java index f614648..1cba28f 100644 --- a/src/main/java/DebugProcessor.java +++ b/src/main/java/DebugProcessor.java @@ -2,7 +2,7 @@ public class DebugProcessor implements TickProcessor { - public void process(Tick tick) { + public void process(TickEvent tick) { System.out.println(tick); } } diff --git a/src/main/java/EsperProcessor.java b/src/main/java/EsperProcessor.java index 4ed7398..a427156 100644 --- a/src/main/java/EsperProcessor.java +++ b/src/main/java/EsperProcessor.java @@ -24,27 +24,67 @@ public class EsperProcessor implements TickProcessor { // sense of time. Configuration config = new Configuration(); config.getEngineDefaults().getThreading().setInternalTimerEnabled(false); - engine = EPServiceProviderManager.getDefaultProvider(config); + config.addEventType(TickEvent.class); + config.addEventType(LongEntryEvent.class); + //config.addVariable("FOO", int.class, 12); - engine.getEPAdministrator().getConfiguration().addEventType(Tick.class); + config.addEventType("OHLCTick", OHLCTick.class); + config.addEventType("OHLCValue", OHLCValue.class); + config.addPlugInView("ATS", "OHLC", OHLCPlugInViewFactory.class.getName()); + + engine = EPServiceProviderManager.getDefaultProvider(config); addStatements(epl); - EPStatement statement = engine.getEPAdministrator().createEPL("select *,in_trading_hours from Tick"); + // Map vars = config.getVariables(); + // log.debug("VAR MAP: {}", vars); + // for (Map.Entry entry : vars.entrySet()) { + // log.debug("VAR: {} = {}", entry.getKey(), entry.getValue()); + // } - statement.addListener((newData, oldData) -> { - logger.debug("event: {}", (Object)newData); - }); + addStatement("select * from TickEvent", + (newData, oldData) -> { + log.debug("Tick: {}", newData[0].getUnderlying()); + }); + + addStatement("select * from OHLCStream", + (newData, oldData) -> { + log.debug("OLHC: {}", newData[0].getUnderlying()); + // DateTime t = (DateTime)newData[0].get("date"); + // double f = (double)newData[0].get("first"); + // double l = (double)newData[0].get("last"); + // double x = (double)newData[0].get("max"); + // double n = (double)newData[0].get("min"); + // log.info("OHLCValue: {} {} {} {}", f,l,x,n); + }); + + // addStatement("select * from TickEvent#groupwin(instrument)#ohlcbarminute(timestamp, midDouble)", + // new OHLCUpdateListener()); + + // addStatement("select count(*) from TickEvent#time(4)",//"select * from TicksTimeWindow", + // (newData, oldData) -> { + // log.debug("TickTimeWindow: {}", (Object)newData); + // }); + + // EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent"); // statement.addListener((newData, oldData) -> { - // Date time = (Date) newData[0].get("time"); - // String name = (String) newData[0].get("instrument"); - // BigDecimal bid = (BigDecimal)newData[0].get("bid"); - // BigDecimal ask = (BigDecimal)newData[0].get("ask"); - // logger.info("Time: {}, Instr: {}, Bid: {}, Ask: {}", - // time, name, bid, ask); - // }); + // log.debug("event: {}", (Object)newData); + // }); + + // EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent"); + + // statement.addListener((newData, oldData) -> { + // log.debug("event: {}", (Object)newData); + // }); + + // addStatement("select count(*) as count from TickEvent#time(4 sec)", + // (newData, oldData) -> { + // long count = (long)newData[0].get("count"); + // log.info("Tick Count: {}", count); + // }); + } /** @@ -90,9 +130,28 @@ public class EsperProcessor implements TickProcessor { return engine.getEPAdministrator().createEPL(epl); } - public void process(Tick tick) { - Date time = tick.getTime(); - CurrentTimeEvent timeEvent = new CurrentTimeEvent(time.getTime()); + /** + * Add a single EPL statement to the Esper engine with a listener + * to respond to the Statement. + */ + private EPStatement addStatement(String epl, StatementAwareUpdateListener listener) { + // log.debug("Adding statement aware statement: {}", epl); + + EPStatement statement = engine.getEPAdministrator().createEPL(epl); + + if (listener != null) { + statement.addListener(listener); + } + + return statement; + } + + /** + * Send a single TickEvent to Esper. + */ + public void process(TickEvent tick) { + DateTime time = tick.getTime(); + CurrentTimeEvent timeEvent = new CurrentTimeEvent(time.getMillis()); engine.getEPRuntime().sendEvent(timeEvent); engine.getEPRuntime().sendEvent(tick); } diff --git a/src/main/java/OANDATick.java b/src/main/java/OANDATickEvent.java similarity index 75% rename from src/main/java/OANDATick.java rename to src/main/java/OANDATickEvent.java index aa7fdbe..82a7813 100644 --- a/src/main/java/OANDATick.java +++ b/src/main/java/OANDATickEvent.java @@ -1,6 +1,6 @@ - import java.math.BigDecimal; import java.util.Date; +import org.joda.time.DateTime; import com.fasterxml.jackson.annotation.JsonFormat; @@ -9,10 +9,9 @@ import com.fasterxml.jackson.annotation.JsonFormat; * * {"type":"PRICE","time":"2018-04-05T20:35:20.983907480Z","bids":[{"price":"1.22376","liquidity":10000000}],"asks":[{"price":"1.22386","liquidity":10000000}],"closeoutBid":"1.22361","closeoutAsk":"1.22401","status":"tradeable","tradeable":true,"instrument":"EUR_USD"} */ -public class OANDATick implements Tick { +public class OANDATickEvent extends TickEvent { public String type; - @JsonFormat(pattern="yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - private Date time; + private DateTime time; public PriceBucket[] bids; public PriceBucket[] asks; public BigDecimal closeoutBid; @@ -21,7 +20,7 @@ public class OANDATick implements Tick { public boolean tradeable; private String instrument; - + /** * @return the instrument */ @@ -41,14 +40,14 @@ public class OANDATick implements Tick { * @return the time */ @Override - public Date getTime() { + public DateTime getTime() { return time; } /** * @param time the time to set */ - public void setTime(Date time) { + public void setTime(DateTime time) { this.time = time; } @@ -68,7 +67,15 @@ public class OANDATick implements Tick { return asks[0].price; } + /** + * @return the midprice between bid and ask + */ + @Override + public BigDecimal getMid() { + return getMid(asks[0].price, bids[0].price); + } + public String toString() { - return String.format("OANDATick[%s,%s]", getInstrument(), getBid()); + return String.format("OANDATickEvent[%s,%s,%s]", getTime(), getInstrument(), getBid()); } } diff --git a/src/main/java/Tick.java b/src/main/java/Tick.java deleted file mode 100644 index c69a603..0000000 --- a/src/main/java/Tick.java +++ /dev/null @@ -1,26 +0,0 @@ - -import java.math.BigDecimal; -import java.util.Date; - -public interface Tick { - - /** - * @return the instrument - */ - public String getInstrument(); - - /** - * @return the time - */ - public Date getTime(); - - /** - * @return the bid - */ - public BigDecimal getBid(); - - /** - * @return the ask - */ - public BigDecimal getAsk(); -} diff --git a/src/main/java/TickEvent.java b/src/main/java/TickEvent.java new file mode 100644 index 0000000..a939820 --- /dev/null +++ b/src/main/java/TickEvent.java @@ -0,0 +1,46 @@ +import java.math.BigDecimal; +import java.util.Date; +import org.joda.time.DateTime; + +public abstract class TickEvent { + + /** + * @return the tick time + */ + public abstract DateTime getTime(); + + /** + * @return the trading instrument + */ + public abstract String getInstrument(); + + /** + * @return the bid price + */ + public abstract BigDecimal getBid(); + + /** + * @return the ask price + */ + public abstract BigDecimal getAsk(); + + /** + * @return the midprice between bid and ask + */ + public abstract BigDecimal getMid(); + + /** + * Calculate the midpoint between two values. + */ + public static BigDecimal getMid(BigDecimal a, BigDecimal b) { + BigDecimal diff = a.subtract(b).abs(); + return a.min(b).add(diff.divide(new BigDecimal(2))); + } + + /** + * Calculate the midpoint between two values. + */ + public double getMidDouble() { + return getMid().doubleValue(); + } +} diff --git a/src/main/java/TickProcessor.java b/src/main/java/TickProcessor.java index ab1aaa6..44ca55a 100644 --- a/src/main/java/TickProcessor.java +++ b/src/main/java/TickProcessor.java @@ -1,3 +1,3 @@ public interface TickProcessor { - public void process(Tick tick); + public void process(TickEvent tick); } diff --git a/src/main/java/TrueFXTick.java b/src/main/java/TrueFXTickEvent.java similarity index 66% rename from src/main/java/TrueFXTick.java rename to src/main/java/TrueFXTickEvent.java index e7ad1d9..661735b 100644 --- a/src/main/java/TrueFXTick.java +++ b/src/main/java/TrueFXTickEvent.java @@ -1,27 +1,38 @@ import java.math.BigDecimal; import java.util.Date; - import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import org.joda.time.DateTime; /** - * TrueFXTick holds a single tick read from TrueFX data archive csv - * file. + * TrueFXTickEvent holds a single tick read from TrueFX data archive + * csv file. * * A line looks like: - * EUR/USD,20170102 00:00:00.803,1.0523,1.05307 + * EUR/USD,20170102 00:00:00.803,1.0523,1.05307 */ @JsonPropertyOrder({ "instrument", "time", "bid", "ask" }) -public class TrueFXTick implements Tick { - private String instrument; +public class TrueFXTickEvent extends TickEvent { @JsonFormat(pattern="yyyyMMdd HH:mm:ss.SSS") - private Date time; + private DateTime time; + private String instrument; private BigDecimal bid; private BigDecimal ask; - public String toString() { - return String.format("TrueFXTick[%s,%s,%s,%s]", instrument, time, bid, ask); + /** + * @return the time + */ + @Override + public DateTime getTime() { + return time; + } + + /** + * @param time the time to set + */ + public void setTime(DateTime time) { + this.time = time; } /** @@ -39,21 +50,6 @@ public class TrueFXTick implements Tick { this.instrument = instrument; } - /** - * @return the time - */ - @Override - public Date getTime() { - return time; - } - - /** - * @param time the time to set - */ - public void setTime(Date time) { - this.time = time; - } - /** * @return the bid */ @@ -83,4 +79,19 @@ public class TrueFXTick implements Tick { public void setAsk(BigDecimal ask) { this.ask = ask; } + + /** + * @return the midprice between bid and ask + */ + @Override + public BigDecimal getMid() { + return getMid(ask, bid); + } + + /** + * Return a human readable representation of this event. + */ + public String toString() { + return String.format("TrueFXTickEvent[%s,%s,%s,%s]", time, instrument, bid, ask); + } }