//PlazaTrader.cs //Copyright (c) 2013 StockSharp LLC, all rights reserved. //This code module is part of StockSharp library. //This code is licensed under the GNU GENERAL PUBLIC LICENSE Version 3. //See the file License.txt for the license details. //More info on: http://stocksharp.com namespace StockSharp.Plaza { using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; using System.Text; using Ecng.Collections; using Ecng.Common; using Ecng.Serialization; using StockSharp.Logging; using StockSharp.BusinessEntities; using StockSharp.Algo; using StockSharp.Plaza.Metadata; using StockSharp.Plaza.Properties; /// /// Реализация интерфейса для взаимодействия со шлюзом РТС через Plaza 2. /// public class PlazaTrader : BaseTrader { private sealed class PlazaTableSet : SynchronizedSet { private readonly PlazaTrader _parent; public PlazaTableSet(PlazaTrader parent) { if (parent == null) throw new ArgumentNullException("parent"); _parent = parent; } protected override bool OnAdding(PlazaTable item) { ChechStartedExport(); return base.OnAdding(item); } protected override bool OnInserting(int index, PlazaTable item) { ChechStartedExport(); return base.OnInserting(index, item); } protected override bool OnRemoving(PlazaTable item) { ChechStartedExport(); return base.OnRemoving(item); } protected override bool OnClearing() { ChechStartedExport(); //Необходимо обнулить таблицы данных для стаканов _parent.DefaultFutureDepthTable = null; _parent.DefaultOptionDepthTable = null; return base.OnClearing(); } private void ChechStartedExport() { if (_parent.IsExportStarted) throw new InvalidOperationException("Запущена передача данных. Смена потоков данных сервера Плазы невозможна."); } } private readonly SynchronizedSet _multilegSecurities = new SynchronizedSet(); private int? _futSessionId; private int? _optSessionId; private readonly List _futSessions = new List(); private readonly List _optSessions = new List(); private readonly HashSet> _nativeIds = new HashSet>(); private readonly Dictionary>> _suspendedIsinEvents = new Dictionary>>(); private readonly SynchronizedDictionary _isinSecurities = new SynchronizedDictionary(); private readonly SynchronizedDictionary> _varMaring = new SynchronizedDictionary>(); private readonly SynchronizedDictionary _varMaringIntercl = new SynchronizedDictionary(); private const string _futuresClass = "FUT"; private const string _optionsClass = "OPT"; private const string _iniFile = "P2ClientGate.ini"; private readonly PlazaConnectionPool _connectionPool; private readonly PlazaDepthBuilder _depthBuilder; private readonly PlazaTableManager _tableManager; /// /// Создать . /// public PlazaTrader() : this(new IPEndPoint(IPAddress.Loopback, PlazaAddresses.DefaultPlaza2Port)) { } /// /// Создать . /// /// Адрес сервера. public PlazaTrader(EndPoint address) { if (!File.Exists(_iniFile)) File.WriteAllText(_iniFile, Resources.P2ClientGate, Encoding.UTF8); if (!File.Exists(TransactionFactory.MessagesFileName)) File.WriteAllText(TransactionFactory.MessagesFileName, Resources.p2fortsgate_messages, Encoding.UTF8); Tables = new PlazaTableSet(this); DefaultSecurityClassInfo = new RefPair(SecurityTypes.Stock, ExchangeBoard.Micex); SecurityClassInfo.Add(_futuresClass, new RefPair(SecurityTypes.Future, ExchangeBoard.Forts)); SecurityClassInfo.Add(_optionsClass, new RefPair(SecurityTypes.Option, ExchangeBoard.Forts)); _connectionPool = new PlazaConnectionPool(RaiseConnected, RaiseDisconnected, RaiseConnectionError, RaiseProcessDataError, (type, msg) => this.AddLog(type, () => msg)) { Address = address }; _streamManager = new PlazaStreamManager(_connectionPool, RaiseProcessDataError); _tableRegistry = new PlazaTableRegistry(); SubscribeEvents(_tableRegistry); _depthBuilder = new PlazaDepthBuilder(this); _tableManager = new PlazaTableManager(this); _transactionManager = new TransactionManager(_connectionPool, TransactionIdGenerator); _transactionManager.ProcessResponse += ProcessReply; OnlySystemTrades = true; // mika // http://forum.rts.ru/viewtopic.asp?t=23038 // не корректно работает при переподключении. пускай кому нужно, тот и выставляет //UseLocalProtocol = address.IsLocalIpAddress(); } /// /// Вызвать событие . /// /// Ошибка соединения. protected override void RaiseConnectionError(Exception exception) { // РТС Техподдержка: "при обрыве связи нужно вызвать метод Disconnect" _connectionPool.Disconnect(); this.AddInfoLog("RaiseConnectionError - disconnected"); base.RaiseConnectionError(exception); } /// /// Логин. /// public string Login { get { return _connectionPool.Login; } set { _connectionPool.Login = value; } } /// /// Пароль. /// public string Password { get { return _connectionPool.Password; } set { _connectionPool.Password = value; } } /// /// Адрес сервера. По-умолчанию значение равно адресу локального компьютера 127.0.0.1:4001. /// public EndPoint Address { get { return _connectionPool.Address; } set { _connectionPool.Address = value; } } /// /// Текстовое описание шлюза. /// public override string DisplayName { get { return Address.ToString(); } } /// /// Префикс в имени приложения Plaza соединения. /// /// /// Все созданные соединения именуются как _N, где N - порядковый номер соединение в пределах одного робота. /// Если планируется подключать несколько к одному роутеру, они должны иметь уникальные . /// public string AppName { get { return _connectionPool.AppName; } set { _connectionPool.AppName = value; } } /// /// Время, в течение которого ожидается получение сообщения из потока данных или отправка транзакций. /// /// /// По-умолчанию равно 100 милисекундам. /// public TimeSpan PollTimeOut { get { return _connectionPool.PollTimeOut; } set { _connectionPool.PollTimeOut = value; } } private readonly PlazaStreamManager _streamManager; /// /// Менеджер потоков данных. /// public PlazaStreamManager StreamManager { get { return _streamManager; } } /// /// Использовать SharedMem протокол при соединении с локальным роутером. По-умолчанию не используется. /// public bool UseLocalProtocol { get { return _connectionPool.UseLocalProtocol; } set { _connectionPool.UseLocalProtocol = value; } } /// /// Использовать лог заявок (orders log) для создания стаканов. По-умолчанию выключено. /// /// /// Режим автоматически добавляет или удаляет из таблицу . /// public override bool CreateDepthFromOrdersLog { get { return _tableManager.CreateDepthFromOrdersLog; } set { _tableManager.CreateDepthFromOrdersLog = value; } } /// /// Использовать лог заявок (orders log) для создания тиковых сделок. По-умолчанию выключено. /// /// /// Режим автоматически добавляет или удаляет из таблицу . /// public override bool CreateTradesFromOrdersLog { get { return _tableManager.CreateTradesFromOrdersLog; } set { _tableManager.CreateTradesFromOrdersLog = value; } } private readonly PlazaTableRegistry _tableRegistry; /// /// Реестр всех таблиц сервера Плазы. /// public PlazaTableRegistry TableRegistry { get { return _tableRegistry; } } private PlazaSessionState _sessionState; /// /// Состояние торговой сессии. /// public PlazaSessionState SessionState { get { return _sessionState; } private set { if (_sessionState != value) { _sessionState = value; SessionStateChanged.SafeInvoke(); } } } /// /// Событие изменения . /// public event Action SessionStateChanged; private readonly object _routerLock = new object(); private PlazaRouter _router; /// /// Получить управляение и настройки программы P2MQRouter.exe, которая осуществляет подключение к Plaza 2. /// public PlazaRouter Router { get { lock (_routerLock) return _router ?? (_router = new PlazaRouter(_connectionPool)); } } private readonly TransactionManager _transactionManager; /// /// Менеджер транзакций. /// public TransactionManager TransactionManager { get { return _transactionManager; } } /// /// Передавать через событие только системные сделки. По-умолчанию значение равно true. /// public bool OnlySystemTrades { get; set; } /// /// Таблица данных для стакана фьючерсных контрактов, которая будет запускаться при вызове метода . /// /// /// Значение по умолчанию равно . /// public PlazaTable DefaultFutureDepthTable { get { return _tableManager.DefaultFutureDepthTable; } set { _tableManager.DefaultFutureDepthTable = value; } } /// /// Таблица данных для стакана опционных контрактов, которая будет запускаться при вызове метода . /// /// /// Значение по умолчанию равно . /// public PlazaTable DefaultOptionDepthTable { get { return _tableManager.DefaultOptionDepthTable; } set { _tableManager.DefaultOptionDepthTable = value; } } /// /// Проверить соединение. /// public override bool IsConnected { get { return _connectionPool != null && _connectionPool.IsConnected; } } /// /// Запущен ли экспорт. Экспорт запускается через метод . /// public override bool IsExportStarted { get { return StreamManager != null && StreamManager.IsExportStarted; } } /// /// Таблицы, которые будут запущены при вызове метода . /// public ISynchronizedCollection Tables { get; private set; } /// /// Классы инструментов ММВБ, которые получаются при экспорте. /// /// /// По умолчанию - акции. /// Подробное описание доступно по адресу http://fs.rts.micex.ru/files/707 /// public readonly SynchronizedSet MicexClasses = new SynchronizedSet { "EQBR", "EQBS", "EQNE", "EQNL", "EQLV", "EQLI" }; private void SubscribeEvents(PlazaTableRegistry tableRegistry) { if (tableRegistry == null) throw new ArgumentNullException("tableRegistry"); tableRegistry.CommonFuture.Inserted += OnCommonDerivativeInserted; tableRegistry.CommonOption.Inserted += OnCommonDerivativeInserted; tableRegistry.SessionContentsFuture.Inserted += OnSessionContentsFutureInserted; tableRegistry.SessionContentsOption.Inserted += OnSessionContentsOptionInserted; tableRegistry.Portfolios.Inserted += OnPortfoliosInserted; tableRegistry.Positions.Inserted += OnPositionsInserted; tableRegistry.Aggregation5Option.Inserted += OnAggregationInserted; tableRegistry.Aggregation5Future.Inserted += OnAggregationInserted; tableRegistry.Aggregation20Option.Inserted += OnAggregationInserted; tableRegistry.Aggregation20Future.Inserted += OnAggregationInserted; tableRegistry.Aggregation50Option.Inserted += OnAggregationInserted; tableRegistry.Aggregation50Future.Inserted += OnAggregationInserted; tableRegistry.Session.Inserted += OnSessionInserted; tableRegistry.TradeFuture.Inserted += OnTradeInserted; tableRegistry.TradeOption.Inserted += OnTradeInserted; tableRegistry.UsdOnline.Inserted += OnUsdOnlineInserted; tableRegistry.OrdersLogFuture.Inserted += OnOrdersLogInserted; tableRegistry.OrdersLogOption.Inserted += OnOrdersLogInserted; tableRegistry.Index.Inserted += OnIndexInserted; tableRegistry.IndexLog.Inserted += OnIndexInserted; tableRegistry.VarMarginFuture.Inserted += OnVarMarginInserted; tableRegistry.VarMarginOption.Inserted += OnVarMarginInserted; tableRegistry.Volatility.Inserted += OnVolatilityInserted; tableRegistry.AnonymousOrdersLog.Inserted += OnAnonymousOrdersLogInserted; tableRegistry.McxSecuritiesSpot.Inserted += OnMcxSecuritiesSpotInserted; tableRegistry.McxSecuritiesCurrency.Inserted += OnMcxSecuritiesCurrencyInserted; tableRegistry.McxCommonSpot.Inserted += OnMcxCommonSpotInserted; tableRegistry.McxCommonCurrency.Inserted += OnMcxCommonCurrencyInserted; tableRegistry.McxTradesSpot.Inserted += OnMcxTradesSpotInserted; tableRegistry.McxTradesCurrency.Inserted += OnMcxTradesCurrencyInserted; tableRegistry.McxOrderBookSpot.Inserted += OnMcxOrderBookInserted; tableRegistry.McxOrderBookCurrency.Inserted += OnMcxOrderBookInserted; SubscribeDepthTable(tableRegistry.McxOrderBookSpot); SubscribeDepthTable(tableRegistry.McxOrderBookCurrency); SubscribeDepthTable(tableRegistry.Aggregation5Future); SubscribeDepthTable(tableRegistry.Aggregation20Future); SubscribeDepthTable(tableRegistry.Aggregation50Future); SubscribeDepthTable(tableRegistry.Aggregation5Option); SubscribeDepthTable(tableRegistry.Aggregation20Option); SubscribeDepthTable(tableRegistry.Aggregation50Option); } private void UnSubscribeEvents(PlazaTableRegistry tableRegistry) { if (tableRegistry == null) throw new ArgumentNullException("tableRegistry"); tableRegistry.CommonFuture.Inserted -= OnCommonDerivativeInserted; tableRegistry.CommonOption.Inserted -= OnCommonDerivativeInserted; tableRegistry.SessionContentsFuture.Inserted -= OnSessionContentsFutureInserted; tableRegistry.SessionContentsOption.Inserted -= OnSessionContentsOptionInserted; tableRegistry.Portfolios.Inserted -= OnPortfoliosInserted; tableRegistry.Positions.Inserted -= OnPositionsInserted; tableRegistry.Aggregation5Option.Inserted -= OnAggregationInserted; tableRegistry.Aggregation5Future.Inserted -= OnAggregationInserted; tableRegistry.Aggregation20Option.Inserted -= OnAggregationInserted; tableRegistry.Aggregation20Future.Inserted -= OnAggregationInserted; tableRegistry.Aggregation50Option.Inserted -= OnAggregationInserted; tableRegistry.Aggregation50Future.Inserted -= OnAggregationInserted; tableRegistry.Session.Inserted -= OnSessionInserted; tableRegistry.TradeFuture.Inserted -= OnTradeInserted; tableRegistry.TradeOption.Inserted -= OnTradeInserted; tableRegistry.UsdOnline.Inserted -= OnUsdOnlineInserted; tableRegistry.OrdersLogFuture.Inserted -= OnOrdersLogInserted; tableRegistry.OrdersLogOption.Inserted -= OnOrdersLogInserted; tableRegistry.Index.Inserted -= OnIndexInserted; tableRegistry.IndexLog.Inserted -= OnIndexInserted; tableRegistry.VarMarginFuture.Inserted -= OnVarMarginInserted; tableRegistry.VarMarginOption.Inserted -= OnVarMarginInserted; tableRegistry.Volatility.Inserted -= OnVolatilityInserted; tableRegistry.AnonymousOrdersLog.Inserted -= OnAnonymousOrdersLogInserted; tableRegistry.McxSecuritiesSpot.Inserted -= OnMcxSecuritiesSpotInserted; tableRegistry.McxSecuritiesCurrency.Inserted -= OnMcxSecuritiesCurrencyInserted; tableRegistry.McxCommonSpot.Inserted -= OnMcxCommonSpotInserted; tableRegistry.McxCommonCurrency.Inserted -= OnMcxCommonCurrencyInserted; tableRegistry.McxTradesSpot.Inserted -= OnMcxTradesSpotInserted; tableRegistry.McxTradesCurrency.Inserted -= OnMcxTradesCurrencyInserted; tableRegistry.McxOrderBookSpot.Inserted -= OnMcxOrderBookInserted; tableRegistry.McxOrderBookCurrency.Inserted -= OnMcxOrderBookInserted; UnSubscribeDepthTable(tableRegistry.McxOrderBookSpot); UnSubscribeDepthTable(tableRegistry.McxOrderBookCurrency); UnSubscribeDepthTable(tableRegistry.Aggregation5Future); UnSubscribeDepthTable(tableRegistry.Aggregation20Future); UnSubscribeDepthTable(tableRegistry.Aggregation50Future); UnSubscribeDepthTable(tableRegistry.Aggregation5Option); UnSubscribeDepthTable(tableRegistry.Aggregation20Option); UnSubscribeDepthTable(tableRegistry.Aggregation50Option); } private void SubscribeDepthTable(PlazaTable table) { if (table == null) throw new ArgumentNullException("table"); table.End += OnAggregationEnd; table.Flushed += OnAggregationFlushed; table.Cleared += OnAggregationCleared; } private void UnSubscribeDepthTable(PlazaTable table) { if (table == null) throw new ArgumentNullException("table"); table.Cleared -= OnAggregationCleared; table.Flushed -= OnAggregationFlushed; table.End -= OnAggregationEnd; } /// /// Подключиться к торговой системе. /// protected override void OnConnect() { // Для удобства проверяем, запущен ли P2MQRouter.exe? // Если это не делать, COM-библиотека Плазы будет выкидывать одну и ту же ошибку ('Couldn't resolve MQ service'), // и когда P2MQRouter.exe не запущен, // и когда нет связи с сервером Плазы или не ведутся торги. //if (_connectionPool.IsLocal && Router.Status != ServiceControllerStatus.Running) // throw new InvalidOperationException("На компьютере не установлена или не запущена программа P2MQRouter.exe."); Router.Restart(); while (Router.Status != System.ServiceProcess.ServiceControllerStatus.Running) { System.Threading.Thread.Sleep(200); } _transactionManager.Start(); _connectionPool.Connect(); this.AddInfoLog("OnConnect"); } /// /// Отключиться от торговой системы. /// protected override void OnDisconnect() { // Иначе при выгрузке в StopStream будет срабатывать исключение. if (IsExportStarted) StopExport(); _transactionManager.Stop(); _connectionPool.Disconnect(); this.AddInfoLog("OnDisconnect"); } /// /// Запустить экспорт данных из торговой системы в программу (получение портфелей, инструментов, заявок и т.д.). /// public override void StartExport() { if (IsExportStarted) StopExport(); this.AddInfoLog("Запуск экспорта."); StreamManager.StartExport(CheckTables()); base.StartExport(); } private IDictionary CheckTables() { lock (Tables.SyncRoot) { if (Tables.IsEmpty()) throw new InvalidOperationException("Коллекция таблиц данных пуста."); if (Tables.Count(t => _tableRegistry.AggregationFutures.Contains(t)) > 1 || Tables.Count(t => _tableRegistry.AggregationOptions.Contains(t)) > 1) throw new InvalidOperationException("Нельзя одновременно экспортировать потоки стаканов с разной глубиной."); return Tables.SyncGet(c => c.GroupBy(t => t.Stream).ToDictionary(g => g.Key, g => g.ToArray())); } } /// /// Остановить запущенный экспорт данных из торговой системы в программу. /// public override void StopExport() { this.AddInfoLog("Остановка экспорта."); if (!IsExportStarted) { this.AddInfoLog("Экспорт не был запущен."); return; } StreamManager.StopExport(); base.StopExport(); } private static void FillExtensionInfo(IExtendableEntity entity, PlazaRecord record) { if (record.Table.Columns.NonMandatoryColumns.IsEmpty()) return; if (entity.ExtensionInfo == null) entity.ExtensionInfo = new Dictionary(); foreach (var column in record.Table.Columns.NonMandatoryColumns) entity.ExtensionInfo[column] = record.Get(column); } private bool IfMcxClassFiltered(string secClass) { return !MicexClasses.Contains(secClass); } private void OnMcxSecuritiesInserted(PlazaRecord record, Action initHandler) { var metadata = (PlazaMcxSecuritiesColumns)record.Table.Metadata; var secClass = record.GetString(metadata.SecBoard); if (IfMcxClassFiltered(secClass)) return; var secCode = record.GetString(metadata.SecCode).Trim(); ProcessEvents(() => GetSecurity(CreateSecurityId(secCode, secClass), id => { var security = EntityFactory.CreateSecurity(id); security.ExchangeBoard = ExchangeBoard.Micex; security.Name = record.GetString(metadata.SecName); security.ShortName = record.GetString(metadata.ShortName); security.Code = secCode; security.Class = secClass; security.MinStepPrice = security.MinStepSize = record.GetDecimal(metadata.MinStep); security.MinLotSize = record.GetInt(metadata.LotSize); security.Decimals = record.GetInt(metadata.Decimals); security.ExpiryDate = record.GetDateTime(metadata.MatDate); initHandler(security); return security; }, security => { using (security.BeginUpdate()) { security.State = record.GetString(metadata.Status).ToMicexSecurityState() == MicexSecurityState.Active ? SecurityStates.Trading : SecurityStates.Stoped; security.ClosePrice = record.GetDecimal(metadata.PrevPrice); security.LastChangeTime = security.GetMarketTime(); FillExtensionInfo(security, record); } return true; })); } private void OnMcxSecuritiesSpotInserted(PlazaRecord record) { OnMcxSecuritiesInserted(record, security => security.Type = SecurityTypes.Stock); } private void OnMcxSecuritiesCurrencyInserted(PlazaRecord record) { OnMcxSecuritiesInserted(record, security => security.Type = SecurityTypes.Currency); } private void OnMcxCommonInserted(PlazaRecord record) { var metadata = (PlazaMcxCommonColumns) record.Table.Metadata; var secClass = record.GetString(metadata.SecBoard); if (IfMcxClassFiltered(secClass)) return; var secCode = record.GetString(metadata.SecCode).Trim(); ProcessSecurityAction(CreateSecurityId(secCode, secClass), null, security => { using (security.BeginUpdate()) { var table = record.Table; if (table.Columns.Contains(metadata.Bid) || table.Columns.Contains(metadata.BidDepth)) { var bestBid = new Quote { Security = security, OrderDirection = OrderDirections.Buy, }; if (table.Columns.Contains(metadata.Bid)) bestBid.Price = record.GetDecimal(metadata.Bid); if (table.Columns.Contains(metadata.BidDepth)) bestBid.Volume = record.GetDecimal(metadata.BidDepth); security.BestBid = bestBid; } if (table.Columns.Contains(metadata.Offer) || table.Columns.Contains(metadata.OfferDepth)) { var bestAsk = new Quote { Security = security, OrderDirection = OrderDirections.Sell, }; if (table.Columns.Contains(metadata.Offer)) bestAsk.Price = record.GetDecimal(metadata.Offer); if (table.Columns.Contains(metadata.OfferDepth)) bestAsk.Volume = record.GetDecimal(metadata.OfferDepth); security.BestAsk = bestAsk; } Trade lastTrade = null; if (table.Columns.Contains(metadata.Quantity)) { lastTrade = EntityFactory.CreateTrade(security, 0); lastTrade.Volume = record.GetDecimal(metadata.Quantity); } if (record.Table.Columns.Contains(metadata.Last)) { lastTrade = lastTrade ?? EntityFactory.CreateTrade(security, 0); lastTrade.Price = record.GetDecimal(metadata.Last); } if (table.Columns.Contains(metadata.Time)) { lastTrade = lastTrade ?? EntityFactory.CreateTrade(security, 0); lastTrade.Time = record.GetDateTime(metadata.Time); lastTrade.InitLatency(); } if (lastTrade != null) security.LastTrade = lastTrade; else security.LastChangeTime = security.GetMarketTime(); if (table.Columns.Contains(metadata.Low)) security.LowPrice = record.GetDecimal(metadata.Low); if (table.Columns.Contains(metadata.High)) security.HighPrice = record.GetDecimal(metadata.High); if (table.Columns.Contains(metadata.Open)) security.OpenPrice = record.GetDecimal(metadata.Open); FillExtensionInfo(security, record); } }, true); } private void OnMcxCommonSpotInserted(PlazaRecord record) { OnMcxCommonInserted(record); } private void OnMcxCommonCurrencyInserted(PlazaRecord record) { OnMcxCommonInserted(record); } private void OnMcxOrderBookInserted(PlazaRecord record) { var metadata = (PlazaMcxOrderBookColumns)record.Table.Metadata; var secClass = record.GetString(metadata.SecBoard); if (IfMcxClassFiltered(secClass)) return; var secCode = record.GetString(metadata.SecCode).Trim(); ProcessSecurityAction(CreateSecurityId(secCode, secClass), null, security => _depthBuilder.UpdateQuote(record.Table, record, security, record.GetDecimal(metadata.Price), record.GetDecimal(metadata.Quantity), record.GetString(metadata.BuySell).ToOrderDirections(), security.GetMarketTime() // в бою недоступно поле (переписка с техподдержкой) /*record.GetDateTime(metadata.Time)*/), true); } private void OnSessionContentsDerivativeInserted(PlazaRecord record, PlazaSessionContentsDerivativeColumns metadata, Action initHandler, Action updateHandler) { ProcessEvents(() => { var isinId = record.GetInt(metadata.IsinId); var sessionId = record.GetInt(metadata.SessionId); var secCode = record.GetString(metadata.ShortIsin); var s = GetSecurity(CreateSecurityId(secCode, "RTS"), id => { var security = EntityFactory.CreateSecurity(id); security.ExchangeBoard = ExchangeBoard.Forts; security.Name = record.GetString(metadata.Name); security.ShortName = record.GetString(metadata.Isin); security.MinLotSize = record.GetInt(metadata.LotVolume); security.MinStepSize = record.GetDecimal(metadata.MinStep); security.Decimals = record.GetInt(metadata.RoundTo); security.ExpiryDate = record.GetDateTime(metadata.DateEndTrade); security.Code = secCode; initHandler(security); return security; }, security => { using (security.BeginUpdate()) { security.MinStepPrice = record.GetDecimal(metadata.StepPrice); security.ClosePrice = record.GetDecimal(metadata.LastClearingQuote); security.LastChangeTime = security.GetMarketTime(); var isLimited = record.GetBool(metadata.IsLimited); if (isLimited) { //http://forum.rts.ru/viewtopic.asp?t=21271 security.MaxPrice = security.ClosePrice + record.GetDecimal(metadata.LimitUp); security.MinPrice = Math.Max(security.MinStepSize, security.ClosePrice - record.GetDecimal(metadata.LimitDown)); } else { security.MinPrice = security.MinStepSize; } FillExtensionInfo(security, record); updateHandler(security); } return true; }); List> evts; lock (_isinSecurities.SyncRoot) { _isinSecurities[isinId] = s; var nativeId = new Tuple(isinId, sessionId); if (_nativeIds.Add(nativeId)) AddNativeSecurityId(s, nativeId); // обработать ассоциированные с данным инструментом действия, которые произошли в других сессиях (если таковые были) foreach (var sessId in (s.Type == SecurityTypes.Future ? _futSessions : _optSessions)) { nativeId = new Tuple(isinId, sessId); if (_nativeIds.Add(nativeId)) AddNativeSecurityId(s, nativeId); } evts = _suspendedIsinEvents.TryGetValue(isinId); if (evts != null) _suspendedIsinEvents.Remove(isinId); } if (evts != null) { foreach (var evt in evts) evt(s); } }); } private void OnSessionContentsFutureInserted(PlazaRecord record) { var metadata = TableRegistry.ColumnRegistry.SessionContentsFuture; var state = (PlazaSecurityState)record.GetInt(metadata.State); if (state == PlazaSecurityState.Ended) return; //Сессия закончена OnSessionContentsDerivativeInserted(record, metadata, security => { security.Class = _futuresClass; security.Type = SecurityTypes.Future; security.UnderlyingSecurityId = record.GetString(metadata.CodeVcb); //Если РЕПО или своп - добавляем в список для дальнейшей фильтрации при построении стаканов. if (record.GetInt(metadata.MultilegType) != 0) _multilegSecurities.Add(security); }, security => { security.MarginBuy = record.GetDecimal(metadata.BuyDeposit); security.MarginSell = record.GetDecimal(metadata.SellDeposit); }); } private void OnSessionContentsOptionInserted(PlazaRecord record) { var metadata = TableRegistry.ColumnRegistry.SessionContentsOption; OnSessionContentsDerivativeInserted(record, metadata, security => { security.Class = _optionsClass; security.Type = SecurityTypes.Option; security.ExpiryDate = record.GetDateTime(metadata.ExpirationEndDate); security.Strike = record.GetDecimal(metadata.Strike); security.OptionType = record.GetBool(metadata.IsPut) ? OptionTypes.Put : OptionTypes.Call; var isinId = record.GetInt(metadata.FuturesIsinId); ProcessIsinAction(underlyingSecurity => security.UnderlyingSecurityId = underlyingSecurity.Id, isinId); }, security => { security.MarginBuy = record.GetDecimal(metadata.BgoBuy); // mika транслируется наихудший показатель, тоесть ГО при отстутствии покрытия через фьючерсы security.MarginSell = record.GetDecimal(metadata.BgoNc); }); } private void OnIndexInserted(PlazaRecord record) { ProcessEvents(() => { var metadata = TableRegistry.ColumnRegistry.Index; //Индексы ММВБ дополняются в конце лишними пробелами var secName = record.GetString(metadata.Name).Trim(); var secClass = secName.ContainsIgnoreCase("MICEX") || secName.ContainsIgnoreCase("MCX") // доп. проверка на русскую С: // http://forum.rts.ru/viewtopic.asp?p=130715 || secName.ContainsIgnoreCase("MСX") ? "MICEX" : "RTS"; GetSecurity(CreateSecurityId(secName, secClass), id => { var security = EntityFactory.CreateSecurity(id); security.Code = secName; security.Name = secName; security.ShortName = secName; security.Class = secClass; security.ExchangeBoard = secClass == "RTS" ? ExchangeBoard.Forts : ExchangeBoard.Micex; security.Type = SecurityTypes.Index; security.State = SecurityStates.Trading; // предположение о том, что все индексы имеют одинаковый шаг цены security.MinStepPrice = security.MinStepSize = 0.01m; return security; }, security => { using (security.BeginUpdate()) { security.LastTrade = GetTrade(security, record.ReplRevision, id => { var trade = EntityFactory.CreateTrade(security, id); trade.Price = record.GetDecimal(metadata.Value); trade.Time = record.GetDateTime(metadata.Moment); trade.InitLatency(); return trade; }); security.ClosePrice = record.GetDecimal(metadata.PrevCloseValue); security.OpenPrice = record.GetDecimal(metadata.OpenValue); security.LowPrice = record.GetDecimal(metadata.MinValue); security.HighPrice = record.GetDecimal(metadata.MaxValue); FillExtensionInfo(security, record); } return true; }); }); } private void OnUsdOnlineInserted(PlazaRecord record) { ProcessEvents(() => GetSecurity("USDONLINE@RTS", id => { var security = EntityFactory.CreateSecurity(id); security.Code = "USDONLINE"; security.Name = "Usd Online"; security.ExchangeBoard = ExchangeBoard.Forts; security.Type = SecurityTypes.Index; security.State = SecurityStates.Trading; // предположение о том, что все индексы имеют одинаковый шаг цены security.MinStepPrice = security.MinStepSize = 0.0001m; return security; }, security => { using (security.BeginUpdate()) { var metadata = (PlazaUsdOnlineColumns)record.Table.Metadata; security.LastTrade = GetTrade(security, record.ReplRevision, id => { var trade = EntityFactory.CreateTrade(security, id); trade.Price = record.GetDecimal(metadata.Rate); trade.Time = record.GetDateTime(metadata.Moment); trade.InitLatency(); return trade; }); } return true; })); } private void ProcessPlazaStream(PlazaRecord record, PlazaColumn isinId, PlazaColumn sessionId, Action action, bool ignoreIfNotExist = false) { ProcessEvents(() => ProcessPlazaSecurity(record, isinId, sessionId, action, ignoreIfNotExist)); } private void ProcessPlazaSecurity(PlazaRecord record, PlazaColumn isinId, PlazaColumn sessionId, Action action, bool ignoreIfNotExist = false) { ProcessPlazaSecurity(record.GetInt(isinId), record.GetInt(sessionId), ignoreIfNotExist, action); } private void ProcessPlazaSecurity(int isinId, int sessionId, bool ignoreIfNotExist, Action action) { var nativeId = new Tuple(isinId, sessionId); ProcessSecurityAction(null, nativeId, action, ignoreIfNotExist); } private void OnCommonDerivativeInserted(PlazaRecord record) { var metadata = (PlazaCommonDerivativeColumns)record.Table.Metadata; ProcessPlazaStream(record, metadata.IsinId, metadata.SessionId, security => { using (security.BeginUpdate()) { var table = record.Table; security.OpenInterest = record.GetDecimal(metadata.Position); if (table.Columns.Contains(metadata.BestBuy) || table.Columns.Contains(metadata.AmountToBuy)) { var bestBid = new Quote { Security = security, OrderDirection = OrderDirections.Buy, }; if (table.Columns.Contains(metadata.BestBuy)) bestBid.Price = record.GetDecimal(metadata.BestBuy); if (table.Columns.Contains(metadata.AmountToBuy)) bestBid.Volume = record.GetDecimal(metadata.AmountToBuy); security.BestBid = bestBid; } if (table.Columns.Contains(metadata.BestSell) || table.Columns.Contains(metadata.AmountToSell)) { var bestAsk = new Quote { Security = security, OrderDirection = OrderDirections.Sell, }; if (table.Columns.Contains(metadata.BestSell)) bestAsk.Price = record.GetDecimal(metadata.BestSell); if (table.Columns.Contains(metadata.AmountToSell)) bestAsk.Volume = record.GetDecimal(metadata.AmountToSell); security.BestAsk = bestAsk; } Trade lastTrade = null; if (table.Columns.Contains(metadata.Price)) { lastTrade = EntityFactory.CreateTrade(security, 0); lastTrade.Price = record.GetDecimal(metadata.Price); } if (table.Columns.Contains(metadata.Amount)) { lastTrade = lastTrade ?? EntityFactory.CreateTrade(security, 0); lastTrade.Volume = record.GetDecimal(metadata.Amount); } if (table.Columns.Contains(metadata.DealTime)) { lastTrade = lastTrade ?? EntityFactory.CreateTrade(security, 0); lastTrade.Time = record.GetDateTime(metadata.DealTime); lastTrade.InitLatency(); } if (lastTrade != null) security.LastTrade = lastTrade; else security.LastChangeTime = security.GetMarketTime(); if (table.Columns.Contains(metadata.MinPrice)) security.LowPrice = record.GetDecimal(metadata.MinPrice); if (table.Columns.Contains(metadata.MaxPrice)) security.HighPrice = record.GetDecimal(metadata.MaxPrice); if (table.Columns.Contains(metadata.OpeningPrice)) security.OpenPrice = record.GetDecimal(metadata.OpeningPrice); if (table.Columns.Contains(metadata.OrdersBuyAmount)) security.BidsVolume = record.GetDecimal(metadata.OrdersBuyAmount); if (table.Columns.Contains(metadata.OrdersBuyQuantity)) security.BidsCount = record.GetDecimal(metadata.OrdersBuyQuantity); if (table.Columns.Contains(metadata.OrdersSellAmount)) security.AsksVolume = record.GetDecimal(metadata.OrdersSellAmount); if (table.Columns.Contains(metadata.OrdersSellQuantity)) security.AsksCount = record.GetDecimal(metadata.OrdersSellQuantity); FillExtensionInfo(security, record); } }, true); } private void OnMcxTradesInserted(PlazaRecord record, PlazaMcxTradesColumns metadata, Action updateHandler) { var secClass = record.GetString(metadata.SecBoard); if (IfMcxClassFiltered(secClass)) return; var secCode = record.GetString(metadata.SecCode).Trim(); ProcessSecurityAction(CreateSecurityId(secCode, secClass), null, security => { var trade = GetTrade(security, record.GetLong(metadata.TradeId), dealId => { var t = EntityFactory.CreateTrade(security, dealId); t.Price = record.GetDecimal(metadata.Price); t.Volume = record.GetLong(metadata.Quantity); t.Time = record.GetDateTime(metadata.TradeTime); t.InitLatency(); return t; }); updateHandler(trade); FillExtensionInfo(trade, record); using (security.BeginUpdate()) { security.LastTrade = trade; } RaiseSecurityChanged(security); }); } private void OnMcxTradesCurrencyInserted(PlazaRecord record) { var metadata = TableRegistry.ColumnRegistry.McxTradesCurrency; OnMcxTradesInserted(record, metadata, trade => { }); } private void OnMcxTradesSpotInserted(PlazaRecord record) { var metadata = TableRegistry.ColumnRegistry.McxTradesSpot; OnMcxTradesInserted(record, metadata, trade => { trade.OrderDirection = record.GetString(metadata.BuySell).ToOrderDirections(); }); } private void OnTradeInserted(PlazaRecord record) { var metadata = (PlazaTradeDerivativeColumns)record.Table.Metadata; var isSystem = !record.GetBool(metadata.NoSystem); if (OnlySystemTrades && !isSystem) return; ProcessPlazaStream(record, metadata.IsinId, metadata.SessionId, security => { var trade = GetTrade(security, record.GetLong(metadata.DealId), dealId => { var t = EntityFactory.CreateTrade(security, dealId); t.Price = record.GetDecimal(metadata.Price); t.Volume = record.GetDecimal(metadata.Amount); t.Time = record.GetDateTime(metadata.Moment); t.IsSystem = isSystem; t.InitLatency(); // mika Данные колонки обязательные, и нет смысла проверять их наличие // //if (record.Table.Columns.Contains(metadata.BuyOrderId) && record.Table.Columns.Contains(metadata.SellOrderId)) //{ t.OrderDirection = record.GetLong(metadata.BuyOrderId) > record.GetLong(metadata.SellOrderId) ? OrderDirections.Buy : OrderDirections.Sell; //} //if (record.Table.Columns.Contains(metadata.Position)) t.OpenInterest = record.GetInt(metadata.Position); if (t.OrderDirection == OrderDirections.Buy) { if (record.Table.Columns.Contains(metadata.BuyStatus)) t.Status = record.GetInt(metadata.BuyStatus); } else if (t.OrderDirection == OrderDirections.Sell) { if (record.Table.Columns.Contains(metadata.SellStatus)) t.Status = record.GetInt(metadata.SellStatus); } return t; }); if (isSystem) { using (security.BeginUpdate()) security.LastTrade = trade; RaiseSecurityChanged(security); } var orderTransactionId = record.GetInt(metadata.BuyExternalId); if (orderTransactionId == 0) orderTransactionId = record.GetInt(metadata.SellExternalId); // собственная сделка if (orderTransactionId != 0) { AddMyTrade(security, 0, orderTransactionId, trade); } FillExtensionInfo(trade, record); }); } private void OnOrdersLogInserted(PlazaRecord record) { var metadata = (PlazaOrdersLogDerivativeColumns)record.Table.Metadata; var transactionId = record.GetInt(metadata.ExtId); // чужие заявки игнорируем, так как это старый ордер лог if (transactionId == 0) return; ProcessPlazaStream(record, metadata.IsinId, metadata.SessionId, security => GetOrder(security, record.GetLong(metadata.OrderId), orderId => { //http://forum.rts.ru/viewtopic.asp?p=115933 //10. Изменен порядок прихода записей по итогам торговой транзакции в логе заявок. //Ранее при постановке заявки первой шла запись о постановке заявки инициатора транзакции, //вслед за которой шли записи о сделках. //В версии 3.8 запись о постановке заявки добавляется последней, после записей о сделках. var order = GetOrderByTransactionId(transactionId); if (order == null) { order = EntityFactory.CreateOrder(security, orderId); order.TransactionId = transactionId; } using (order.BeginUpdate()) { order.Id = orderId; InitOrder(order, record, metadata); var clientCode = record.GetString(metadata.ClientCode); if (!clientCode.IsEmpty()) order.Portfolio = GetPortfolio(clientCode); order.Comment = record.GetString(metadata.Comment); order.ExpiryDate = record.GetDateTime(metadata.DateOrderExpiration); } return order; }, order => { using (order.BeginUpdate()) { UpdateOrder(order, record, metadata); FillExtensionInfo(order, record); } return true; })); } private static void InitOrder(Order order, PlazaRecord record, PlazaBaseOrdersColumns metadata) { if (order == null) throw new ArgumentNullException("order"); if (record == null) throw new ArgumentNullException("record"); if (metadata == null) throw new ArgumentNullException("metadata"); order.Price = record.GetDecimal(metadata.Price); order.Volume = record.GetDecimal(metadata.AmountOperation); order.Direction = record.GetBool(metadata.Direction) ? OrderDirections.Buy : OrderDirections.Sell; order.Time = order.LastChangeTime = record.GetDateTime(metadata.Moment); order.State = OrderStates.Active; var status = record.GetInt(metadata.Status); if (status.HasBits(0x01)) order.ExecutionCondition = OrderExecutionConditions.PutInQueue; else if (status.HasBits(0x02)) order.ExecutionCondition = OrderExecutionConditions.CancelBalance; order.Status = (OrderStatus)status; order.IsSystem = !status.HasBits(0x04); } /// /// Действие, произошедшее с заявкой. /// private enum OrderLogActions { /// /// Заявка удалена. /// Canceled, /// /// Заявка добавлена. /// Registered, /// /// Заявка сведена в сделку. /// Matched, } private static void UpdateOrder(Order order, PlazaRecord record, PlazaBaseOrdersColumns metadata) { var action = record.Get(metadata.Action); switch (action) { case OrderLogActions.Canceled: // удалена order.LastChangeTime = record.GetDateTime(metadata.Moment); order.Balance = record.GetDecimal(metadata.AmountOperation); order.State = OrderStates.Done; break; case OrderLogActions.Registered: // добавлена var time = record.GetDateTime(metadata.Moment); order.LastChangeTime = time; // LastChangeTime - Биржевое время установки/снятия/сведения заявки // если NewOrder генерится по транзакции, оно пустое. // Поэтому нам НУЖНО сгенерировать OrderChanged //if (order.Time == time) // isChanged = false; //else // order.Time = time; order.Time = time; //Завяка может быть отправлена на регистрацию с нулевым объемом, поэтому //инициализируем объем и баланс, чтобы она не считалась исполненной раньше времени order.Volume = order.Balance = record.GetDecimal(metadata.AmountOperation); break; case OrderLogActions.Matched: // исполнена order.Balance = record.GetDecimal(metadata.AmountRest); order.LastChangeTime = record.GetDateTime(metadata.Moment); if (order.Balance == 0) order.State = OrderStates.Done; break; default: throw new InvalidOperationException("Неизвестное действие заявки {0}.".Put(action)); } } private void OnPortfoliosInserted(PlazaRecord record) { var metadata = TableRegistry.ColumnRegistry.Portfolios; var clientCode = record.GetString(metadata.ClientCode); _varMaringIntercl[clientCode] = record.GetDecimal(metadata.VmInterClearing); ProcessEvents(() => GetPortfolio(clientCode, portfolio => { using (portfolio.BeginUpdate()) { portfolio.Name = record.GetString(metadata.ClientCode); portfolio.BeginValue = record.GetDecimal(metadata.MoneyOld); portfolio.CurrentValue = record.GetDecimal(metadata.MoneyAmount); portfolio.Commission = record.GetDecimal(metadata.Fee); portfolio.Leverage = record.GetDecimal(metadata.CoeffGo); portfolio.ExchangeBoard = ExchangeBoard.Forts; FillExtensionInfo(portfolio, record); } return true; })); } private void OnPositionsInserted(PlazaRecord record) { var metadata = TableRegistry.ColumnRegistry.Positions; Action suspendedAction = security => UpdatePosition(GetPortfolio(record.GetString(metadata.ClientCode)), security, position => { using (position.BeginUpdate()) { position.BeginValue = record.GetDecimal(metadata.OpenQty); position.CurrentValue = record.GetDecimal(metadata.Pos); FillExtensionInfo(position, record); } return true; }); var isinId = record.GetInt(metadata.IsinId); ProcessIsinAction(suspendedAction, isinId); } private void ProcessIsinAction(Action suspendedAction, int isinId) { Security s; lock (_isinSecurities.SyncRoot) { s = _isinSecurities.TryGetValue(isinId); if (s == null) { _suspendedIsinEvents.SafeAdd(isinId).Add(suspendedAction); return; } } suspendedAction(s); } private void OnAggregationFlushed(PlazaTable table) { _depthBuilder.ClearQuotes(table); } private void OnAggregationInserted(PlazaRecord record) { var sessionId = record.Table.IsFuture ? _futSessionId : _optSessionId; //TODO: сессия могла ещё и не придти if (sessionId == null) return; var metadata = (PlazaAggregationColumns)record.Table.Metadata; ProcessPlazaSecurity(record.GetInt(metadata.IsinId), (int)sessionId, true, security => { //Стаканы для свопов и РЕПО строятся иначе - может быть Bid>=Ask. Для свопов стаканы строятся по процентам. if (!_multilegSecurities.Contains(security)) { _depthBuilder.UpdateQuote(record.Table, record, security, record.GetDecimal(metadata.Price), record.GetLong(metadata.Volume), record.GetBool(metadata.Direction) ? OrderDirections.Buy : OrderDirections.Sell, record.GetDateTime(metadata.Moment)); } }); } private void OnAggregationCleared(PlazaTable table, long revision) { _depthBuilder.RemoveQuotesByRev(table, revision); // mika // DatumDeleted - вне транзакции // http://forum.rts.ru/viewtopic.asp?p=126849#126849 // OnAggregationEnd(table); } private void OnAggregationEnd(PlazaTable table) { var depths = _depthBuilder.FlushChanges(table); depths.ForEach(RaiseMarketDepthChanged); } private void OnSessionInserted(PlazaRecord record) { var metadata = TableRegistry.ColumnRegistry.Session; SessionState = (PlazaSessionState)record.GetInt(metadata.State); // http://forum.rts.ru/viewtopic.asp?t=20203 // Снимаем активные заявки с датой истечения сегодня if (SessionState == PlazaSessionState.Ended) { var endSessionTime = LoggingHelper.Now; foreach (var order in Orders.Where(o => o.State == OrderStates.Active && o.ExpiryDate == null)) { using (order.BeginUpdate()) { order.LastChangeTime = endSessionTime; order.State = OrderStates.Done; } RaiseOrderChanged(order); } // Очистить стакан в клиринг // // upd mika: Стакан лучше оставить как есть, и перестроить его при первом изменении из новой сессии. // Плюс это сделает поведение стаканов совместимыми с тем, что получаются из агрегированных потоков. // //if (CreateDepthFromOrdersLog) //{ // foreach (var depth in Securities.Select(GetMarketDepth).Where(depth => depth != null)) // { // depth.Update(new Quote[0], new Quote[0], true, endSessionTime); // } //} } else if (SessionState == PlazaSessionState.Assigned) { _varMaring.Clear(); _varMaringIntercl.Clear(); foreach (var portfolio in Portfolios) { portfolio.VariationMargin = 0.0m; } } lock (_isinSecurities.SyncRoot) { _futSessionId = record.GetInt(metadata.SessionId); _optSessionId = record.GetInt(metadata.OptSessionId); _futSessions.Add((int)_futSessionId); _optSessions.Add((int)_optSessionId); foreach (var pair in _isinSecurities) { var security = pair.Value; var nativeId = new Tuple(pair.Key, security.Type == SecurityTypes.Future ? (int)_futSessionId : (int)_optSessionId); if (_nativeIds.Add(nativeId)) AddNativeSecurityId(security, nativeId); } } } private void OnAnonymousOrdersLogInserted(PlazaRecord record) { var metadata = TableRegistry.ColumnRegistry.AnonymousOrdersLog; ProcessPlazaStream(record, metadata.IsinId, metadata.SessionId, security => { var order = EntityFactory.CreateOrder(security, record.GetLong(metadata.OrderId)); order.Portfolio = Portfolio.AnonymousPortfolio; // создаем для записи ордер лога уникальный идентификатор order.TransactionId = record.ReplId; InitOrder(order, record, metadata); UpdateOrder(order, record, metadata); var price = record.GetDecimal(metadata.DealPrice); Trade trade = null; var action = record.Get(metadata.Action); if (action == OrderLogActions.Matched) { trade = EntityFactory.CreateTrade(security, record.GetLong(metadata.DealId)); trade.Price = price; trade.Time = record.GetDateTime(metadata.Moment); trade.Volume = order.Volume; trade.IsSystem = order.IsSystem; if (CreateTradesFromOrdersLog) { GetTrade(security, trade.Id, id => { var t = trade.Clone(); t.OrderDirection = order.Direction.Invert(); return t; }); } } var logItem = EntityFactory.CreateOrderLogItem(order, trade); logItem.InitLatency(); RaiseNewOrderLogItem(logItem); if (CreateDepthFromOrdersLog) { var depth = GetMarketDepth(security); if (depth.Update(logItem)) RaiseMarketDepthChanged(depth); } }); } private void OnVolatilityInserted(PlazaRecord record) { var metadata = TableRegistry.ColumnRegistry.Volatility; ProcessPlazaStream(record, metadata.IsinId, metadata.SessionId, security => { using (security.BeginUpdate()) { security.ImpliedVolatility = record.GetDecimal(metadata.ImpliedVolatility); security.TheorPrice = record.GetDecimal(metadata.TheorPrice); security.LastChangeTime = security.GetMarketTime(); } }, true); } private void OnVarMarginInserted(PlazaRecord record) { var metadata = (PlazaVarMarginDerivativeColumns)record.Table.Metadata; var clientCode = record.GetString(metadata.ClientCode); ProcessEvents(() => GetPortfolio(clientCode, portfolio => { var isinId = record.GetInt(metadata.IsinId); var curr = record.GetDecimal(metadata.VarMarginReal); using (portfolio.BeginUpdate()) { portfolio.VariationMargin = _varMaring.SyncGet(d => { var clientDict = _varMaring.TryGetValue(clientCode); if (clientDict == null) { clientDict = new SynchronizedDictionary(); _varMaring[clientCode] = clientDict; } clientDict[isinId] = curr; return clientDict.Values.Sum() + _varMaringIntercl.TryGetValue(clientCode); }); } return true; })); } /// /// Зарегистрировать заявку на бирже. /// /// Заявка, содержащая информацию для регистрации. protected override void OnRegisterOrder(Order order) { if (order.Type == OrderTypes.Conditional) throw new NotSupportedException("Условные заявки не поддерживаются протоколом Plaza2."); var transaction = _transactionManager.Factory.CreateRegister(order); transaction .SetPrice(order.Price) .SetVolume((int)order.Volume) .SetSecurity(order.Security) .SetClientCode(order.Portfolio.ToClientCode()) .SetDirection(order.Direction) .SetExecutionCondition(order.ExecutionCondition) .SetTransactionId(order.TransactionId); if (!order.Comment.IsEmpty()) transaction.SetComment(order.Comment); if (order.ExpiryDate != null) transaction.SetExpiryDate(order.ExpiryDate.Value); _transactionManager.SendTransaction(transaction); } /// /// Отменить заявку на бирже. /// /// Заявка, которую нужно отменять. protected override void OnCancelOrder(Order order) { if (order.Id == 0) { //Отменяем заявку по внешнему номеру var transaction = CreateCancelGroupTransaction(order.Security, order.Direction, order.Portfolio, order.TransactionId, order); _transactionManager.SendTransaction(transaction); } else { var transaction = _transactionManager.Factory.CreateCancel(order); transaction.SetOrderId(order.Id); _transactionManager.SendTransaction(transaction); } } /// /// Перерегистрировать заявку на бирже. /// /// Заявка, которую нужно снять. /// Новая заявка, которую нужно зарегистрировать. protected override void OnReRegisterOrder(Order oldOrder, Order newOrder) { InitNewOrder(newOrder); var transaction = _transactionManager.Factory.CreateReRegister(oldOrder, newOrder); transaction .SetMoveMode(newOrder.Volume == 0) .SetFirstOrderId(oldOrder.Id) .SetFirstVolume((int)newOrder.Volume) .SetFirstPrice(newOrder.Price) .SetFirstTransactionId(newOrder.TransactionId); _transactionManager.SendTransaction(transaction); } /// /// Перерегистрировать пару заявок на бирже. /// /// Первая заявка, которую нужно снять. /// Первая новая заявка, которую нужно зарегистрировать. /// Вторая заявка, которую нужно снять. /// Вторая новая заявка, которую нужно зарегистрировать. protected override void OnReRegisterOrderPair(Order oldOrder1, Order newOrder1, Order oldOrder2, Order newOrder2) { InitNewOrder(newOrder1); InitNewOrder(newOrder2); var transaction = _transactionManager.Factory.CreateReRegisterPair(oldOrder1, newOrder1, oldOrder2, newOrder2); transaction .SetMoveMode(newOrder1.Volume == 0 && newOrder2.Volume == 0) .SetFirstOrderId(oldOrder1.Id) .SetFirstVolume((int)newOrder1.Volume) .SetFirstPrice(newOrder1.Price) .SetFirstTransactionId(newOrder1.TransactionId) .SetSecondOrderId(oldOrder2.Id) .SetSecondVolume((int)newOrder2.Volume) .SetSecondPrice(newOrder2.Price) .SetSecondTransactionId(newOrder2.TransactionId); _transactionManager.SendTransaction(transaction); } /// /// Отменить группу заявок на бирже по фильтру. /// /// True, если нужно отменить только стоп-заявки, false - если только обычный и null - если оба типа. /// Портфель. Если значение равно null, то портфель не попадает в фильтр снятия заявок. /// Направление заявки. Если значение равно null, то направление не попадает в фильтр снятия заявок. /// Код класса. Если переданная строка пустая, то код не попадает в фильтр снятия заявок. /// Инструмент. Если значение равно null, то инструмент не попадает в фильтр снятия заявок. public override void CancelOrders(bool? isStopOrder = null, Portfolio portfolio = null, OrderDirections? direction = null, string classCode = null, Security security = null) { if (security == null) base.CancelOrders(isStopOrder, portfolio, direction, classCode); else { //TODO: передавать заявки на отмену при создании транзакции //var orders = Orders.Where(order => // { // if (order.State != OrderStates.Active) // { // return false; // } // if (isStopOrder != null) // { // if (isStopOrder.Value && order.Type != OrderTypes.Conditional) // { // return false; // } // else if (order.Type != OrderTypes.Limit) // { // return false; // } // } // if (portfolio != null && order.Portfolio != portfolio) // { // return false; // } // if (direction != null && order.Direction != direction) // { // return false; // } // if (!classCode.IsEmpty() && order.Security.Class != classCode) // { // return false; // } // if (security != null && order.Security == security) // { // return false; // } // return true; // } //); //var transaction = CreateCancelGroupTransaction(security, direction, portfolio, 0, orders.ToArray()); var transaction = CreateCancelGroupTransaction(security, direction, portfolio, 0); _transactionManager.SendTransaction(transaction); } } private Transaction CreateCancelGroupTransaction(Security security, OrderDirections? direction, Portfolio portfolio, long transactionId, params Order[] orders) { var transaction = _transactionManager.Factory.CreateCancelGroup(security.Type, orders); transaction .SetSecurity(security) .SetDirection(direction) .SetOrderType(null); if (transactionId != 0) transaction.SetTransactionId(transactionId); if (portfolio != null) transaction.SetClientCode(portfolio.ToClientCode()); return transaction; } private void ProcessReply(Transaction replyTransaction) { replyTransaction.CheckOrdersLength(); ProcessEvents(() => { if (replyTransaction.ErrorInfo != null) { foreach (var order in replyTransaction.Orders) { RaiseOrderFailed(order, replyTransaction.ErrorInfo); } } else { switch (replyTransaction.OriginTransaction.Message) { case Messages.FutureRegister: case Messages.OptionRegister: { AddNewOrderByIdIfNeeded(replyTransaction.Orders[0], replyTransaction.GetOrderId()); break; } case Messages.FutureReRegister: case Messages.OptionReRegister: { switch (replyTransaction.Orders.Length) { case 4: //Перерегистрация пары заявок { CancelOrderIfNeeded(replyTransaction.Orders[0]); AddNewOrderByIdIfNeeded(replyTransaction.Orders[1], replyTransaction.GetFirstOrderId()); CancelOrderIfNeeded(replyTransaction.Orders[2]); AddNewOrderByIdIfNeeded(replyTransaction.Orders[3], replyTransaction.GetSecondOrderId()); break; } case 2: //Перерегистрация одной заявки { CancelOrderIfNeeded(replyTransaction.Orders[0]); AddNewOrderByIdIfNeeded(replyTransaction.Orders[1], replyTransaction.GetFirstOrderId()); break; } } break; } case Messages.FutureCancel: case Messages.OptionCancel: { CancelOrderIfNeeded(replyTransaction.Orders[0], replyTransaction.GetVolume()); break; } case Messages.FutureCancelGroup: case Messages.OptionCancelGroup: { var ordersCount = replyTransaction.GetOrderCount(); if (ordersCount == replyTransaction.Orders.Length) { foreach (var order in replyTransaction.Orders) CancelOrderIfNeeded(order); } else if (ordersCount == 0) { foreach (var order in replyTransaction.Orders) RaiseOrderFailed(order, new InvalidOperationException("Не удалось снять заявку {0}.".Put(order.TransactionId))); } break; } default: throw new ArgumentOutOfRangeException(); } } }); } private void AddNewOrderByIdIfNeeded(Order order, long orderId) { if (orderId > 0) { // mika // может приводить к deadlock-ам // http://stocksharp.com/forum/yaf_postst1960_Nie-prikhodit-informatsiia-o-zaiavkakh.aspx //using (order.BeginRead(true)) using (order.BeginRead()) { //Нулевой объём возможен при перерегистрации. //В этом случае ждём информации по потоку данных. if (order.Id != 0 || order.Volume == 0) return; } GetOrder(order.Security, orderId, key => { using (order.BeginUpdate()) { order.Id = orderId; order.Time = order.Security.GetMarketTime(); order.State = OrderStates.Active; return order; } }, o => false); } else { var exception = new InvalidOperationException("Не удалось зарегистрировать заявку {0}.".Put(order.TransactionId)); RaiseOrderFailed(order, exception); } } private void CancelOrderIfNeeded(Order order, int? newBalance = null) { using (order.BeginRead(true)) { //Проверка на OrderStates.None нужна, т.к. можем снять заявку по ext_id, //при этом получить вначале ответ по Cancel, затем лишь по Register. if (order.State == OrderStates.None || order.State == OrderStates.Active) { using (order.BeginUpdate()) { if (newBalance != null) order.Balance = newBalance.Value; else if (order.Balance > 1) { // мы не знаем оставшийся баланс (newBalance). // Если баланс в заявке баланс больше 1, то он ещё может обновиться для отменённой заявки по таблице заявок. // Поэтому такую заявку обновляем только по таблице заявок. return; } order.LastChangeTime = order.Security.GetMarketTime(); order.State = OrderStates.Done; } RaiseOrderChanged(order); } } } /// /// Загрузить настройки. /// /// Хранилище настроек. public override void Load(SettingsStorage storage) { base.Load(storage); Address = storage.GetValue("Address"); Login = storage.GetValue("Login"); Password = storage.GetValue("Password"); AppName = storage.GetValue("AppName"); UseLocalProtocol = storage.GetValue("UseLocalProtocol"); PollTimeOut = storage.GetValue("PollTimeOut"); OnlySystemTrades = storage.GetValue("OnlySystemTrades"); var tableIds = storage.GetValue("Tables"); if (tableIds != null) { this.SyncTables(tableIds); } CreateDepthFromOrdersLog = storage.GetValue("CreateDepthFromOrdersLog"); CreateTradesFromOrdersLog = storage.GetValue("CreateTradesFromOrdersLog"); var transactionStorage = storage.GetValue("TransactionManager"); if (transactionStorage != null) { TransactionManager.IsAsync = transactionStorage.GetValue("IsAsync"); TransactionManager.TransactionTimeOut = transactionStorage.GetValue("TransactionTimeOut"); TransactionManager.ThreadCount = transactionStorage.GetValue("ThreadCount"); } var streamStorage = storage.GetValue("StreamManager"); if (streamStorage != null) { StreamManager.ConfigPath = streamStorage.GetValue("ConfigPath"); var conGroupping = streamStorage.GetValue>("ConnectionGroupping"); foreach (var pair in conGroupping) { var stream = TableRegistry.StreamRegistry.AllStreams.First(s => s.Name == pair.Key); StreamManager.ConnectionGroupping[stream] = pair.Value; } } } /// /// Сохранить настройки. /// /// Хранилище настроек. public override void Save(SettingsStorage storage) { base.Save(storage); storage.SetValue("Address", Address.To()); storage.SetValue("Login", Login); storage.SetValue("Password", Password); storage.SetValue("AppName", AppName); storage.SetValue("UseLocalProtocol", UseLocalProtocol); storage.SetValue("PollTimeOut", PollTimeOut); storage.SetValue("OnlySystemTrades", OnlySystemTrades); storage.SetValue("Tables", Tables.SyncGet(c => c.Select(t => t.Id).ToArray())); if (Tables.Contains(TableRegistry.AnonymousOrdersLog)) { storage.SetValue("CreateDepthFromOrdersLog", CreateDepthFromOrdersLog); storage.SetValue("CreateTradesFromOrdersLog", CreateTradesFromOrdersLog); } var streamStorage = new SettingsStorage(); streamStorage.SetValue("ConfigPath", StreamManager.ConfigPath); streamStorage.SetValue("ConnectionGroupping", StreamManager.ConnectionGroupping.ToDictionary(p => p.Key.Name, p => p.Value)); storage.SetValue("StreamManager", streamStorage); var transactionStorage = new SettingsStorage(); transactionStorage.SetValue("IsAsync", TransactionManager.IsAsync); transactionStorage.SetValue("TransactionTimeOut", TransactionManager.TransactionTimeOut); transactionStorage.SetValue("ThreadCount", TransactionManager.ThreadCount); storage.SetValue("TransactionManager", transactionStorage); } /// /// Освободить занятые ресурсы. /// protected override void DisposeManaged() { UnSubscribeEvents(TableRegistry); if (DefaultFutureDepthTable != null) UnSubscribeDepthTable(DefaultFutureDepthTable); if (DefaultOptionDepthTable != null) UnSubscribeDepthTable(DefaultOptionDepthTable); StopExport(); StreamManager.Dispose(); _transactionManager.ProcessResponse -= ProcessReply; _connectionPool.Dispose(); } } }