//PlazaTrader.cs
//Copyright (c) 2013 StockSharp LLC, all rights reserved.
//This code module is part of StockSharp library.
//This code is licensed under the GNU GENERAL PUBLIC LICENSE Version 3.
//See the file License.txt for the license details.
//More info on: http://stocksharp.com
namespace StockSharp.Plaza
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using Ecng.Collections;
using Ecng.Common;
using Ecng.Serialization;
using StockSharp.Logging;
using StockSharp.BusinessEntities;
using StockSharp.Algo;
using StockSharp.Plaza.Metadata;
using StockSharp.Plaza.Properties;
///
/// Реализация интерфейса для взаимодействия со шлюзом РТС через Plaza 2.
///
public class PlazaTrader : BaseTrader
{
private sealed class PlazaTableSet : SynchronizedSet
{
private readonly PlazaTrader _parent;
public PlazaTableSet(PlazaTrader parent)
{
if (parent == null)
throw new ArgumentNullException("parent");
_parent = parent;
}
protected override bool OnAdding(PlazaTable item)
{
ChechStartedExport();
return base.OnAdding(item);
}
protected override bool OnInserting(int index, PlazaTable item)
{
ChechStartedExport();
return base.OnInserting(index, item);
}
protected override bool OnRemoving(PlazaTable item)
{
ChechStartedExport();
return base.OnRemoving(item);
}
protected override bool OnClearing()
{
ChechStartedExport();
//Необходимо обнулить таблицы данных для стаканов
_parent.DefaultFutureDepthTable = null;
_parent.DefaultOptionDepthTable = null;
return base.OnClearing();
}
private void ChechStartedExport()
{
if (_parent.IsExportStarted)
throw new InvalidOperationException("Запущена передача данных. Смена потоков данных сервера Плазы невозможна.");
}
}
private readonly SynchronizedSet _multilegSecurities = new SynchronizedSet();
private int? _futSessionId;
private int? _optSessionId;
private readonly List _futSessions = new List();
private readonly List _optSessions = new List();
private readonly HashSet> _nativeIds = new HashSet>();
private readonly Dictionary>> _suspendedIsinEvents = new Dictionary>>();
private readonly SynchronizedDictionary _isinSecurities = new SynchronizedDictionary();
private readonly SynchronizedDictionary> _varMaring =
new SynchronizedDictionary>();
private readonly SynchronizedDictionary _varMaringIntercl =
new SynchronizedDictionary();
private const string _futuresClass = "FUT";
private const string _optionsClass = "OPT";
private const string _iniFile = "P2ClientGate.ini";
private readonly PlazaConnectionPool _connectionPool;
private readonly PlazaDepthBuilder _depthBuilder;
private readonly PlazaTableManager _tableManager;
///
/// Создать .
///
public PlazaTrader()
: this(new IPEndPoint(IPAddress.Loopback, PlazaAddresses.DefaultPlaza2Port))
{
}
///
/// Создать .
///
/// Адрес сервера.
public PlazaTrader(EndPoint address)
{
if (!File.Exists(_iniFile))
File.WriteAllText(_iniFile, Resources.P2ClientGate, Encoding.UTF8);
if (!File.Exists(TransactionFactory.MessagesFileName))
File.WriteAllText(TransactionFactory.MessagesFileName, Resources.p2fortsgate_messages, Encoding.UTF8);
Tables = new PlazaTableSet(this);
DefaultSecurityClassInfo = new RefPair(SecurityTypes.Stock, ExchangeBoard.Micex);
SecurityClassInfo.Add(_futuresClass, new RefPair(SecurityTypes.Future, ExchangeBoard.Forts));
SecurityClassInfo.Add(_optionsClass, new RefPair(SecurityTypes.Option, ExchangeBoard.Forts));
_connectionPool = new PlazaConnectionPool(RaiseConnected, RaiseDisconnected, RaiseConnectionError, RaiseProcessDataError, (type, msg) => this.AddLog(type, () => msg)) { Address = address };
_streamManager = new PlazaStreamManager(_connectionPool, RaiseProcessDataError);
_tableRegistry = new PlazaTableRegistry();
SubscribeEvents(_tableRegistry);
_depthBuilder = new PlazaDepthBuilder(this);
_tableManager = new PlazaTableManager(this);
_transactionManager = new TransactionManager(_connectionPool, TransactionIdGenerator);
_transactionManager.ProcessResponse += ProcessReply;
OnlySystemTrades = true;
// mika
// http://forum.rts.ru/viewtopic.asp?t=23038
// не корректно работает при переподключении. пускай кому нужно, тот и выставляет
//UseLocalProtocol = address.IsLocalIpAddress();
}
///
/// Вызвать событие .
///
/// Ошибка соединения.
protected override void RaiseConnectionError(Exception exception)
{
// РТС Техподдержка: "при обрыве связи нужно вызвать метод Disconnect"
_connectionPool.Disconnect();
this.AddInfoLog("RaiseConnectionError - disconnected");
base.RaiseConnectionError(exception);
}
///
/// Логин.
///
public string Login
{
get { return _connectionPool.Login; }
set { _connectionPool.Login = value; }
}
///
/// Пароль.
///
public string Password
{
get { return _connectionPool.Password; }
set { _connectionPool.Password = value; }
}
///
/// Адрес сервера. По-умолчанию значение равно адресу локального компьютера 127.0.0.1:4001.
///
public EndPoint Address
{
get { return _connectionPool.Address; }
set { _connectionPool.Address = value; }
}
///
/// Текстовое описание шлюза.
///
public override string DisplayName
{
get { return Address.ToString(); }
}
///
/// Префикс в имени приложения Plaza соединения.
///
///
/// Все созданные соединения именуются как _N, где N - порядковый номер соединение в пределах одного робота.
/// Если планируется подключать несколько к одному роутеру, они должны иметь уникальные .
///
public string AppName
{
get { return _connectionPool.AppName; }
set { _connectionPool.AppName = value; }
}
///
/// Время, в течение которого ожидается получение сообщения из потока данных или отправка транзакций.
///
///
/// По-умолчанию равно 100 милисекундам.
///
public TimeSpan PollTimeOut
{
get { return _connectionPool.PollTimeOut; }
set { _connectionPool.PollTimeOut = value; }
}
private readonly PlazaStreamManager _streamManager;
///
/// Менеджер потоков данных.
///
public PlazaStreamManager StreamManager
{
get { return _streamManager; }
}
///
/// Использовать SharedMem протокол при соединении с локальным роутером. По-умолчанию не используется.
///
public bool UseLocalProtocol
{
get { return _connectionPool.UseLocalProtocol; }
set { _connectionPool.UseLocalProtocol = value; }
}
///
/// Использовать лог заявок (orders log) для создания стаканов. По-умолчанию выключено.
///
///
/// Режим автоматически добавляет или удаляет из таблицу .
///
public override bool CreateDepthFromOrdersLog
{
get { return _tableManager.CreateDepthFromOrdersLog; }
set { _tableManager.CreateDepthFromOrdersLog = value; }
}
///
/// Использовать лог заявок (orders log) для создания тиковых сделок. По-умолчанию выключено.
///
///
/// Режим автоматически добавляет или удаляет из таблицу .
///
public override bool CreateTradesFromOrdersLog
{
get { return _tableManager.CreateTradesFromOrdersLog; }
set { _tableManager.CreateTradesFromOrdersLog = value; }
}
private readonly PlazaTableRegistry _tableRegistry;
///
/// Реестр всех таблиц сервера Плазы.
///
public PlazaTableRegistry TableRegistry
{
get { return _tableRegistry; }
}
private PlazaSessionState _sessionState;
///
/// Состояние торговой сессии.
///
public PlazaSessionState SessionState
{
get { return _sessionState; }
private set
{
if (_sessionState != value)
{
_sessionState = value;
SessionStateChanged.SafeInvoke();
}
}
}
///
/// Событие изменения .
///
public event Action SessionStateChanged;
private readonly object _routerLock = new object();
private PlazaRouter _router;
///
/// Получить управляение и настройки программы P2MQRouter.exe, которая осуществляет подключение к Plaza 2.
///
public PlazaRouter Router
{
get
{
lock (_routerLock)
return _router ?? (_router = new PlazaRouter(_connectionPool));
}
}
private readonly TransactionManager _transactionManager;
///
/// Менеджер транзакций.
///
public TransactionManager TransactionManager
{
get { return _transactionManager; }
}
///
/// Передавать через событие только системные сделки. По-умолчанию значение равно true.
///
public bool OnlySystemTrades { get; set; }
///
/// Таблица данных для стакана фьючерсных контрактов, которая будет запускаться при вызове метода .
///
///
/// Значение по умолчанию равно .
///
public PlazaTable DefaultFutureDepthTable
{
get { return _tableManager.DefaultFutureDepthTable; }
set { _tableManager.DefaultFutureDepthTable = value; }
}
///
/// Таблица данных для стакана опционных контрактов, которая будет запускаться при вызове метода .
///
///
/// Значение по умолчанию равно .
///
public PlazaTable DefaultOptionDepthTable
{
get { return _tableManager.DefaultOptionDepthTable; }
set { _tableManager.DefaultOptionDepthTable = value; }
}
///
/// Проверить соединение.
///
public override bool IsConnected
{
get { return _connectionPool != null && _connectionPool.IsConnected; }
}
///
/// Запущен ли экспорт. Экспорт запускается через метод .
///
public override bool IsExportStarted
{
get { return StreamManager != null && StreamManager.IsExportStarted; }
}
///
/// Таблицы, которые будут запущены при вызове метода .
///
public ISynchronizedCollection Tables { get; private set; }
///
/// Классы инструментов ММВБ, которые получаются при экспорте.
///
///
/// По умолчанию - акции.
/// Подробное описание доступно по адресу http://fs.rts.micex.ru/files/707
///
public readonly SynchronizedSet MicexClasses = new SynchronizedSet
{
"EQBR",
"EQBS",
"EQNE",
"EQNL",
"EQLV",
"EQLI"
};
private void SubscribeEvents(PlazaTableRegistry tableRegistry)
{
if (tableRegistry == null)
throw new ArgumentNullException("tableRegistry");
tableRegistry.CommonFuture.Inserted += OnCommonDerivativeInserted;
tableRegistry.CommonOption.Inserted += OnCommonDerivativeInserted;
tableRegistry.SessionContentsFuture.Inserted += OnSessionContentsFutureInserted;
tableRegistry.SessionContentsOption.Inserted += OnSessionContentsOptionInserted;
tableRegistry.Portfolios.Inserted += OnPortfoliosInserted;
tableRegistry.Positions.Inserted += OnPositionsInserted;
tableRegistry.Aggregation5Option.Inserted += OnAggregationInserted;
tableRegistry.Aggregation5Future.Inserted += OnAggregationInserted;
tableRegistry.Aggregation20Option.Inserted += OnAggregationInserted;
tableRegistry.Aggregation20Future.Inserted += OnAggregationInserted;
tableRegistry.Aggregation50Option.Inserted += OnAggregationInserted;
tableRegistry.Aggregation50Future.Inserted += OnAggregationInserted;
tableRegistry.Session.Inserted += OnSessionInserted;
tableRegistry.TradeFuture.Inserted += OnTradeInserted;
tableRegistry.TradeOption.Inserted += OnTradeInserted;
tableRegistry.UsdOnline.Inserted += OnUsdOnlineInserted;
tableRegistry.OrdersLogFuture.Inserted += OnOrdersLogInserted;
tableRegistry.OrdersLogOption.Inserted += OnOrdersLogInserted;
tableRegistry.Index.Inserted += OnIndexInserted;
tableRegistry.IndexLog.Inserted += OnIndexInserted;
tableRegistry.VarMarginFuture.Inserted += OnVarMarginInserted;
tableRegistry.VarMarginOption.Inserted += OnVarMarginInserted;
tableRegistry.Volatility.Inserted += OnVolatilityInserted;
tableRegistry.AnonymousOrdersLog.Inserted += OnAnonymousOrdersLogInserted;
tableRegistry.McxSecuritiesSpot.Inserted += OnMcxSecuritiesSpotInserted;
tableRegistry.McxSecuritiesCurrency.Inserted += OnMcxSecuritiesCurrencyInserted;
tableRegistry.McxCommonSpot.Inserted += OnMcxCommonSpotInserted;
tableRegistry.McxCommonCurrency.Inserted += OnMcxCommonCurrencyInserted;
tableRegistry.McxTradesSpot.Inserted += OnMcxTradesSpotInserted;
tableRegistry.McxTradesCurrency.Inserted += OnMcxTradesCurrencyInserted;
tableRegistry.McxOrderBookSpot.Inserted += OnMcxOrderBookInserted;
tableRegistry.McxOrderBookCurrency.Inserted += OnMcxOrderBookInserted;
SubscribeDepthTable(tableRegistry.McxOrderBookSpot);
SubscribeDepthTable(tableRegistry.McxOrderBookCurrency);
SubscribeDepthTable(tableRegistry.Aggregation5Future);
SubscribeDepthTable(tableRegistry.Aggregation20Future);
SubscribeDepthTable(tableRegistry.Aggregation50Future);
SubscribeDepthTable(tableRegistry.Aggregation5Option);
SubscribeDepthTable(tableRegistry.Aggregation20Option);
SubscribeDepthTable(tableRegistry.Aggregation50Option);
}
private void UnSubscribeEvents(PlazaTableRegistry tableRegistry)
{
if (tableRegistry == null)
throw new ArgumentNullException("tableRegistry");
tableRegistry.CommonFuture.Inserted -= OnCommonDerivativeInserted;
tableRegistry.CommonOption.Inserted -= OnCommonDerivativeInserted;
tableRegistry.SessionContentsFuture.Inserted -= OnSessionContentsFutureInserted;
tableRegistry.SessionContentsOption.Inserted -= OnSessionContentsOptionInserted;
tableRegistry.Portfolios.Inserted -= OnPortfoliosInserted;
tableRegistry.Positions.Inserted -= OnPositionsInserted;
tableRegistry.Aggregation5Option.Inserted -= OnAggregationInserted;
tableRegistry.Aggregation5Future.Inserted -= OnAggregationInserted;
tableRegistry.Aggregation20Option.Inserted -= OnAggregationInserted;
tableRegistry.Aggregation20Future.Inserted -= OnAggregationInserted;
tableRegistry.Aggregation50Option.Inserted -= OnAggregationInserted;
tableRegistry.Aggregation50Future.Inserted -= OnAggregationInserted;
tableRegistry.Session.Inserted -= OnSessionInserted;
tableRegistry.TradeFuture.Inserted -= OnTradeInserted;
tableRegistry.TradeOption.Inserted -= OnTradeInserted;
tableRegistry.UsdOnline.Inserted -= OnUsdOnlineInserted;
tableRegistry.OrdersLogFuture.Inserted -= OnOrdersLogInserted;
tableRegistry.OrdersLogOption.Inserted -= OnOrdersLogInserted;
tableRegistry.Index.Inserted -= OnIndexInserted;
tableRegistry.IndexLog.Inserted -= OnIndexInserted;
tableRegistry.VarMarginFuture.Inserted -= OnVarMarginInserted;
tableRegistry.VarMarginOption.Inserted -= OnVarMarginInserted;
tableRegistry.Volatility.Inserted -= OnVolatilityInserted;
tableRegistry.AnonymousOrdersLog.Inserted -= OnAnonymousOrdersLogInserted;
tableRegistry.McxSecuritiesSpot.Inserted -= OnMcxSecuritiesSpotInserted;
tableRegistry.McxSecuritiesCurrency.Inserted -= OnMcxSecuritiesCurrencyInserted;
tableRegistry.McxCommonSpot.Inserted -= OnMcxCommonSpotInserted;
tableRegistry.McxCommonCurrency.Inserted -= OnMcxCommonCurrencyInserted;
tableRegistry.McxTradesSpot.Inserted -= OnMcxTradesSpotInserted;
tableRegistry.McxTradesCurrency.Inserted -= OnMcxTradesCurrencyInserted;
tableRegistry.McxOrderBookSpot.Inserted -= OnMcxOrderBookInserted;
tableRegistry.McxOrderBookCurrency.Inserted -= OnMcxOrderBookInserted;
UnSubscribeDepthTable(tableRegistry.McxOrderBookSpot);
UnSubscribeDepthTable(tableRegistry.McxOrderBookCurrency);
UnSubscribeDepthTable(tableRegistry.Aggregation5Future);
UnSubscribeDepthTable(tableRegistry.Aggregation20Future);
UnSubscribeDepthTable(tableRegistry.Aggregation50Future);
UnSubscribeDepthTable(tableRegistry.Aggregation5Option);
UnSubscribeDepthTable(tableRegistry.Aggregation20Option);
UnSubscribeDepthTable(tableRegistry.Aggregation50Option);
}
private void SubscribeDepthTable(PlazaTable table)
{
if (table == null)
throw new ArgumentNullException("table");
table.End += OnAggregationEnd;
table.Flushed += OnAggregationFlushed;
table.Cleared += OnAggregationCleared;
}
private void UnSubscribeDepthTable(PlazaTable table)
{
if (table == null)
throw new ArgumentNullException("table");
table.Cleared -= OnAggregationCleared;
table.Flushed -= OnAggregationFlushed;
table.End -= OnAggregationEnd;
}
///
/// Подключиться к торговой системе.
///
protected override void OnConnect()
{
// Для удобства проверяем, запущен ли P2MQRouter.exe?
// Если это не делать, COM-библиотека Плазы будет выкидывать одну и ту же ошибку ('Couldn't resolve MQ service'),
// и когда P2MQRouter.exe не запущен,
// и когда нет связи с сервером Плазы или не ведутся торги.
//if (_connectionPool.IsLocal && Router.Status != ServiceControllerStatus.Running)
// throw new InvalidOperationException("На компьютере не установлена или не запущена программа P2MQRouter.exe.");
Router.Restart();
while (Router.Status != System.ServiceProcess.ServiceControllerStatus.Running)
{
System.Threading.Thread.Sleep(200);
}
_transactionManager.Start();
_connectionPool.Connect();
this.AddInfoLog("OnConnect");
}
///
/// Отключиться от торговой системы.
///
protected override void OnDisconnect()
{
// Иначе при выгрузке в StopStream будет срабатывать исключение.
if (IsExportStarted)
StopExport();
_transactionManager.Stop();
_connectionPool.Disconnect();
this.AddInfoLog("OnDisconnect");
}
///
/// Запустить экспорт данных из торговой системы в программу (получение портфелей, инструментов, заявок и т.д.).
///
public override void StartExport()
{
if (IsExportStarted)
StopExport();
this.AddInfoLog("Запуск экспорта.");
StreamManager.StartExport(CheckTables());
base.StartExport();
}
private IDictionary CheckTables()
{
lock (Tables.SyncRoot)
{
if (Tables.IsEmpty())
throw new InvalidOperationException("Коллекция таблиц данных пуста.");
if (Tables.Count(t => _tableRegistry.AggregationFutures.Contains(t)) > 1 || Tables.Count(t => _tableRegistry.AggregationOptions.Contains(t)) > 1)
throw new InvalidOperationException("Нельзя одновременно экспортировать потоки стаканов с разной глубиной.");
return Tables.SyncGet(c => c.GroupBy(t => t.Stream).ToDictionary(g => g.Key, g => g.ToArray()));
}
}
///
/// Остановить запущенный экспорт данных из торговой системы в программу.
///
public override void StopExport()
{
this.AddInfoLog("Остановка экспорта.");
if (!IsExportStarted)
{
this.AddInfoLog("Экспорт не был запущен.");
return;
}
StreamManager.StopExport();
base.StopExport();
}
private static void FillExtensionInfo(IExtendableEntity entity, PlazaRecord record)
{
if (record.Table.Columns.NonMandatoryColumns.IsEmpty())
return;
if (entity.ExtensionInfo == null)
entity.ExtensionInfo = new Dictionary