diff --git a/build.gradle b/build.gradle index 2024dc1..535993b 100644 --- a/build.gradle +++ b/build.gradle @@ -23,14 +23,25 @@ dependencies { implementation 'com.oanda.v20:v20:3.0.21' implementation 'com.fasterxml.jackson.core:jackson-databind:2.9.5' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.9.5' + implementation 'org.slf4j:slf4j-api:1.6.1' + implementation 'ch.qos.logback:logback-core:1.3.0-alpha4' + implementation 'ch.qos.logback:logback-classic:1.3.0-alpha4' + implementation 'org.fusesource.jansi:jansi:1.8' + implementation 'commons-cli:commons-cli:1.4' } repositories { - // Use jcenter for resolving your dependencies. - // You can declare any Maven/Ivy/file repository here. jcenter() } +run { + systemProperty 'logback.configurationFile', 'src/main/resources/logback.xml' + + if (project.hasProperty('runArgs')) { + args findProperty('runArgs').split('\\s+') + } +} + task javadocs(type: Javadoc) { source = sourceSets.main.allJava } diff --git a/logs/README b/logs/README new file mode 100644 index 0000000..f1502a0 --- /dev/null +++ b/logs/README @@ -0,0 +1,3 @@ +Runtime logs are kept in this directory. + +This file is so git can track the otherwise empty dir. diff --git a/src/main/java/App.java b/src/main/java/App.java index 5a08797..3359e35 100644 --- a/src/main/java/App.java +++ b/src/main/java/App.java @@ -1,30 +1,124 @@ -import java.io.*; -import java.net.MalformedURLException; -import java.net.SocketTimeoutException; -import java.net.URL; -import java.net.URLConnection; +import java.io.File; +import java.nio.file.Files; +import java.io.IOException; +import java.nio.file.Paths; +import java.nio.charset.StandardCharsets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.HelpFormatter; -import javax.net.ssl.HttpsURLConnection; - -import com.espertech.esper.client.EPServiceProvider; -import com.espertech.esper.client.EPServiceProviderManager; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.MappingIterator; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.dataformat.csv.CsvMapper; -import com.fasterxml.jackson.dataformat.csv.CsvSchema; +/** + * App is the entry point for the application. + */ public class App { + static final Logger logger = LoggerFactory.getLogger("ATS_Esper"); + public static void main(String[] args) { - //EPServiceProvider esper = EPServiceProviderManager.getDefaultProvider(); + Options options = getCommandlineOptions(); + try { + CommandLineParser parser = new DefaultParser(); + CommandLine cmdline = parser.parse(options, args); - File f = new File("/home/alx/Nextcloud/projects/ATS_Esper/EURUSD-2017-01-small.csv"); - // new CSVReader(f).run(new DebugProcessor()); + String[] extraArgs = cmdline.getArgs(); + if (extraArgs == null || extraArgs.length < 1) { + usage("Missing EPL file.", options); + } - AccountInfo acctInfo = new AccountInfo(); - String[] instruments = new String[] { "USD_CAD", "EUR_USD", "USD_JPY" }; - new OANDAReader(acctInfo, instruments).run(new DebugProcessor()); + String eplFile = extraArgs[0]; + TickProcessor processor = null; + try { + String epl = readFile(eplFile); + processor = new EsperProcessor(epl); + logger.info("Using EPL file {}", eplFile); + } catch (IOException e) { + logger.error("Error reading EPL file {}", eplFile, e); + System.exit(1); + } + + TickStreamReader reader = null; + if (cmdline.hasOption("historical")) { + String csv = cmdline.getOptionValue("historical"); + reader = new CSVReader(new File(csv)); + logger.info("Using CSV file {}", csv); + } else { + // TODO: separate + AccountInfo acctInfo = new AccountInfo(); + String[] instruments = getInstruments(cmdline); + reader = new OANDAReader(acctInfo, instruments); + logger.info("Reading from stream."); + } + + reader.run(processor); + } catch(ParseException e) { + usage(e.getMessage(), options); + } + } + + private static String[] getInstruments(CommandLine cmdline) { + if (cmdline.hasOption("instrument")) { + String[] instrs = cmdline.getOptionValues("instrument"); + for (String instr : instrs) { + logger.info("using instrument: {}", instr); + } + return instrs; + } else { + logger.debug("Using default security: EUR_USD"); + return new String[] { "EUR_USD" }; + } + } + + /** + * Build and return arg parsing options. + */ + private static Options getCommandlineOptions() { + Options options = new Options(); + + // TODO: take an arg which is the OANDA authentication info + options.addOption("l", "live", false, "Process OANDA live data stream. This is the default unless the historical option is used."); + + options.addOption(Option.builder("h") + .longOpt("historical") + .argName("TrueFX.csv") + .required(false) + .hasArg() + .desc("Process historical data in TrueFX CSV format.") + .build()); + + options.addOption(Option.builder("i") + .longOpt("instrument") + .argName("EUR_USD [USD_CAD ...]") + .required(false) + .hasArgs() + .desc("Live security to stream. In the format \"EUR_USD\".") + .valueSeparator(',') + .build()); + + return options; + } + + /** + * Return the contents of a text file. + */ + private static String readFile(String path) throws IOException + { + byte[] encoded = Files.readAllBytes(Paths.get(path)); + return new String(encoded, StandardCharsets.UTF_8); + } + + /** + * Print app command line usage info. + */ + private static void usage(String message, Options options) { + System.out.println(message); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("java -jar ATS_Esper-all.jar [args] rules.epl", options); + System.exit(1); } } diff --git a/src/main/java/CSVReader.java b/src/main/java/CSVReader.java index 39825e6..2e5ff3b 100644 --- a/src/main/java/CSVReader.java +++ b/src/main/java/CSVReader.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.fasterxml.jackson.dataformat.csv.CsvSchema; -public class CSVReader /*implements TickStreamReader*/ { +public class CSVReader implements TickStreamReader { File file; diff --git a/src/main/java/EPLHelpers.java b/src/main/java/EPLHelpers.java new file mode 100644 index 0000000..fcb3218 --- /dev/null +++ b/src/main/java/EPLHelpers.java @@ -0,0 +1,12 @@ +import java.util.Date; +import java.util.Calendar; + + +public class EPLHelpers { + + public static int getHour(Date date) { + Calendar c = Calendar.getInstance(); + c.setTime(date); + return c.get(Calendar.HOUR_OF_DAY); + } +} diff --git a/src/main/java/EsperProcessor.java b/src/main/java/EsperProcessor.java new file mode 100644 index 0000000..4ed7398 --- /dev/null +++ b/src/main/java/EsperProcessor.java @@ -0,0 +1,99 @@ +import com.espertech.esper.client.Configuration; +import com.espertech.esper.client.EPServiceProvider; +import com.espertech.esper.client.EPServiceProviderManager; +import com.espertech.esper.client.EPStatement; +import java.math.BigDecimal; +import java.io.File; +import java.util.Date; +import com.espertech.esper.client.time.CurrentTimeEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.lang.reflect.Array; +import java.util.Arrays; +import java.util.stream.Stream; +import java.util.stream.Collectors; + + +public class EsperProcessor implements TickProcessor { + final Logger logger = LoggerFactory.getLogger(EsperProcessor.class); + EPServiceProvider engine; + + + public EsperProcessor(String epl) { + // disable esper's internal clock. we use tick data for our + // sense of time. + Configuration config = new Configuration(); + config.getEngineDefaults().getThreading().setInternalTimerEnabled(false); + engine = EPServiceProviderManager.getDefaultProvider(config); + + engine.getEPAdministrator().getConfiguration().addEventType(Tick.class); + + addStatements(epl); + + EPStatement statement = engine.getEPAdministrator().createEPL("select *,in_trading_hours from Tick"); + + statement.addListener((newData, oldData) -> { + logger.debug("event: {}", (Object)newData); + }); + + // statement.addListener((newData, oldData) -> { + // Date time = (Date) newData[0].get("time"); + // String name = (String) newData[0].get("instrument"); + // BigDecimal bid = (BigDecimal)newData[0].get("bid"); + // BigDecimal ask = (BigDecimal)newData[0].get("ask"); + // logger.info("Time: {}, Instr: {}, Bid: {}, Ask: {}", + // time, name, bid, ask); + // }); + + } + + /** + * Split a (possibly) multiline string of EPL statements into one + * statement per string. Statements are separated by at least one + * blank line. + */ + private static String[] splitStatements(String str) { + return str.split("(\\r?\\n){2,}"); + } + + /** + * Split a string into individual lines. + */ + private static String[] splitLines(String str) { + return str.split("\\r?\\n"); + } + + /** + * Return true if the given string is a comment. + */ + private static boolean isComment(String str) { + return str.startsWith("//") || str.startsWith("--"); + } + + /** + * Remove comment lines from the string. + */ + private static String filterComments(String str) { + return Arrays.stream(splitLines(str)) + .filter(s -> !isComment(s)) + .collect(Collectors.joining("\n")); + } + + private void addStatements(String epl) { + for (String s : splitStatements(filterComments(epl))) { + addStatement(s); + } + } + + private EPStatement addStatement(String epl) { + logger.debug("Adding statement: {}", epl); + return engine.getEPAdministrator().createEPL(epl); + } + + public void process(Tick tick) { + Date time = tick.getTime(); + CurrentTimeEvent timeEvent = new CurrentTimeEvent(time.getTime()); + engine.getEPRuntime().sendEvent(timeEvent); + engine.getEPRuntime().sendEvent(tick); + } +} diff --git a/src/main/java/OANDAReader.java b/src/main/java/OANDAReader.java index 7c1ba41..39f6516 100644 --- a/src/main/java/OANDAReader.java +++ b/src/main/java/OANDAReader.java @@ -15,7 +15,7 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; -public class OANDAReader { +public class OANDAReader implements TickStreamReader { AccountInfo accountInfo; String[] instruments; diff --git a/src/main/java/TickStreamReader.java b/src/main/java/TickStreamReader.java new file mode 100644 index 0000000..c33fefb --- /dev/null +++ b/src/main/java/TickStreamReader.java @@ -0,0 +1,4 @@ + +interface TickStreamReader { + public void run(TickProcessor processor); +} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..4b2c13d --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,37 @@ + + + + + + + true + + + + %highlight(%-5level) %cyan(%logger{15}) - %msg %n + + + + + logs/run.log + false + + %date %-5level %logger{35}: %msg%n + + + + + + + + + + + + + diff --git a/test.epl b/test.epl new file mode 100644 index 0000000..bfb83d0 --- /dev/null +++ b/test.epl @@ -0,0 +1,6 @@ +-- in_trading_hours is set to true if the current time +-- is inside the normal +create variable bool in_trading_hours + +-- update in_trading_hours variable on each tick +on Tick as t set in_trading_hours = (EPLHelpers.getHour(t.time) >= 9 and EPLHelpers.getHour(t.time) < 17)