From a92a3e4788cc07264dec99087d6efe6377c80edc Mon Sep 17 00:00:00 2001 From: Seth Ladygo Date: Tue, 12 Feb 2019 09:57:56 -0800 Subject: [PATCH] hook VizDataStream to the viz log file --- epl/trading_system_1.epl | 57 ++++++++++++++++++++++++++++++- src/main/java/EsperProcessor.java | 43 ++++++++++++----------- 2 files changed, 79 insertions(+), 21 deletions(-) diff --git a/epl/trading_system_1.epl b/epl/trading_system_1.epl index 3d25e63..fc74e2b 100644 --- a/epl/trading_system_1.epl +++ b/epl/trading_system_1.epl @@ -400,6 +400,61 @@ insert into LogStream select 'LongEntryDistinct' as stream, EPLHelpers.str(*) as -- insert into LogStream select 'LXPrice' as stream, EPLHelpers.str(mid, LongEntryPrice, LongEntryPrice - (10 * PipSize)) as event from TickEvent -- where InLongEntry and mid < LongEntryPrice -insert into LogStream select 'LongExitStream' as stream, EPLHelpers.str(*) as event from LongExitStream +-- insert into LogStream select 'LongExitStream' as stream, EPLHelpers.str(*) as event from LongExitStream + +-- insert into LogStream select 'LongExitStream' as stream, EPLHelpers.str(*) as event from LongExitStream + +-- on LongEntryDistinct +-- insert into LogStream select 'OrderTable' as stream, +-- EPLHelpers.str(OrderTable.time, +-- OrderTable.instrument, +-- OrderTable.id, +-- OrderTable.units, +-- OrderTable.open, +-- OrderTable.stopBarCount, +-- OrderTable.price) as event +-- from OrderTable order by OrderTable.time + +-- @Priority(-9999) +-- @Name("LogOrderTableClose") +-- on LongExitStream +-- insert into LogStream select 'OrderTable_CLOSE' as stream, +-- EPLHelpers.str(OrderTable.time, +-- OrderTable.instrument, +-- OrderTable.id, +-- OrderTable.units, +-- OrderTable.open, +-- OrderTable.stopBarCount, +-- OrderTable.price) as event +-- from OrderTable order by OrderTable.time + +-- on LongExitStream +-- insert into LogStream select 'OrderTable' as stream, EPLHelpers.str(count(*)) as event from OrderTable + +-- insert into LogStream select 'OpenOrderStream' as stream, EPLHelpers.str(*) as event from OpenOrderStream + +-- insert into LogStream select 'CloseOrderStream' as stream, EPLHelpers.str(*) as event from CloseOrderStream -- TODO (for Seth): look into LogSink http://esper.espertech.com/release-7.1.0/esper-reference/html/dataflow.html#dataflow-reference-logsink + + +-- +-- Visualization data logging +-- + +create schema VizDataStream as (event Map) + +-- Enable visualizing the events on specific streams by uncommenting +-- individual lines below. + +insert into VizDataStream select ats.viz.VizLog.eventString('tick', 'C', *) as event from TickEvent + +insert into VizDataStream select ats.viz.VizLog.eventString('bar', 'ohlc', *) as event from OHLCStream + +-- insert into VizDataStream select 'le' as tag, VizLog.eventString(*) as event from LongEntryDistinct + +-- insert into VizDataStream select 'lx' as tag, VizLog.eventString(*) as event from LongExitStream + +insert into VizDataStream select ats.viz.VizLog.eventString('event', 'open', *) as event from OpenOrderStream + +insert into VizDataStream select ats.viz.VizLog.eventString('event', 'close', *) as event from CloseOrderStream diff --git a/src/main/java/EsperProcessor.java b/src/main/java/EsperProcessor.java index d24e5f3..3c1455c 100644 --- a/src/main/java/EsperProcessor.java +++ b/src/main/java/EsperProcessor.java @@ -1,9 +1,11 @@ +import java.math.BigDecimal; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Collectors; import com.espertech.esper.client.Configuration; import com.espertech.esper.client.EPOnDemandPreparedQueryParameterized; -import com.espertech.esper.client.EPOnDemandQueryResult; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; @@ -18,11 +20,11 @@ import org.slf4j.LoggerFactory; import ats.plugin.OHLCEvent; import ats.plugin.OHLCPlugInViewFactory; +import ats.viz.VizLog; public class EsperProcessor implements TickProcessor { static final Logger log = LoggerFactory.getLogger(EsperProcessor.class); - static final Logger viz = LoggerFactory.getLogger(Viz.LOG_NAME); EPServiceProvider engine; TradingManager trader; @@ -48,6 +50,12 @@ public class EsperProcessor implements TickProcessor { addStatements(epl); + // prevents errors on updating stopBarCount + Map values = new HashMap(); + values.put("id", "dummy"); + values.put("units", new BigDecimal(0)); + sendEvent("OpenOrderStream", values); + // addLogStatement("TickEvent"); addVizDataStreamHandler(); @@ -169,9 +177,7 @@ public class EsperProcessor implements TickProcessor { */ private EPStatement addVizDataStreamHandler() { return addStatement("select * from VizDataStream", - (newData, oldData) -> { - viz.info(newData[0].get("event").toString()); - }); + (newData, oldData) -> { VizLog.log(newData); }); } /** @@ -181,7 +187,9 @@ public class EsperProcessor implements TickProcessor { private EPStatement addLogStreamHandler() { return addStatement("select * from LogStream", (newData, oldData) -> { - log.debug("{}: {}", newData[0].get("stream"), newData[0].get("event")); + log.debug("{}: {}", + newData[0].get("stream"), + newData[0].get("event")); }); } @@ -216,33 +224,28 @@ public class EsperProcessor implements TickProcessor { * Add a single fire and forget EPL statement to the Esper engine. */ public void executeQuery(String query) { - // EPStatement statement = engine.getEPAdministrator().create(query); - // log.debug("Adding query: {}", statement.getText()); - - EPOnDemandQueryResult result = engine.getEPRuntime().executeQuery(query); - log.info(result.toString()); + engine.getEPRuntime().executeQuery(query); } /** * Add a single fire and forget EPL statement to the Esper engine. */ public void executeQuery(EPStatementObjectModel query) { - // EPStatement statement = engine.getEPAdministrator().create(query); - // log.debug("Adding query: {}", statement.getText()); - - EPOnDemandQueryResult result = engine.getEPRuntime().executeQuery(query); - log.info(result.toString()); + engine.getEPRuntime().executeQuery(query); } /** * Add a single fire and forget EPL statement to the Esper engine. */ public void executeQuery(EPOnDemandPreparedQueryParameterized query) { - // EPStatement statement = engine.getEPAdministrator().create(query); - // log.debug("Adding query: {}", statement.getText()); + engine.getEPRuntime().executeQuery(query); + } - EPOnDemandQueryResult result = engine.getEPRuntime().executeQuery(query); - log.info(result.toString()); + /** + * Send an event to the Esper runtime. + */ + public void sendEvent(String name, Map values) { + engine.getEPRuntime().sendEvent(values, name); } /**