diff --git a/src/main/java/DebugProcessor.java b/src/main/java/DebugProcessor.java index 1cba28f..0da4b91 100644 --- a/src/main/java/DebugProcessor.java +++ b/src/main/java/DebugProcessor.java @@ -1,3 +1,4 @@ +import com.espertech.esper.client.time.CurrentTimeEvent; public class DebugProcessor implements TickProcessor { @@ -5,4 +6,7 @@ public class DebugProcessor implements TickProcessor { public void process(TickEvent tick) { System.out.println(tick); } + + public void process(CurrentTimeEvent time) { + } } diff --git a/src/main/java/EsperProcessor.java b/src/main/java/EsperProcessor.java index 15a4760..834f4df 100644 --- a/src/main/java/EsperProcessor.java +++ b/src/main/java/EsperProcessor.java @@ -181,9 +181,14 @@ public class EsperProcessor implements TickProcessor { * Send a single TickEvent to Esper. */ public void process(TickEvent tick) { - DateTime time = tick.getTime(); - CurrentTimeEvent timeEvent = new CurrentTimeEvent(time.getMillis()); - engine.getEPRuntime().sendEvent(timeEvent); + process(new CurrentTimeEvent(tick.getTime().getMillis())); engine.getEPRuntime().sendEvent(tick); } + + /** + * Update Esper's internal clock. + */ + public void process(CurrentTimeEvent timeEvent) { + engine.getEPRuntime().sendEvent(timeEvent); + } } diff --git a/src/main/java/OANDAHeartbeatEvent.java b/src/main/java/OANDAHeartbeatEvent.java new file mode 100644 index 0000000..7a23ccf --- /dev/null +++ b/src/main/java/OANDAHeartbeatEvent.java @@ -0,0 +1,33 @@ +import org.joda.time.DateTime; + +import com.fasterxml.jackson.annotation.JsonFormat; + +/** + * OANDAHeartbeatEvent is a heartbeat message delivered on the OANDA + * stream. + * + * {"type":"HEARTBEAT","time":"2018-05-27T06:42:33.820251354Z"} + */ +public class OANDAHeartbeatEvent { + public String type; + private DateTime time; + + + /** + * @return the time + */ + public DateTime getTime() { + return time; + } + + /** + * @param time the time to set + */ + public void setTime(DateTime time) { + this.time = time; + } + + public String toString() { + return String.format("OANDAHeartbeatEvent[%s]", getTime()); + } +} diff --git a/src/main/java/OANDAReader.java b/src/main/java/OANDAReader.java index d126fdb..de4313b 100644 --- a/src/main/java/OANDAReader.java +++ b/src/main/java/OANDAReader.java @@ -10,6 +10,7 @@ import java.net.URLEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.datatype.joda.JodaModule; +import com.espertech.esper.client.time.CurrentTimeEvent; import javax.net.ssl.HttpsURLConnection; @@ -84,16 +85,19 @@ public class OANDAReader implements TickStreamReader { OANDATickEvent tick; try { tick = mapper.readValue(line, OANDATickEvent.class); - // log.info(line); processor.process(tick); } catch (JsonParseException | JsonMappingException e) { - log.error("Parsing OANDA data", e); + log.error("Parsing OANDA tick", e); } catch (IOException e) { - log.error("Parsing OANDA data", e); + log.error("Parsing OANDA tick", e); } } else if (line.indexOf ("HEARTBEAT") > -1) { - // ignore - log.debug(line); + try { + OANDAHeartbeatEvent beat = mapper.readValue(line, OANDAHeartbeatEvent.class); + processor.process(new CurrentTimeEvent(beat.getTime().getMillis())); + } catch (IOException e) { + log.error("Parsing OANDA heartbeat", e); + } } else { log.warn("Unknown type: {}", line); } diff --git a/src/main/java/OANDATickEvent.java b/src/main/java/OANDATickEvent.java index 82a7813..8345b7f 100644 --- a/src/main/java/OANDATickEvent.java +++ b/src/main/java/OANDATickEvent.java @@ -1,5 +1,4 @@ import java.math.BigDecimal; -import java.util.Date; import org.joda.time.DateTime; import com.fasterxml.jackson.annotation.JsonFormat; diff --git a/src/main/java/TickProcessor.java b/src/main/java/TickProcessor.java index 44ca55a..e0e0055 100644 --- a/src/main/java/TickProcessor.java +++ b/src/main/java/TickProcessor.java @@ -1,3 +1,6 @@ +import com.espertech.esper.client.time.CurrentTimeEvent; + public interface TickProcessor { public void process(TickEvent tick); + public void process(CurrentTimeEvent time); }