diff --git a/src/main/java/App.java b/src/main/java/App.java index 995dbbb..d986da4 100644 --- a/src/main/java/App.java +++ b/src/main/java/App.java @@ -18,106 +18,111 @@ import org.slf4j.LoggerFactory; * App is the entry point for the application. */ public class App { - static final Logger log = LoggerFactory.getLogger("ATS_Esper"); + static final Logger log = LoggerFactory.getLogger("ATS_Esper"); public static void main(String[] args) { - Options options = getCommandlineOptions(); - try { - CommandLineParser parser = new DefaultParser(); - CommandLine cmdline = parser.parse(options, args); - - String[] extraArgs = cmdline.getArgs(); - if (extraArgs == null || extraArgs.length < 1) { - usage("Missing EPL file.", options); - } - - String eplFile = extraArgs[0]; - TickProcessor processor = null; - try { - String epl = readFile(eplFile); - processor = new EsperProcessor(epl); - log.info("Using EPL file {}", eplFile); - } catch (IOException e) { - log.error("Error reading EPL file {}", eplFile, e); - System.exit(1); - } - - TickStreamReader reader = null; - if (cmdline.hasOption("historical")) { - String csv = cmdline.getOptionValue("historical"); - reader = new CSVReader(new File(csv)); - log.info("Using CSV file {}", csv); - String[] instruments = getInstruments(cmdline); - log.info("Reading from stream."); - } - - reader.run(processor); - } catch(ParseException e) { - usage(e.getMessage(), options); - } - } - - private static String[] getInstruments(CommandLine cmdline) { - if (cmdline.hasOption("instrument")) { - String[] instrs = cmdline.getOptionValues("instrument"); - for (String instr : instrs) { - log.info("Using instrument: {}", instr); - } - return instrs; - } else { - log.debug("Using default instrument: EUR_USD"); - return new String[] { "EUR_USD" }; - } - } - - /** - * Build and return arg parsing options. - */ - private static Options getCommandlineOptions() { - Options options = new Options(); - - // TODO: take an arg which is the OANDA authentication info - options.addOption("l", "live", false, "Process OANDA live data stream. This is the default unless the historical option is used."); - - options.addOption(Option.builder("h") - .longOpt("historical") - .argName("TrueFX.csv") - .required(false) - .hasArg() - .desc("Process historical data in TrueFX CSV format.") - .build()); - - options.addOption(Option.builder("i") - .longOpt("instrument") - .argName("EUR_USD") - .required(false) - .hasArg() - .desc("Live security to stream. In the format \"EUR_USD\".") - .valueSeparator(',') - .build()); - - return options; - } - - /** - * Return the contents of a text file. - */ - private static String readFile(String path) throws IOException - { - byte[] encoded = Files.readAllBytes(Paths.get(path)); - return new String(encoded, StandardCharsets.UTF_8); - } - - /** - * Print app command line usage info. - */ - private static void usage(String message, Options options) { - System.out.println(message); - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("java -jar ATS_Esper-all.jar [args] rules.epl", options); - System.exit(1); - } Config config = new Config(); + Options options = getCommandlineOptions(); + try { + CommandLineParser parser = new DefaultParser(); + CommandLine cmdline = parser.parse(options, args); + + String[] extraArgs = cmdline.getArgs(); + if (extraArgs == null || extraArgs.length < 1) { + usage("Missing EPL file.", options); + } + + TickStreamReader reader = null; + if (cmdline.hasOption("historical")) { + String csv = cmdline.getOptionValue("historical"); + reader = new CSVReader(new File(csv)); + log.info("Using CSV file {}", csv); + } else { + String[] instruments = getInstruments(cmdline); reader = new OANDAReader(config, instruments); + log.info("Reading from stream."); + + } + + String eplFile = extraArgs[0]; + TickProcessor processor = null; + try { + String epl = readFile(eplFile); + processor = new EsperProcessor(epl, trader); + log.info("Using EPL file {}", eplFile); + } catch (IOException e) { + log.error("Error reading EPL file {}", eplFile, e); + System.exit(1); + } + + reader.run(processor); + + // TODO: wait for all scheduled esper handlers to fire before exiting. + //try { Thread.sleep(10 * 1000); } catch (InterruptedException e) {} + } catch(ParseException e) { + usage(e.getMessage(), options); + } + } + + private static String[] getInstruments(CommandLine cmdline) { + if (cmdline.hasOption("instrument")) { + String[] instrs = cmdline.getOptionValues("instrument"); + for (String instr : instrs) { + log.info("Using instrument: {}", instr); + } + return instrs; + } else { + log.debug("Using default instrument: EUR_USD"); + return new String[] { "EUR_USD" }; + } + } + + /** + * Build and return arg parsing options. + */ + private static Options getCommandlineOptions() { + Options options = new Options(); + + // TODO: take an arg which is the OANDA authentication info + options.addOption("l", "live", false, "Process OANDA live data stream. This is the default unless the historical option is used."); + + options.addOption(Option.builder("h") + .longOpt("historical") + .argName("TrueFX.csv") + .required(false) + .hasArg() + .desc("Process historical data in TrueFX CSV format.") + .build()); + + options.addOption(Option.builder("i") + .longOpt("instrument") + .argName("EUR_USD") + .required(false) + .hasArg() + .desc("Live security to stream. In the format \"EUR_USD\".") + .valueSeparator(',') + .build()); + + return options; + } + + /** + * Return the contents of a text file. + */ + private static String readFile(String path) throws IOException + { + byte[] encoded = Files.readAllBytes(Paths.get(path)); + return new String(encoded, StandardCharsets.UTF_8); + } + + /** + * Print app command line usage info. + */ + private static void usage(String message, Options options) { + System.out.println(message); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("java -jar ATS_Esper-all.jar [args] rules.epl", options); + System.exit(1); + } } diff --git a/src/main/java/OANDAReader.java b/src/main/java/OANDAReader.java index f7c607d..d8dec66 100644 --- a/src/main/java/OANDAReader.java +++ b/src/main/java/OANDAReader.java @@ -21,84 +21,84 @@ import org.slf4j.LoggerFactory; public class OANDAReader implements TickStreamReader { final Logger log = LoggerFactory.getLogger(OANDAReader.class); ObjectMapper mapper; - String[] instruments; Config config; + String[] instruments; public OANDAReader(Config config, String[] instruments) { this.config = config; this.instruments = instruments; - mapper = new ObjectMapper(); - mapper.registerModule(new JodaModule()); - } + mapper = new ObjectMapper(); + mapper.registerModule(new JodaModule()); + } public void run(TickProcessor processor) { - readConnection(streamURL(), processor); - } + readConnection(streamURL(), processor); + } - private String streamURL() { - String instrList = String.join(",", instruments); - String query = ""; - try { - query = String.format("instruments=%s", URLEncoder.encode(instrList, "UTF-8")); - } catch (UnsupportedEncodingException e) { - log.error("Creating stream URL", e); - } + private String streamURL() { String type = config.isLiveAccount() ? "fxlive" : "fxpractice"; + String instrList = String.join(",", instruments); + String query = ""; + try { + query = String.format("instruments=%s", URLEncoder.encode(instrList, "UTF-8")); + } catch (UnsupportedEncodingException e) { + log.error("Creating stream URL", e); + } return String.format("https://stream-%s.oanda.com/v3/accounts/%s/pricing/stream?%s", type, config.accountID(), query); } - private void readConnection(String urlStr, TickProcessor processor) { - HttpsURLConnection httpConn = null; - String line = null; - try { - URL url = new URL(urlStr); - URLConnection urlConn = url.openConnection(); - if (!(urlConn instanceof HttpsURLConnection)) { - throw new IOException ("URL is not an https URL"); - } - httpConn = (HttpsURLConnection)urlConn; - httpConn.setAllowUserInteraction(false); - //httpConn.setInstanceFollowRedirects(true); - httpConn.setRequestMethod("GET"); - //httpConn.setReadTimeout(50 * 1000); - BufferedReader is = + private void readConnection(String urlStr, TickProcessor processor) { + HttpsURLConnection httpConn = null; + String line = null; + try { + URL url = new URL(urlStr); + URLConnection urlConn = url.openConnection(); + if (!(urlConn instanceof HttpsURLConnection)) { + throw new IOException ("URL is not an https URL"); + } + httpConn = (HttpsURLConnection)urlConn; + httpConn.setAllowUserInteraction(false); + //httpConn.setInstanceFollowRedirects(true); + httpConn.setRequestMethod("GET"); + //httpConn.setReadTimeout(50 * 1000); httpConn.setRequestProperty("Authorization", "Bearer " + config.accessToken()); + BufferedReader is = new BufferedReader(new InputStreamReader(httpConn.getInputStream())); - while ((line = is.readLine( )) != null) { - processLine(line, processor); - } - } catch (IOException e) { - log.error("Reading OANDA stream", e); - } finally { - httpConn.disconnect(); - } - } + while ((line = is.readLine()) != null) { + processLine(line, processor); + } + } catch (IOException e) { + log.error("Reading OANDA stream", e); + } finally { + httpConn.disconnect(); + } + } - private void processLine(String line, TickProcessor processor) { - if (line.indexOf ("PRICE") > -1) { - OANDATickEvent tick; - try { - tick = mapper.readValue(line, OANDATickEvent.class); - processor.process(tick); - } catch (JsonParseException | JsonMappingException e) { - log.error("Parsing OANDA tick", e); - } catch (IOException e) { - log.error("Parsing OANDA tick", e); - } - } else if (line.indexOf ("HEARTBEAT") > -1) { + private void processLine(String line, TickProcessor processor) { + if (line.indexOf ("PRICE") > -1) { + OANDATickEvent tick; + try { + tick = mapper.readValue(line, OANDATickEvent.class); + processor.process(tick); + } catch (JsonParseException | JsonMappingException e) { + log.error("Parsing OANDA tick", e); + } catch (IOException e) { + log.error("Parsing OANDA tick", e); + } + } else if (line.indexOf ("HEARTBEAT") > -1) { try { OANDAHeartbeatEvent beat = mapper.readValue(line, OANDAHeartbeatEvent.class); processor.process(new CurrentTimeEvent(beat.getTime().getMillis())); - } catch (IOException e) { + } catch (IOException e) { log.error("Parsing OANDA heartbeat", e); - } - } else { - log.warn("Unknown type: {}", line); - } - } + } + } else { + log.warn("Unknown type: {}", line); + } + } }