// Source: ATS_Esper/src/main/java/EsperProcessor.java (Java, 100 lines, 2.9 KiB)

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import java.math.BigDecimal;
import java.io.File;
import java.util.Date;
import com.espertech.esper.client.time.CurrentTimeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.stream.Stream;
import java.util.stream.Collectors;
/**
 * A {@link TickProcessor} that feeds ticks into an Esper engine.
 *
 * <p>The engine's internal timer is disabled; instead, every call to
 * {@link #process(Tick)} first advances engine time to the tick's
 * timestamp and then sends the tick itself as an event.
 */
public class EsperProcessor implements TickProcessor {
    final Logger logger = LoggerFactory.getLogger(EsperProcessor.class);
    EPServiceProvider engine;

    /**
     * Configure the default Esper engine, register the {@code Tick}
     * event type and install the given EPL statements, followed by a
     * statement whose results are logged at debug level.
     *
     * @param epl one or more EPL statements, separated by at least one
     *            blank line; lines starting with {@code //} or
     *            {@code --} (optionally indented) are treated as
     *            comments and removed before the text is split
     */
    public EsperProcessor(String epl) {
        // disable esper's internal clock. we use tick data for our
        // sense of time.
        Configuration config = new Configuration();
        config.getEngineDefaults().getThreading().setInternalTimerEnabled(false);
        engine = EPServiceProviderManager.getDefaultProvider(config);
        engine.getEPAdministrator().getConfiguration().addEventType(Tick.class);

        // User statements are installed before the logging statement
        // below. NOTE(review): "in_trading_hours" is presumably
        // declared by one of those statements - confirm.
        addStatements(epl);

        EPStatement statement = engine.getEPAdministrator().createEPL("select *,in_trading_hours from Tick");
        // The cast to Object makes the whole event array a single
        // log argument instead of being spread as varargs.
        statement.addListener((newData, oldData) -> {
            logger.debug("event: {}", (Object)newData);
        });
        // To log individual properties instead, read them from the
        // first event, e.g. (Date) newData[0].get("time"),
        // (String) newData[0].get("instrument"),
        // (BigDecimal) newData[0].get("bid") / get("ask").
    }

    /**
     * Split a (possibly) multiline string of EPL statements into one
     * statement per string. Statements are separated by at least one
     * blank line.
     */
    private static String[] splitStatements(String str) {
        return str.split("(\\r?\\n){2,}");
    }

    /**
     * Split a string into individual lines.
     */
    private static String[] splitLines(String str) {
        return str.split("\\r?\\n");
    }

    /**
     * Return true if the given line is a comment, i.e. its first
     * non-whitespace characters are {@code //} or {@code --}.
     */
    private static boolean isComment(String str) {
        // Trim first so that indented comment lines are recognised too.
        String trimmed = str.trim();
        return trimmed.startsWith("//") || trimmed.startsWith("--");
    }

    /**
     * Remove comment lines from the string. The remaining lines are
     * re-joined with {@code \n}.
     */
    private static String filterComments(String str) {
        return Arrays.stream(splitLines(str))
            .filter(line -> !isComment(line))
            .collect(Collectors.joining("\n"));
    }

    /**
     * Strip comments from the given text, split it into statements and
     * add each non-blank statement to the engine.
     */
    private void addStatements(String epl) {
        for (String statement : splitStatements(filterComments(epl))) {
            // Splitting can yield empty fragments (e.g. empty input, or
            // input that begins with blank/comment-only lines); these
            // are not statements, so skip them.
            if (statement.trim().isEmpty()) {
                continue;
            }
            addStatement(statement);
        }
    }

    /**
     * Create a single EPL statement in the engine.
     *
     * @param epl the statement text
     * @return the statement created by the engine
     */
    private EPStatement addStatement(String epl) {
        logger.debug("Adding statement: {}", epl);
        return engine.getEPAdministrator().createEPL(epl);
    }

    /**
     * Advance engine time to the tick's timestamp, then send the tick
     * to the engine as an event.
     *
     * @param tick the tick to process
     */
    public void process(Tick tick) {
        Date time = tick.getTime();
        // The internal timer is disabled, so time only moves when we
        // send an explicit time event.
        CurrentTimeEvent timeEvent = new CurrentTimeEvent(time.getTime());
        engine.getEPRuntime().sendEvent(timeEvent);
        engine.getEPRuntime().sendEvent(tick);
    }
}