add trading managers

This commit is contained in:
2018-09-21 10:09:18 -07:00
parent aef74dfa59
commit 6edc589248
8 changed files with 151 additions and 17 deletions

View File

@ -13,6 +13,7 @@ dependencies {
compile 'com.google.guava:guava:23.0' compile 'com.google.guava:guava:23.0'
testCompile 'junit:junit:4.12' testCompile 'junit:junit:4.12'
testCompile 'org.skyscreamer:jsonassert:1.5.0'
implementation 'com.espertech:esper:7.1.0' implementation 'com.espertech:esper:7.1.0'
implementation 'com.oanda.v20:v20:3.0.21' implementation 'com.oanda.v20:v20:3.0.21'

View File

@ -20,7 +20,7 @@ create constant variable int EndTimeHour = 17
create constant variable string OHLCInterval = '10s' create constant variable string OHLCInterval = '10s'
-- Amount to be traded, measured in units. -- Amount to be traded, measured in units.
create constant variable int TradeSize = 100000 create constant variable int TradeSize = 10
-- How many events to use for simple moving average calculation -- How many events to use for simple moving average calculation
create constant variable int SMASize = 5 create constant variable int SMASize = 5

View File

@ -34,15 +34,21 @@ public class App {
} }
TickStreamReader reader = null; TickStreamReader reader = null;
TradingManager trader = null;
if (cmdline.hasOption("historical")) { if (cmdline.hasOption("historical")) {
String csv = cmdline.getOptionValue("historical"); String csv = cmdline.getOptionValue("historical");
reader = new CSVReader(new File(csv)); reader = new CSVReader(new File(csv));
log.info("Using CSV file {}", csv); log.info("Using CSV file {}", csv);
trader = new DebugTradingManager();
log.info("Using debug trading manager");
} else { } else {
String[] instruments = getInstruments(cmdline); String[] instruments = getInstruments(cmdline);
reader = new OANDAReader(config, instruments); reader = new OANDAReader(config, instruments);
log.info("Reading from stream."); log.info("Reading from stream.");
trader = new OANDATradingManager(config);
log.info("Using OANDA trading manager");
} }
String eplFile = extraArgs[0]; String eplFile = extraArgs[0];

View File

@ -0,0 +1,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ats.orders.OrderRequest;
/**
* DebugTradingManager logs each request but has no other effect.
*/
public class DebugTradingManager implements TradingManager {
static final Logger log = LoggerFactory.getLogger("DebugTradingManager");
/**
* Place an order request.
*/
@Override
public void placeOrder(OrderRequest order) {
log.info("Place order: {}", order);
}
}

View File

@ -12,6 +12,7 @@ import com.espertech.esper.client.time.CurrentTimeEvent;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import ats.orders.MarketOrderRequest;
import ats.plugin.OHLCEvent; import ats.plugin.OHLCEvent;
import ats.plugin.OHLCPlugInViewFactory; import ats.plugin.OHLCPlugInViewFactory;
import ats.plugin.OHLCValueEvent; import ats.plugin.OHLCValueEvent;
@ -20,9 +21,11 @@ import ats.plugin.OHLCValueEvent;
public class EsperProcessor implements TickProcessor { public class EsperProcessor implements TickProcessor {
final Logger log = LoggerFactory.getLogger(EsperProcessor.class); final Logger log = LoggerFactory.getLogger(EsperProcessor.class);
EPServiceProvider engine; EPServiceProvider engine;
TradingManager trader;
public EsperProcessor(String epl) { public EsperProcessor(String epl, TradingManager trader) {
this.trader = trader;
Configuration config = new Configuration(); Configuration config = new Configuration();
// disable esper's internal clock. we use tick data for our // disable esper's internal clock. we use tick data for our
@ -39,8 +42,6 @@ public class EsperProcessor implements TickProcessor {
engine = EPServiceProviderManager.getDefaultProvider(config); engine = EPServiceProviderManager.getDefaultProvider(config);
//engine.getEPRuntime().setVariableValue("FOO", 12d);
addStatements(epl); addStatements(epl);
// addLogStatement("TickEvent"); // addLogStatement("TickEvent");
@ -50,7 +51,13 @@ public class EsperProcessor implements TickProcessor {
// respond to long entry events // respond to long entry events
addStatement("select * from LongEntryDistinct", addStatement("select * from LongEntryDistinct",
(newData, oldData) -> { (newData, oldData) -> {
log.debug("Long entry triggered: {} at {}", String instrument = (String)newData[0].get("instrument");
Integer units = (Integer)newData[0].get("units");
trader.placeOrder(new MarketOrderRequest(instrument, units));
log.debug("Long entry triggered: {} of {} at price {} at time {}",
units,
instrument,
newData[0].get("current"), newData[0].get("current"),
newData[0].get("time")); newData[0].get("time"));
}); });

View File

@ -0,0 +1,99 @@
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ats.orders.OrderRequest;
/**
* OANDATradingManager manages orders via the OANDA API.
*/
public class OANDATradingManager implements TradingManager {
static final Logger log = LoggerFactory.getLogger("OANDATradingManager");
private Config config;
/**
* Create a new trading manager with OANDA endpoint configuration
* provided by the Config argument.
*/
public OANDATradingManager(Config config) {
this.config = config;
}
/**
* Place an order request.
*/
@Override
public void placeOrder(OrderRequest order) {
log.info("Placing order: {}", order);
try {
String orderJSON = order.toJSON();
byte[] postData = orderJSON.getBytes(StandardCharsets.UTF_8);
int postDataLength = postData.length;
URL url = new URL(orderURL());
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
conn.setDoOutput(true);
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setRequestProperty("charset", "utf-8");
conn.setRequestProperty("Content-Length", Integer.toString(postDataLength));
conn.setRequestProperty("Authorization", "Bearer " + config.accessToken());
conn.setUseCaches(false);
try (DataOutputStream dos = new DataOutputStream(conn.getOutputStream())) {
dos.write(postData);
dos.flush();
dos.close();
}
int responseCode = conn.getResponseCode();
log.debug("Sending request to " + url);
log.debug("Params: " + new String(postData));
log.debug("Response code: " + responseCode);
// read response
InputStream is;
if (responseCode < 400) {
log.debug("Reading input stream");
is = conn.getInputStream();
} else {
log.debug("Reading error stream");
is = conn.getErrorStream();
}
BufferedReader in = new BufferedReader(new InputStreamReader(is));
String inputLine;
StringBuffer response = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
}
in.close();
log.info("Response: " + response);
log.info("Finished placing order.");
} catch (IOException e) {
log.warn("Error sending order", e);
}
}
/**
* Return the OANDA API endpoint for placing orders.
*/
private String orderURL() {
String type = config.isLiveAccount() ? "fxlive" : "fxpractice";
return String.format("https://api-%s.oanda.com/v3/accounts/%s/orders",
type, config.accountID());
}
}

View File

@ -0,0 +1,13 @@
import ats.orders.OrderRequest;
/**
* TradingManager describes the interface with a trading system.
*/
interface TradingManager {
/**
* Place an order request.
*/
public void placeOrder(OrderRequest order);
}

View File

@ -28,18 +28,6 @@ public class OrderRequest {
this.type = orderType; this.type = orderType;
} }
/**
* Return all properties of the request. Subclasses should
* override to add their own properties to the parent class'.
*/
// public Map<String, Object> getOrderParams() {
// Map<String, Object> params = new HashMap<>();
// params.put("type", orderType.toString());
// return params;
// }
/** /**
* Create a JSON representation of the order suitable for sending * Create a JSON representation of the order suitable for sending
* to the OANDA API. * to the OANDA API.