update Esper time on OANDA heartbeat

This commit is contained in:
2018-05-30 14:21:10 -07:00
parent 26b199a4f9
commit 7fbf75bacd
6 changed files with 57 additions and 9 deletions

View File

@ -1,3 +1,4 @@
import com.espertech.esper.client.time.CurrentTimeEvent;
public class DebugProcessor implements TickProcessor { public class DebugProcessor implements TickProcessor {
@ -5,4 +6,7 @@ public class DebugProcessor implements TickProcessor {
public void process(TickEvent tick) { public void process(TickEvent tick) {
System.out.println(tick); System.out.println(tick);
} }
public void process(CurrentTimeEvent time) {
}
} }

View File

@ -181,9 +181,14 @@ public class EsperProcessor implements TickProcessor {
* Send a single TickEvent to Esper. * Send a single TickEvent to Esper.
*/ */
public void process(TickEvent tick) { public void process(TickEvent tick) {
DateTime time = tick.getTime(); process(new CurrentTimeEvent(tick.getTime().getMillis()));
CurrentTimeEvent timeEvent = new CurrentTimeEvent(time.getMillis());
engine.getEPRuntime().sendEvent(timeEvent);
engine.getEPRuntime().sendEvent(tick); engine.getEPRuntime().sendEvent(tick);
} }
/**
* Update Esper's internal clock.
*/
public void process(CurrentTimeEvent timeEvent) {
engine.getEPRuntime().sendEvent(timeEvent);
}
} }

View File

@ -0,0 +1,33 @@
import org.joda.time.DateTime;
import com.fasterxml.jackson.annotation.JsonFormat;
/**
* OANDAHeartbeatEvent is a heartbeat message delivered on the OANDA
* stream.
*
* {"type":"HEARTBEAT","time":"2018-05-27T06:42:33.820251354Z"}
*/
public class OANDAHeartbeatEvent {
public String type;
private DateTime time;
/**
* @return the time
*/
public DateTime getTime() {
return time;
}
/**
* @param time the time to set
*/
public void setTime(DateTime time) {
this.time = time;
}
public String toString() {
return String.format("OANDAHeartbeatEvent[%s]", getTime());
}
}

View File

@ -10,6 +10,7 @@ import java.net.URLEncoder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.datatype.joda.JodaModule; import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.espertech.esper.client.time.CurrentTimeEvent;
import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.HttpsURLConnection;
@ -84,16 +85,19 @@ public class OANDAReader implements TickStreamReader {
OANDATickEvent tick; OANDATickEvent tick;
try { try {
tick = mapper.readValue(line, OANDATickEvent.class); tick = mapper.readValue(line, OANDATickEvent.class);
// log.info(line);
processor.process(tick); processor.process(tick);
} catch (JsonParseException | JsonMappingException e) { } catch (JsonParseException | JsonMappingException e) {
log.error("Parsing OANDA data", e); log.error("Parsing OANDA tick", e);
} catch (IOException e) { } catch (IOException e) {
log.error("Parsing OANDA data", e); log.error("Parsing OANDA tick", e);
} }
} else if (line.indexOf ("HEARTBEAT") > -1) { } else if (line.indexOf ("HEARTBEAT") > -1) {
// ignore try {
log.debug(line); OANDAHeartbeatEvent beat = mapper.readValue(line, OANDAHeartbeatEvent.class);
processor.process(new CurrentTimeEvent(beat.getTime().getMillis()));
} catch (IOException e) {
log.error("Parsing OANDA heartbeat", e);
}
} else { } else {
log.warn("Unknown type: {}", line); log.warn("Unknown type: {}", line);
} }

View File

@ -1,5 +1,4 @@
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.Date;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;

View File

@ -1,3 +1,6 @@
import com.espertech.esper.client.time.CurrentTimeEvent;
public interface TickProcessor { public interface TickProcessor {
public void process(TickEvent tick); public void process(TickEvent tick);
public void process(CurrentTimeEvent time);
} }