rename *Tick classes to *TickEvent

This commit is contained in:
2018-05-26 22:50:05 -07:00
parent 89dd9575d0
commit 583d319ef3
8 changed files with 176 additions and 79 deletions

View File

@ -24,27 +24,67 @@ public class EsperProcessor implements TickProcessor {
// sense of time.
Configuration config = new Configuration();
config.getEngineDefaults().getThreading().setInternalTimerEnabled(false);
engine = EPServiceProviderManager.getDefaultProvider(config);
config.addEventType(TickEvent.class);
config.addEventType(LongEntryEvent.class);
//config.addVariable("FOO", int.class, 12);
engine.getEPAdministrator().getConfiguration().addEventType(Tick.class);
config.addEventType("OHLCTick", OHLCTick.class);
config.addEventType("OHLCValue", OHLCValue.class);
config.addPlugInView("ATS", "OHLC", OHLCPlugInViewFactory.class.getName());
engine = EPServiceProviderManager.getDefaultProvider(config);
addStatements(epl);
EPStatement statement = engine.getEPAdministrator().createEPL("select *,in_trading_hours from Tick");
// Map<String,ConfigurationVariable> vars = config.getVariables();
// log.debug("VAR MAP: {}", vars);
// for (Map.Entry<String, ConfigurationVariable> entry : vars.entrySet()) {
// log.debug("VAR: {} = {}", entry.getKey(), entry.getValue());
// }
statement.addListener((newData, oldData) -> {
logger.debug("event: {}", (Object)newData);
});
addStatement("select * from TickEvent",
(newData, oldData) -> {
log.debug("Tick: {}", newData[0].getUnderlying());
});
addStatement("select * from OHLCStream",
(newData, oldData) -> {
log.debug("OLHC: {}", newData[0].getUnderlying());
// DateTime t = (DateTime)newData[0].get("date");
// double f = (double)newData[0].get("first");
// double l = (double)newData[0].get("last");
// double x = (double)newData[0].get("max");
// double n = (double)newData[0].get("min");
// log.info("OHLCValue: {} {} {} {}", f,l,x,n);
});
// addStatement("select * from TickEvent#groupwin(instrument)#ohlcbarminute(timestamp, midDouble)",
// new OHLCUpdateListener());
// addStatement("select count(*) from TickEvent#time(4)",//"select * from TicksTimeWindow",
// (newData, oldData) -> {
// log.debug("TickTimeWindow: {}", (Object)newData);
// });
// EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent");
// statement.addListener((newData, oldData) -> {
// Date time = (Date) newData[0].get("time");
// String name = (String) newData[0].get("instrument");
// BigDecimal bid = (BigDecimal)newData[0].get("bid");
// BigDecimal ask = (BigDecimal)newData[0].get("ask");
// logger.info("Time: {}, Instr: {}, Bid: {}, Ask: {}",
// time, name, bid, ask);
// });
// log.debug("event: {}", (Object)newData);
// });
// EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent");
// statement.addListener((newData, oldData) -> {
// log.debug("event: {}", (Object)newData);
// });
// addStatement("select count(*) as count from TickEvent#time(4 sec)",
// (newData, oldData) -> {
// long count = (long)newData[0].get("count");
// log.info("Tick Count: {}", count);
// });
}
/**
@ -90,9 +130,28 @@ public class EsperProcessor implements TickProcessor {
return engine.getEPAdministrator().createEPL(epl);
}
public void process(Tick tick) {
Date time = tick.getTime();
CurrentTimeEvent timeEvent = new CurrentTimeEvent(time.getTime());
/**
* Add a single EPL statement to the Esper engine with a listener
* to respond to the Statement.
*/
private EPStatement addStatement(String epl, StatementAwareUpdateListener listener) {
// log.debug("Adding statement aware statement: {}", epl);
EPStatement statement = engine.getEPAdministrator().createEPL(epl);
if (listener != null) {
statement.addListener(listener);
}
return statement;
}
/**
* Send a single TickEvent to Esper.
*/
public void process(TickEvent tick) {
DateTime time = tick.getTime();
CurrentTimeEvent timeEvent = new CurrentTimeEvent(time.getMillis());
engine.getEPRuntime().sendEvent(timeEvent);
engine.getEPRuntime().sendEvent(tick);
}