hook VizDataStream to the viz log file

This commit is contained in:
2019-02-12 09:57:56 -08:00
parent 4444482efa
commit a92a3e4788
2 changed files with 79 additions and 21 deletions

View File

@ -400,6 +400,61 @@ insert into LogStream select 'LongEntryDistinct' as stream, EPLHelpers.str(*) as
-- insert into LogStream select 'LXPrice' as stream, EPLHelpers.str(mid, LongEntryPrice, LongEntryPrice - (10 * PipSize)) as event from TickEvent -- insert into LogStream select 'LXPrice' as stream, EPLHelpers.str(mid, LongEntryPrice, LongEntryPrice - (10 * PipSize)) as event from TickEvent
-- where InLongEntry and mid < LongEntryPrice -- where InLongEntry and mid < LongEntryPrice
insert into LogStream select 'LongExitStream' as stream, EPLHelpers.str(*) as event from LongExitStream -- insert into LogStream select 'LongExitStream' as stream, EPLHelpers.str(*) as event from LongExitStream
-- insert into LogStream select 'LongExitStream' as stream, EPLHelpers.str(*) as event from LongExitStream
-- on LongEntryDistinct
-- insert into LogStream select 'OrderTable' as stream,
-- EPLHelpers.str(OrderTable.time,
-- OrderTable.instrument,
-- OrderTable.id,
-- OrderTable.units,
-- OrderTable.open,
-- OrderTable.stopBarCount,
-- OrderTable.price) as event
-- from OrderTable order by OrderTable.time
-- @Priority(-9999)
-- @Name("LogOrderTableClose")
-- on LongExitStream
-- insert into LogStream select 'OrderTable_CLOSE' as stream,
-- EPLHelpers.str(OrderTable.time,
-- OrderTable.instrument,
-- OrderTable.id,
-- OrderTable.units,
-- OrderTable.open,
-- OrderTable.stopBarCount,
-- OrderTable.price) as event
-- from OrderTable order by OrderTable.time
-- on LongExitStream
-- insert into LogStream select 'OrderTable' as stream, EPLHelpers.str(count(*)) as event from OrderTable
-- insert into LogStream select 'OpenOrderStream' as stream, EPLHelpers.str(*) as event from OpenOrderStream
-- insert into LogStream select 'CloseOrderStream' as stream, EPLHelpers.str(*) as event from CloseOrderStream
-- TODO (for Seth): look into LogSink http://esper.espertech.com/release-7.1.0/esper-reference/html/dataflow.html#dataflow-reference-logsink -- TODO (for Seth): look into LogSink http://esper.espertech.com/release-7.1.0/esper-reference/html/dataflow.html#dataflow-reference-logsink
--
-- Visualization data logging
--
create schema VizDataStream as (event Map)
-- Enable visualizing the events on specific streams by uncommenting
-- individual lines below.
insert into VizDataStream select ats.viz.VizLog.eventString('tick', 'C', *) as event from TickEvent
insert into VizDataStream select ats.viz.VizLog.eventString('bar', 'ohlc', *) as event from OHLCStream
-- insert into VizDataStream select 'le' as tag, VizLog.eventString(*) as event from LongEntryDistinct
-- insert into VizDataStream select 'lx' as tag, VizLog.eventString(*) as event from LongExitStream
insert into VizDataStream select ats.viz.VizLog.eventString('event', 'open', *) as event from OpenOrderStream
insert into VizDataStream select ats.viz.VizLog.eventString('event', 'close', *) as event from CloseOrderStream

View File

@ -1,9 +1,11 @@
import java.math.BigDecimal;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.espertech.esper.client.Configuration; import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPOnDemandPreparedQueryParameterized; import com.espertech.esper.client.EPOnDemandPreparedQueryParameterized;
import com.espertech.esper.client.EPOnDemandQueryResult;
import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.EPStatement;
@ -18,11 +20,11 @@ import org.slf4j.LoggerFactory;
import ats.plugin.OHLCEvent; import ats.plugin.OHLCEvent;
import ats.plugin.OHLCPlugInViewFactory; import ats.plugin.OHLCPlugInViewFactory;
import ats.viz.VizLog;
public class EsperProcessor implements TickProcessor { public class EsperProcessor implements TickProcessor {
static final Logger log = LoggerFactory.getLogger(EsperProcessor.class); static final Logger log = LoggerFactory.getLogger(EsperProcessor.class);
static final Logger viz = LoggerFactory.getLogger(Viz.LOG_NAME);
EPServiceProvider engine; EPServiceProvider engine;
TradingManager trader; TradingManager trader;
@ -48,6 +50,12 @@ public class EsperProcessor implements TickProcessor {
addStatements(epl); addStatements(epl);
// prevents errors on updating stopBarCount
Map<String,Object> values = new HashMap<String,Object>();
values.put("id", "dummy");
values.put("units", new BigDecimal(0));
sendEvent("OpenOrderStream", values);
// addLogStatement("TickEvent"); // addLogStatement("TickEvent");
addVizDataStreamHandler(); addVizDataStreamHandler();
@ -169,9 +177,7 @@ public class EsperProcessor implements TickProcessor {
*/ */
private EPStatement addVizDataStreamHandler() { private EPStatement addVizDataStreamHandler() {
return addStatement("select * from VizDataStream", return addStatement("select * from VizDataStream",
(newData, oldData) -> { (newData, oldData) -> { VizLog.log(newData); });
viz.info(newData[0].get("event").toString());
});
} }
/** /**
@ -181,7 +187,9 @@ public class EsperProcessor implements TickProcessor {
private EPStatement addLogStreamHandler() { private EPStatement addLogStreamHandler() {
return addStatement("select * from LogStream", return addStatement("select * from LogStream",
(newData, oldData) -> { (newData, oldData) -> {
log.debug("{}: {}", newData[0].get("stream"), newData[0].get("event")); log.debug("{}: {}",
newData[0].get("stream"),
newData[0].get("event"));
}); });
} }
@ -216,33 +224,28 @@ public class EsperProcessor implements TickProcessor {
* Add a single fire and forget EPL statement to the Esper engine. * Add a single fire and forget EPL statement to the Esper engine.
*/ */
public void executeQuery(String query) { public void executeQuery(String query) {
// EPStatement statement = engine.getEPAdministrator().create(query); engine.getEPRuntime().executeQuery(query);
// log.debug("Adding query: {}", statement.getText());
EPOnDemandQueryResult result = engine.getEPRuntime().executeQuery(query);
log.info(result.toString());
} }
/** /**
* Add a single fire and forget EPL statement to the Esper engine. * Add a single fire and forget EPL statement to the Esper engine.
*/ */
public void executeQuery(EPStatementObjectModel query) { public void executeQuery(EPStatementObjectModel query) {
// EPStatement statement = engine.getEPAdministrator().create(query); engine.getEPRuntime().executeQuery(query);
// log.debug("Adding query: {}", statement.getText());
EPOnDemandQueryResult result = engine.getEPRuntime().executeQuery(query);
log.info(result.toString());
} }
/** /**
* Add a single fire and forget EPL statement to the Esper engine. * Add a single fire and forget EPL statement to the Esper engine.
*/ */
public void executeQuery(EPOnDemandPreparedQueryParameterized query) { public void executeQuery(EPOnDemandPreparedQueryParameterized query) {
// EPStatement statement = engine.getEPAdministrator().create(query); engine.getEPRuntime().executeQuery(query);
// log.debug("Adding query: {}", statement.getText()); }
EPOnDemandQueryResult result = engine.getEPRuntime().executeQuery(query); /**
log.info(result.toString()); * Send an event to the Esper runtime.
*/
public void sendEvent(String name, Map values) {
engine.getEPRuntime().sendEvent(values, name);
} }
/** /**