EsperProcessor: tabs to spaces
This commit is contained in:
@ -26,169 +26,168 @@ public class EsperProcessor implements TickProcessor {
|
|||||||
EPServiceProvider engine;
|
EPServiceProvider engine;
|
||||||
|
|
||||||
|
|
||||||
public EsperProcessor(String epl) {
|
public EsperProcessor(String epl) {
|
||||||
// disable esper's internal clock. we use tick data for our
|
// disable esper's internal clock. we use tick data for our
|
||||||
// sense of time.
|
// sense of time.
|
||||||
Configuration config = new Configuration();
|
Configuration config = new Configuration();
|
||||||
config.getEngineDefaults().getThreading().setInternalTimerEnabled(false);
|
config.getEngineDefaults().getThreading().setInternalTimerEnabled(false);
|
||||||
config.addEventType(TickEvent.class);
|
config.addEventType(TickEvent.class);
|
||||||
config.addEventType(LongEntryEvent.class);
|
config.addEventType(LongEntryEvent.class);
|
||||||
//config.addVariable("FOO", int.class, 12);
|
//config.addVariable("FOO", int.class, 12);
|
||||||
|
|
||||||
config.addEventType("OHLCTick", OHLCTick.class);
|
config.addEventType("OHLCTick", OHLCTick.class);
|
||||||
config.addEventType("OHLCValue", OHLCValue.class);
|
config.addEventType("OHLCValue", OHLCValue.class);
|
||||||
config.addPlugInView("ATS", "OHLC", OHLCPlugInViewFactory.class.getName());
|
config.addPlugInView("ATS", "OHLC", OHLCPlugInViewFactory.class.getName());
|
||||||
|
|
||||||
engine = EPServiceProviderManager.getDefaultProvider(config);
|
engine = EPServiceProviderManager.getDefaultProvider(config);
|
||||||
|
|
||||||
addStatements(epl);
|
addStatements(epl);
|
||||||
|
|
||||||
addStatement("select * from TickEvent",
|
addStatement("select * from TickEvent",
|
||||||
(newData, oldData) -> {
|
(newData, oldData) -> {
|
||||||
log.debug("Tick: {}", newData[0].getUnderlying());
|
log.debug("Tick: {}", newData[0].getUnderlying());
|
||||||
});
|
});
|
||||||
|
|
||||||
addStatement("select * from OHLCStream",
|
addStatement("select * from OHLCStream",
|
||||||
(newData, oldData) -> {
|
(newData, oldData) -> {
|
||||||
log.debug("OLHC: {}", newData[0].getUnderlying());
|
log.debug("OLHC: {}", newData[0].getUnderlying());
|
||||||
// DateTime t = (DateTime)newData[0].get("date");
|
// DateTime t = (DateTime)newData[0].get("date");
|
||||||
// double f = (double)newData[0].get("first");
|
// double f = (double)newData[0].get("first");
|
||||||
// double l = (double)newData[0].get("last");
|
// double l = (double)newData[0].get("last");
|
||||||
// double x = (double)newData[0].get("max");
|
// double x = (double)newData[0].get("max");
|
||||||
// double n = (double)newData[0].get("min");
|
// double n = (double)newData[0].get("min");
|
||||||
// log.info("OHLCValue: {} {} {} {}", f,l,x,n);
|
// log.info("OHLCValue: {} {} {} {}", f,l,x,n);
|
||||||
});
|
});
|
||||||
|
|
||||||
// addStatement("select * from TickEvent#groupwin(instrument)#ohlcbarminute(timestamp, midDouble)",
|
// addStatement("select * from TickEvent#groupwin(instrument)#ohlcbarminute(timestamp, midDouble)",
|
||||||
// new OHLCUpdateListener());
|
// new OHLCUpdateListener());
|
||||||
|
|
||||||
// addStatement("select count(*) from TickEvent#time(4)",//"select * from TicksTimeWindow",
|
// addStatement("select count(*) from TickEvent#time(4)",//"select * from TicksTimeWindow",
|
||||||
// (newData, oldData) -> {
|
// (newData, oldData) -> {
|
||||||
// log.debug("TickTimeWindow: {}", (Object)newData);
|
// log.debug("TickTimeWindow: {}", (Object)newData);
|
||||||
// });
|
// });
|
||||||
|
|
||||||
// EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent");
|
// EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent");
|
||||||
|
|
||||||
// statement.addListener((newData, oldData) -> {
|
// statement.addListener((newData, oldData) -> {
|
||||||
// log.debug("event: {}", (Object)newData);
|
// log.debug("event: {}", (Object)newData);
|
||||||
// });
|
// });
|
||||||
|
|
||||||
|
|
||||||
// EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent");
|
// EPStatement statement = engine.getEPAdministrator().createEPL("select * from TickEvent");
|
||||||
|
|
||||||
// statement.addListener((newData, oldData) -> {
|
// statement.addListener((newData, oldData) -> {
|
||||||
// log.debug("event: {}", (Object)newData);
|
// log.debug("event: {}", (Object)newData);
|
||||||
// });
|
// });
|
||||||
|
|
||||||
// addStatement("select count(*) as count from TickEvent#time(4 sec)",
|
// addStatement("select count(*) as count from TickEvent#time(4 sec)",
|
||||||
// (newData, oldData) -> {
|
// (newData, oldData) -> {
|
||||||
// long count = (long)newData[0].get("count");
|
// long count = (long)newData[0].get("count");
|
||||||
// log.info("Tick Count: {}", count);
|
// log.info("Tick Count: {}", count);
|
||||||
// });
|
// });
|
||||||
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Split a (possibly) multiline string of EPL statements into one
|
* Split a (possibly) multiline string of EPL statements into one
|
||||||
* statement per string. Statements are separated by at least one
|
* statement per string. Statements are separated by at least one
|
||||||
* blank line.
|
* blank line.
|
||||||
*/
|
*/
|
||||||
private static String[] splitStatements(String str) {
|
private static String[] splitStatements(String str) {
|
||||||
return str.split("(\\r?\\n){2,}");
|
return str.split("(\\r?\\n){2,}");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Split a string into individual lines.
|
* Split a string into individual lines.
|
||||||
*/
|
*/
|
||||||
private static String[] splitLines(String str) {
|
private static String[] splitLines(String str) {
|
||||||
return str.split("\\r?\\n");
|
return str.split("\\r?\\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return true if the given string is a comment.
|
* Return true if the given string is a comment.
|
||||||
*/
|
*/
|
||||||
private static boolean isComment(String str) {
|
private static boolean isComment(String str) {
|
||||||
return str.startsWith("//") || str.startsWith("--");
|
return str.startsWith("//") || str.startsWith("--");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return true if the given string is only whitespace.
|
* Return true if the given string is only whitespace.
|
||||||
*/
|
*/
|
||||||
private static boolean isEmpty(String str) {
|
private static boolean isEmpty(String str) {
|
||||||
return str.matches("^\\s*$");
|
return str.matches("^\\s*$");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove comment lines from the string.
|
* Remove comment lines from the string.
|
||||||
*/
|
*/
|
||||||
private static String filterComments(String str) {
|
private static String filterComments(String str) {
|
||||||
return Arrays.stream(splitLines(str))
|
return Arrays.stream(splitLines(str))
|
||||||
.filter(s -> !isComment(s))
|
.filter(s -> !isComment(s))
|
||||||
.collect(Collectors.joining("\n"));
|
.collect(Collectors.joining("\n"));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a string with (possibly) multiple statements, split and
|
* Given a string with (possibly) multiple statements, split and
|
||||||
* add individually.
|
* add individually.
|
||||||
*/
|
*/
|
||||||
private void addStatements(String epl) {
|
private void addStatements(String epl) {
|
||||||
for (String s : splitStatements(filterComments(epl))) {
|
for (String s : splitStatements(filterComments(epl))) {
|
||||||
if (!isEmpty(s))
|
if (!isEmpty(s))
|
||||||
addStatement(s);
|
addStatement(s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a single EPL statement to the Esper engine.
|
* Add a single EPL statement to the Esper engine.
|
||||||
*/
|
*/
|
||||||
private EPStatement addStatement(String epl) {
|
private EPStatement addStatement(String epl) {
|
||||||
return addStatement(epl, (UpdateListener)null);
|
return addStatement(epl, (UpdateListener)null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a single EPL statement to the Esper engine with a listener
|
* Add a single EPL statement to the Esper engine with a listener
|
||||||
* to respond to the Statement.
|
* to respond to the Statement.
|
||||||
*/
|
*/
|
||||||
private EPStatement addStatement(String epl, UpdateListener listener) {
|
private EPStatement addStatement(String epl, UpdateListener listener) {
|
||||||
// log.debug("Adding statement: {}", epl);
|
// log.debug("Adding statement: {}", epl);
|
||||||
|
|
||||||
EPStatement statement = engine.getEPAdministrator().createEPL(epl);
|
EPStatement statement = engine.getEPAdministrator().createEPL(epl);
|
||||||
|
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
statement.addListener(listener);
|
statement.addListener(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
return statement;
|
return statement;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a single EPL statement to the Esper engine with a listener
|
* Add a single EPL statement to the Esper engine with a listener
|
||||||
* to respond to the Statement.
|
* to respond to the Statement.
|
||||||
*/
|
*/
|
||||||
private EPStatement addStatement(String epl, StatementAwareUpdateListener listener) {
|
private EPStatement addStatement(String epl, StatementAwareUpdateListener listener) {
|
||||||
// log.debug("Adding statement aware statement: {}", epl);
|
// log.debug("Adding statement aware statement: {}", epl);
|
||||||
|
|
||||||
EPStatement statement = engine.getEPAdministrator().createEPL(epl);
|
EPStatement statement = engine.getEPAdministrator().createEPL(epl);
|
||||||
|
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
statement.addListener(listener);
|
statement.addListener(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
return statement;
|
return statement;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a single TickEvent to Esper.
|
* Send a single TickEvent to Esper.
|
||||||
*/
|
*/
|
||||||
public void process(TickEvent tick) {
|
public void process(TickEvent tick) {
|
||||||
process(new CurrentTimeEvent(tick.getTime().getMillis()));
|
process(new CurrentTimeEvent(tick.getTime().getMillis()));
|
||||||
engine.getEPRuntime().sendEvent(tick);
|
engine.getEPRuntime().sendEvent(tick);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update Esper's internal clock.
|
* Update Esper's internal clock.
|
||||||
*/
|
*/
|
||||||
public void process(CurrentTimeEvent timeEvent) {
|
public void process(CurrentTimeEvent timeEvent) {
|
||||||
engine.getEPRuntime().sendEvent(timeEvent);
|
engine.getEPRuntime().sendEvent(timeEvent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user