-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathexecution_context.py
More file actions
173 lines (144 loc) · 5.01 KB
/
execution_context.py
File metadata and controls
173 lines (144 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# -*- coding: utf-8 -*-
#
# Copyright 2017 Ricequant, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import wraps
from contextlib import contextmanager
from .utils.i18n import gettext as _
from .utils.exception import CustomException, patch_user_exc
from .utils import get_upper_underlying_symbol
from .utils.default_future_info import DEFAULT_FUTURE_INFO
class ContextStack(object):
def __init__(self):
self.stack = []
def push(self, obj):
self.stack.append(obj)
def pop(self):
try:
return self.stack.pop()
except IndexError:
raise RuntimeError("stack is empty")
@contextmanager
def pushed(self, obj):
self.push(obj)
try:
yield self
finally:
self.pop()
@property
def top(self):
try:
return self.stack[-1]
except IndexError:
raise RuntimeError("stack is empty")
class ExecutionContext(object):
stack = ContextStack()
config = None
data_proxy = None
account = None
accounts = None
broker = None
calendar_dt = None
trading_dt = None
plots = None
def __init__(self, phase, bar_dict=None):
self.phase = phase
self.bar_dict = bar_dict
def _push(self):
self.stack.push(self)
def _pop(self):
popped = self.stack.pop()
if popped is not self:
raise RuntimeError("Popped wrong context")
return self
def __enter__(self):
self._push()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Restore the algo instance stored in __enter__.
"""
if exc_type is None:
self._pop()
return False
# 处理嵌套ExecutionContext
last_exc_val = exc_val
while isinstance(exc_val, CustomException):
last_exc_val = exc_val
if exc_val.error.exc_val is not None:
exc_val = exc_val.error.exc_val
else:
break
if isinstance(last_exc_val, CustomException):
raise last_exc_val
from .utils import create_custom_exception
user_exc = create_custom_exception(exc_type, exc_val, exc_tb, self.config.base.strategy_file)
raise user_exc
@classmethod
def get_active(cls):
return cls.stack.top
@classmethod
def get_current_bar_dict(cls):
ctx = cls.get_active()
return ctx.bar_dict
@classmethod
def get_current_calendar_dt(cls):
return ExecutionContext.calendar_dt
@classmethod
def get_current_trading_dt(cls):
return ExecutionContext.trading_dt
@classmethod
def get_current_run_id(cls):
return ExecutionContext.config.base.run_id
@classmethod
def get_instrument(cls, order_book_id):
return ExecutionContext.data_proxy.instruments(order_book_id)
@classmethod
def get_data_proxy(cls):
return ExecutionContext.data_proxy
@classmethod
def enforce_phase(cls, *phases):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
phase = cls.get_active().phase
if phase not in phases:
raise patch_user_exc(
RuntimeError(_("You cannot call %s when executing %s") % (func.__name__, phase.value)))
return func(*args, **kwargs)
return wrapper
return decorator
@classmethod
def get_current_close_price(cls, order_book_id):
return ExecutionContext.data_proxy.current_snapshot(
order_book_id,
ExecutionContext.config.base.frequency,
ExecutionContext.calendar_dt
).last
@classmethod
def get_future_commission_info(cls, order_book_id, hedge_type):
try:
return ExecutionContext.data_proxy.get_future_info(order_book_id, hedge_type)
except NotImplementedError:
underlying_symbol = get_upper_underlying_symbol(order_book_id)
return DEFAULT_FUTURE_INFO[underlying_symbol][hedge_type.value]
@classmethod
def get_future_margin(cls, order_book_id):
try:
return ExecutionContext.data_proxy.get_future_info(order_book_id)['long_margin_ratio']
except NotImplementedError:
return ExecutionContext.data_proxy.instruments(order_book_id).margin_rate
@classmethod
def get_future_info(cls, order_book_id, hedge_type):
return ExecutionContext.data_proxy.get_future_info(order_book_id, hedge_type)