fix whitespace

This commit is contained in:
2018-09-06 16:35:53 -07:00
parent 45a215c15b
commit 7fe1b9a48d
2 changed files with 158 additions and 153 deletions

View File

@ -18,106 +18,111 @@ import org.slf4j.LoggerFactory;
* App is the entry point for the application.
*/
public class App {
static final Logger log = LoggerFactory.getLogger("ATS_Esper");
static final Logger log = LoggerFactory.getLogger("ATS_Esper");
public static void main(String[] args) {
Options options = getCommandlineOptions();
try {
CommandLineParser parser = new DefaultParser();
CommandLine cmdline = parser.parse(options, args);
String[] extraArgs = cmdline.getArgs();
if (extraArgs == null || extraArgs.length < 1) {
usage("Missing EPL file.", options);
}
String eplFile = extraArgs[0];
TickProcessor processor = null;
try {
String epl = readFile(eplFile);
processor = new EsperProcessor(epl);
log.info("Using EPL file {}", eplFile);
} catch (IOException e) {
log.error("Error reading EPL file {}", eplFile, e);
System.exit(1);
}
TickStreamReader reader = null;
if (cmdline.hasOption("historical")) {
String csv = cmdline.getOptionValue("historical");
reader = new CSVReader(new File(csv));
log.info("Using CSV file {}", csv);
String[] instruments = getInstruments(cmdline);
log.info("Reading from stream.");
}
reader.run(processor);
} catch(ParseException e) {
usage(e.getMessage(), options);
}
}
private static String[] getInstruments(CommandLine cmdline) {
if (cmdline.hasOption("instrument")) {
String[] instrs = cmdline.getOptionValues("instrument");
for (String instr : instrs) {
log.info("Using instrument: {}", instr);
}
return instrs;
} else {
log.debug("Using default instrument: EUR_USD");
return new String[] { "EUR_USD" };
}
}
/**
* Build and return arg parsing options.
*/
private static Options getCommandlineOptions() {
Options options = new Options();
// TODO: take an arg which is the OANDA authentication info
options.addOption("l", "live", false, "Process OANDA live data stream. This is the default unless the historical option is used.");
options.addOption(Option.builder("h")
.longOpt("historical")
.argName("TrueFX.csv")
.required(false)
.hasArg()
.desc("Process historical data in TrueFX CSV format.")
.build());
options.addOption(Option.builder("i")
.longOpt("instrument")
.argName("EUR_USD")
.required(false)
.hasArg()
.desc("Live security to stream. In the format \"EUR_USD\".")
.valueSeparator(',')
.build());
return options;
}
/**
* Return the contents of a text file.
*/
private static String readFile(String path) throws IOException
{
byte[] encoded = Files.readAllBytes(Paths.get(path));
return new String(encoded, StandardCharsets.UTF_8);
}
/**
* Print app command line usage info.
*/
private static void usage(String message, Options options) {
System.out.println(message);
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("java -jar ATS_Esper-all.jar [args] rules.epl", options);
System.exit(1);
}
Config config = new Config();
Options options = getCommandlineOptions();
try {
CommandLineParser parser = new DefaultParser();
CommandLine cmdline = parser.parse(options, args);
String[] extraArgs = cmdline.getArgs();
if (extraArgs == null || extraArgs.length < 1) {
usage("Missing EPL file.", options);
}
TickStreamReader reader = null;
if (cmdline.hasOption("historical")) {
String csv = cmdline.getOptionValue("historical");
reader = new CSVReader(new File(csv));
log.info("Using CSV file {}", csv);
} else {
String[] instruments = getInstruments(cmdline);
reader = new OANDAReader(config, instruments);
log.info("Reading from stream.");
}
String eplFile = extraArgs[0];
TickProcessor processor = null;
try {
String epl = readFile(eplFile);
processor = new EsperProcessor(epl, trader);
log.info("Using EPL file {}", eplFile);
} catch (IOException e) {
log.error("Error reading EPL file {}", eplFile, e);
System.exit(1);
}
reader.run(processor);
// TODO: wait for all scheduled esper handlers to fire before exiting.
//try { Thread.sleep(10 * 1000); } catch (InterruptedException e) {}
} catch(ParseException e) {
usage(e.getMessage(), options);
}
}
private static String[] getInstruments(CommandLine cmdline) {
if (cmdline.hasOption("instrument")) {
String[] instrs = cmdline.getOptionValues("instrument");
for (String instr : instrs) {
log.info("Using instrument: {}", instr);
}
return instrs;
} else {
log.debug("Using default instrument: EUR_USD");
return new String[] { "EUR_USD" };
}
}
/**
* Build and return arg parsing options.
*/
private static Options getCommandlineOptions() {
Options options = new Options();
// TODO: take an arg which is the OANDA authentication info
options.addOption("l", "live", false, "Process OANDA live data stream. This is the default unless the historical option is used.");
options.addOption(Option.builder("h")
.longOpt("historical")
.argName("TrueFX.csv")
.required(false)
.hasArg()
.desc("Process historical data in TrueFX CSV format.")
.build());
options.addOption(Option.builder("i")
.longOpt("instrument")
.argName("EUR_USD")
.required(false)
.hasArg()
.desc("Live security to stream. In the format \"EUR_USD\".")
.valueSeparator(',')
.build());
return options;
}
/**
* Return the contents of a text file.
*/
private static String readFile(String path) throws IOException
{
byte[] encoded = Files.readAllBytes(Paths.get(path));
return new String(encoded, StandardCharsets.UTF_8);
}
/**
* Print app command line usage info.
*/
private static void usage(String message, Options options) {
System.out.println(message);
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("java -jar ATS_Esper-all.jar [args] rules.epl", options);
System.exit(1);
}
}

View File

@ -21,84 +21,84 @@ import org.slf4j.LoggerFactory;
public class OANDAReader implements TickStreamReader {
final Logger log = LoggerFactory.getLogger(OANDAReader.class);
ObjectMapper mapper;
String[] instruments;
Config config;
String[] instruments;
public OANDAReader(Config config, String[] instruments) {
this.config = config;
this.instruments = instruments;
mapper = new ObjectMapper();
mapper.registerModule(new JodaModule());
}
mapper = new ObjectMapper();
mapper.registerModule(new JodaModule());
}
public void run(TickProcessor processor) {
readConnection(streamURL(), processor);
}
readConnection(streamURL(), processor);
}
private String streamURL() {
String instrList = String.join(",", instruments);
String query = "";
try {
query = String.format("instruments=%s", URLEncoder.encode(instrList, "UTF-8"));
} catch (UnsupportedEncodingException e) {
log.error("Creating stream URL", e);
}
private String streamURL() {
String type = config.isLiveAccount() ? "fxlive" : "fxpractice";
String instrList = String.join(",", instruments);
String query = "";
try {
query = String.format("instruments=%s", URLEncoder.encode(instrList, "UTF-8"));
} catch (UnsupportedEncodingException e) {
log.error("Creating stream URL", e);
}
return String.format("https://stream-%s.oanda.com/v3/accounts/%s/pricing/stream?%s",
type, config.accountID(), query);
}
private void readConnection(String urlStr, TickProcessor processor) {
HttpsURLConnection httpConn = null;
String line = null;
try {
URL url = new URL(urlStr);
URLConnection urlConn = url.openConnection();
if (!(urlConn instanceof HttpsURLConnection)) {
throw new IOException ("URL is not an https URL");
}
httpConn = (HttpsURLConnection)urlConn;
httpConn.setAllowUserInteraction(false);
//httpConn.setInstanceFollowRedirects(true);
httpConn.setRequestMethod("GET");
//httpConn.setReadTimeout(50 * 1000);
BufferedReader is =
private void readConnection(String urlStr, TickProcessor processor) {
HttpsURLConnection httpConn = null;
String line = null;
try {
URL url = new URL(urlStr);
URLConnection urlConn = url.openConnection();
if (!(urlConn instanceof HttpsURLConnection)) {
throw new IOException ("URL is not an https URL");
}
httpConn = (HttpsURLConnection)urlConn;
httpConn.setAllowUserInteraction(false);
//httpConn.setInstanceFollowRedirects(true);
httpConn.setRequestMethod("GET");
//httpConn.setReadTimeout(50 * 1000);
httpConn.setRequestProperty("Authorization", "Bearer " + config.accessToken());
BufferedReader is =
new BufferedReader(new InputStreamReader(httpConn.getInputStream()));
while ((line = is.readLine( )) != null) {
processLine(line, processor);
}
} catch (IOException e) {
log.error("Reading OANDA stream", e);
} finally {
httpConn.disconnect();
}
}
while ((line = is.readLine()) != null) {
processLine(line, processor);
}
} catch (IOException e) {
log.error("Reading OANDA stream", e);
} finally {
httpConn.disconnect();
}
}
private void processLine(String line, TickProcessor processor) {
if (line.indexOf ("PRICE") > -1) {
OANDATickEvent tick;
try {
tick = mapper.readValue(line, OANDATickEvent.class);
processor.process(tick);
} catch (JsonParseException | JsonMappingException e) {
log.error("Parsing OANDA tick", e);
} catch (IOException e) {
log.error("Parsing OANDA tick", e);
}
} else if (line.indexOf ("HEARTBEAT") > -1) {
private void processLine(String line, TickProcessor processor) {
if (line.indexOf ("PRICE") > -1) {
OANDATickEvent tick;
try {
tick = mapper.readValue(line, OANDATickEvent.class);
processor.process(tick);
} catch (JsonParseException | JsonMappingException e) {
log.error("Parsing OANDA tick", e);
} catch (IOException e) {
log.error("Parsing OANDA tick", e);
}
} else if (line.indexOf ("HEARTBEAT") > -1) {
try {
OANDAHeartbeatEvent beat = mapper.readValue(line, OANDAHeartbeatEvent.class);
processor.process(new CurrentTimeEvent(beat.getTime().getMillis()));
} catch (IOException e) {
} catch (IOException e) {
log.error("Parsing OANDA heartbeat", e);
}
} else {
log.warn("Unknown type: {}", line);
}
}
}
} else {
log.warn("Unknown type: {}", line);
}
}
}