Skip to content

Commit

Permalink
restructure some classes
Browse files Browse the repository at this point in the history
  • Loading branch information
bsdz committed Mar 28, 2024
1 parent 1249b0c commit d739ed1
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 261 deletions.
3 changes: 2 additions & 1 deletion yabte/backtest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
PositionalOrderCheckType,
SimpleOrder,
)
from yabte.backtest.strategy import Strategy, StrategyRunner
from yabte.backtest.strategy import Strategy
from yabte.backtest.strategyrunner import StrategyRunner
from yabte.backtest.transaction import CashTransaction, Trade

__all__ = [
Expand Down
53 changes: 52 additions & 1 deletion yabte/backtest/order.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import annotations

import logging
from collections import Counter, deque
from dataclasses import dataclass, field
from decimal import Decimal
from enum import Enum
from typing import Dict, List, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Tuple

import pandas as pd
from mypy_extensions import mypyc_attr
Expand Down Expand Up @@ -104,6 +105,56 @@ def apply(self, ts: pd.Timestamp, day_data: pd.Series, asset_map: Dict[str, Asse
raise NotImplementedError("The apply methods needs to be implemented.")


class Orders:
"""Double ended queue of orders."""

def __init__(self):
self.deque = deque()

def __len__(self):
return len(self.deque)

def __iter__(self):
return iter(self.deque)

def popleft(self):
return self.deque.popleft()

def append(self, order: Order):
return self.deque.append(order)

def extend(self, orders: Iterable[Order]):
return self.deque.extend(orders)

def sort_by_priority(self):
"""Sorts orders by order priority."""
ou_sorted = sorted(self.deque, key=lambda o: o.priority, reverse=True)
self.deque.clear()
self.deque.extend(ou_sorted)

def remove_duplicate_keys(self) -> List[Order]:
"""Remove older orders with same key.
Returns a list of orders than were removed with status set to REPLACED.
"""
removed = []
cntr = Counter(o.key for o in self.deque if o.key is not None)
if any(v > 1 for v in cntr.values()):
kept = []
while self.deque:
o = self.deque.popleft()
if o.key in cntr and cntr[o.key] > 1:
o.status = OrderStatus.REPLACED
removed.append(o)
cntr[o.key] -= 1
else:
kept.append(o)
self.deque.clear()
self.deque.extend(kept)

return removed


@mypyc_attr(allow_interpreted_subclasses=True)
@dataclass(kw_only=True)
class SimpleOrder(Order):
Expand Down
264 changes: 5 additions & 259 deletions yabte/backtest/strategy.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import logging
from collections import Counter, deque
from copy import deepcopy
from dataclasses import dataclass, field
from dataclasses import dataclass
from itertools import chain, product
from typing import Any, Dict, Iterable, List, Optional, Type
from typing import Dict

import pandas as pd
from mypy_extensions import mypyc_attr
Expand All @@ -13,60 +11,12 @@
from pandas import DataFrame, Series, Timestamp # type: ignore

from .asset import ADFI_AVAILABLE_AT_OPEN, Asset, AssetName
from .book import Book, BookMandate, BookName
from .order import Order, OrderStatus
from .book import Book, BookName
from .order import Orders

logger = logging.getLogger(__name__)

__all__ = ["Strategy", "StrategyRunner"]


class Orders:
def __init__(self):
self.deque = deque()

def __len__(self):
return len(self.deque)

def __iter__(self):
return iter(self.deque)

def popleft(self):
return self.deque.popleft()

def append(self, order: Order):
return self.deque.append(order)

def extend(self, orders: Iterable[Order]):
return self.deque.extend(orders)

def sort_by_priority(self):
"""Sorts orders by order priority."""
ou_sorted = sorted(self.deque, key=lambda o: o.priority, reverse=True)
self.deque.clear()
self.deque.extend(ou_sorted)

def remove_duplicate_keys(self) -> List[Order]:
"""Remove older orders with same key.
Returns a list of orders than were removed with status set to REPLACED.
"""
removed = []
cntr = Counter(o.key for o in self.deque if o.key is not None)
if any(v > 1 for v in cntr.values()):
kept = []
while self.deque:
o = self.deque.popleft()
if o.key in cntr and cntr[o.key] > 1:
o.status = OrderStatus.REPLACED
removed.append(o)
cntr[o.key] -= 1
else:
kept.append(o)
self.deque.clear()
self.deque.extend(kept)

return removed
__all__ = ["Strategy"]


@mypyc_attr(allow_interpreted_subclasses=True)
Expand Down Expand Up @@ -164,207 +114,3 @@ def on_close(self):
this timestamp is accessible from `self.data`.
"""
pass


def _check_data(df, asset_map):
"""Check data structure correct."""

if not isinstance(df.index, pd.DatetimeIndex):
raise ValueError("data index must be a datetimeindex")
if not df.index.is_monotonic_increasing:
raise ValueError("data needs to have increasing index")
if not df.index.is_unique:
raise ValueError("data index must be unique")

# column level 1 = asset, level 2 = field
if not isinstance(df.columns, pd.MultiIndex):
raise ValueError("data columns must be multindex asset/field")
if len(df.columns.levels) != 2:
raise ValueError("data columns multiindex must have 2 levels")

# for cartesian products
data_labels_data = set(df.columns.levels[0])
data_labels_asset = {a.data_label for a in asset_map.values()}
assets_missing_data = data_labels_asset - data_labels_data
if len(assets_missing_data):
raise ValueError(
f"some assets are missing corresponding data: {assets_missing_data}"
)

# check and fix data for each asset
dfs = {
asset.data_label: asset.check_and_fix_data(asset._filter_data(df))
for asset_name, asset in asset_map.items()
}

return pd.concat(dfs, axis=1)


@dataclass(kw_only=True)
class StrategyRunner:
"""Encapsulates the execution of multiple strategies.
Orders are captured in `orders_processed` and `orders_unprocessed`.
`books` is a list of books and if none provided a single book is
created called 'Main'. After execution summary book and trade
histories are captured in `book_history` and `transaction_history`.
"""

data: pd.DataFrame = field()
"""Dataframe of price data including columns High, Low, Open, Close, Volume for each
asset.
Both asset name and field make a multiindex column. The index should consist of
order pandas timestamps.
"""

assets: List[Asset]
"""Assets available to strategy."""

strat_classes: List[Type[Strategy]]
"""Strategy classes to be called within this runner."""

mandates: Dict[AssetName, BookMandate] = field(default_factory=dict)
"""Dictionary of asset mandates (experimental)."""

strat_params: Dict[str, Any] = field(default_factory=dict)
"""Parameters passed to all strategies."""

books: List[Book] = field(default_factory=list)
"""Books available to strategies.
If not supplied will be populated with single book named 'Main' denominated in USD.
"""

@property
def book_map(self) -> Dict[BookName, Book]:
"""Mapping from book name to book instance."""
return {book.name: book for book in self.books}

@property
def asset_map(self) -> Dict[AssetName, Asset]:
"""Mapping from asset name to asset instance."""
return {asset.name: asset for asset in self.assets}

_orders_unprocessed: Orders = field(default_factory=Orders)

@property
def orders_unprocessed(self) -> Orders:
"""Unprocessed orders queue."""
return self._orders_unprocessed

_orders_processed: List[Order] = field(default_factory=list)

@property
def orders_processed(self) -> List[Order]:
"""Processed orders list."""
return self._orders_processed

_strategies: List[Strategy] = field(default_factory=list)

@property
def strategies(self) -> List[Strategy]:
"""List of instantiated strategies."""
return self._strategies

_book_history: Optional[pd.DataFrame] = None

@property
def book_history(self) -> pd.DataFrame:
"""Dataframe with book cash, mtm and total value history."""
return pd.concat({b.name: b.history for b in self.books}, axis=1)

@property
def transaction_history(self) -> pd.DataFrame:
"""Dataframe with trade history."""
return pd.concat(
[pd.DataFrame(bk.transactions).assign(book=bk.name) for bk in self.books]
)

def __post_init__(self):
self.data = _check_data(self.data, self.asset_map)

# set up books
if not self.books:
self.books = [Book(name="Main", mandates=self.mandates)]

def run(self):
"""Execute each strategy through time."""

# made available where necessary
asset_map = self.asset_map
book_map = self.book_map

# calendar
calendar = self.data.index

# set up strategies
self._strategies = [
cls(
orders=self._orders_unprocessed,
params=pd.Series(self.strat_params, dtype=object),
books=book_map,
assets=asset_map,
)
for cls in self.strat_classes
]
for strat in self._strategies:
strat._data_lock = False
strat.data = deepcopy(self.data)
strat.init()
strat._data_lock = True

# run event loop
for ts in calendar:
logger.info(f"Processing timestep {ts}")

# open
for strat in self._strategies:
# provide window
strat._set_ts(ts)
strat._mask_open = True
strat.on_open()
strat._mask_open = False

# order applied with ts's data
day_data = self.data.loc[ts, :]

# sort orders by priority
self._orders_unprocessed.sort_by_priority()

# process orders
orders_next_ts = []
while self._orders_unprocessed:
order = self._orders_unprocessed.popleft()

# set book attribute if needed
if not isinstance(order.book, Book):
# fall back to first available book
order.book = book_map.get(order.book, self.books[0])

order.apply(ts, day_data, asset_map)

# add any child orders to next ts
orders_next_ts.extend(order.suborders)

if order.status == OrderStatus.OPEN:
orders_next_ts.append(order)
else:
self.orders_processed.append(order)

# extend with orders for next ts
self._orders_unprocessed.extend(orders_next_ts)

# remove older duplicate orders
replaced = self._orders_unprocessed.remove_duplicate_keys()
self.orders_processed.extend(replaced)

# close
for strat in self._strategies:
# provide window
strat._set_ts(ts)
strat.on_close()

# run book end-of-day tasks
for book in self.books:
book.eod_tasks(ts, day_data, asset_map)
Loading

0 comments on commit d739ed1

Please sign in to comment.