-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
229 lines (200 loc) · 8.65 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import os
from typing import Union
from beanie import init_beanie
from beanie.odm.operators.update.general import Set
from dotenv import load_dotenv
from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient
from stock_analysis.indicator import Indicator
from stock_analysis.momentum_strategy import MomentumStrategy
from stock_analysis.schema.api import *
from stock_analysis.schema.db import AsyncNiftyIndex, AsyncNiftySector
load_dotenv()
client = AsyncIOMotorClient(os.environ["MONGODB_CONNECTION_STRING"])
__version__ = "2.2"
# FastAPI app init
app = FastAPI(
title="Stock Analysis",
description="An helping hand to identify & analyze stocks/company to invest",
version=__version__,
)
# REST API for heal check
@app.get("/")
def _index():
"""Health check"""
return {"message": "OK", "version": __version__}
# REST API for running algo strategy
@app.post("/api/momentum/relative-momentum/")
def relative_momentum(input_response: RelativeMomentum):
"""REST API for running relative-momentum algo strategy"""
ms = MomentumStrategy(company_name=input_response.company)
result = ms.relative_momentum(
end_date=input_response.end_date,
top_company_count=input_response.top_company_count,
save=False,
)
# NOTE - by default result is return as dataframe's index as keys so changing it to symbol
result = result.set_index(
"symbol"
).T # need to perform `transpose` to match expected output
return result
@app.post("/api/momentum/relative-momentum-ema/")
def relative_momentum_ema(input_response: RelativeMomentumEMA):
"""REST API for running relative-momentum-ema algo strategy"""
ms = MomentumStrategy(company_name=input_response.company)
result = ms.relative_momentum_with_ema(
end_date=input_response.end_date,
top_company_count=input_response.top_company_count,
ema_canditate=input_response.ema_candidate,
save=False,
)
# NOTE - by default result is return as dataframe's index as keys so changing it to symbol
result = result.set_index(
"symbol"
).T # need to perform `transpose` to match expected output
return result
@app.post("/api/momentum/absolute-momentum-dma/")
def absolute_momentum_dma(input_response: AbsoluteMomentumDMA):
"""REST API for running absolute-momentum-dma algo strategy"""
ms = MomentumStrategy(company_name=input_response.company)
result = ms.absolute_momentum_with_dma(
end_date=input_response.end_date,
period=input_response.period,
cutoff=input_response.cutoff,
save=False,
)
# NOTE - by default result is return as dataframe's index as keys so changing it to symbol
result = result.set_index(
"symbol"
).T # need to perform `transpose` to match expected output
return result
@app.post("/api/indicator/volume-n-days/")
def volume_n_days(input_response: VolumeNDaysIndicator):
"""REST API for running volume-n-days algo strategy"""
ind = Indicator(company_name=input_response.company)
result = ind.volume_n_days_indicator(duration=input_response.duration, save=False)
# NOTE - by default result is return as dataframe's index as keys so changing it to symbol
result = result.set_index(
"symbol"
).T # need to perform `transpose` to match expected output
return result
@app.post("/api/indicator/ema-indicator-short/")
def ema_indicator_short(input_response: EMAIndicator):
"""REST API for running ema-indicator-short algo strategy"""
ind = Indicator(company_name=input_response.company)
result = ind.ema_indicator(
ema_canditate=input_response.ema_candidate,
cutoff_date=input_response.cutoff_date,
save=False,
)
# NOTE - by default result is return as dataframe's index as keys so changing it to symbol
result = result.set_index(
"symbol"
).T # need to perform `transpose` to match expected output
return result
@app.post("/api/indicator/ema-indicator-detail/")
def ema_indicator_detail(input_response: EMAIndicator):
"""REST API for running ema-indicator-detail algo strategy"""
ind = Indicator(company_name=input_response.company)
result = ind.ema_detail_indicator(
ema_canditate=input_response.ema_candidate,
cutoff_date=input_response.cutoff_date,
save=False,
)
# NOTE - by default result is return as dataframe's index as keys so changing it to symbol
result = result.set_index(
"symbol"
).T # need to perform `transpose` to match expected output
return result
@app.post("/api/indicator/ema-crossover-indicator/")
def ema_crossover_indicator(input_response: EMACrossoverIndicator):
"""REST API for running ema-crossover-indicator algo strategy"""
ind = Indicator(company_name=input_response.company)
result = ind.ema_crossover_detail_indicator(
ema_canditate=input_response.ema_candidate, save=False
)
# NOTE - by default result is return as dataframe's index as keys so changing it to symbol
result = result.set_index(
"symbol"
).T # need to perform `transpose` to match expected output
return result
# REST API for performing CRUD ops on MongoDB
@app.get("/api/db/{collection}/{document_id}")
async def _db_get_document(collection: str, document_id: str):
"""REST API for for performing MongoDB CRUD Ops - GET a specific document from desired collection"""
# Initialize beanie with the Product document class
await init_beanie(
database=client["stock-repo-db"],
document_models=[AsyncNiftyIndex, AsyncNiftySector],
)
if collection == "nifty-index":
return await AsyncNiftyIndex.get(document_id)
if collection == "nifty-sector":
return await AsyncNiftySector.get(document_id)
@app.post("/api/db/{collection}/query")
async def _db_find_documents(
collection: str,
query: dict,
skip: Union[int, None] = None,
limit: Union[int, None] = None,
):
"""REST API for for performing MongoDB CRUD Ops - Find document(s) based on desired query"""
# Initialize beanie with the Product document class
await init_beanie(
database=client["stock-repo-db"],
document_models=[AsyncNiftyIndex, AsyncNiftySector],
)
if collection == "nifty-index":
return await AsyncNiftyIndex.find_many(query).skip(skip).limit(limit).to_list()
if collection == "nifty-sector":
return await AsyncNiftySector.find_many(query).skip(skip).limit(limit).to_list()
@app.post("/api/db/nifty-index/insert")
async def _insert_document_nifty_index(documents: list[NiftyIndex]):
"""REST API for for performing MongoDB CRUD Ops - Insert/Add document nifty-index collection"""
# Initialize beanie with the Product document class
await init_beanie(
database=client["stock-repo-db"],
document_models=[AsyncNiftyIndex],
)
await AsyncNiftyIndex.insert_many(
[AsyncNiftyIndex(**document.dict()) for document in documents]
)
return {"message": "OK"}
@app.post("/api/db/nifty-index/insert")
async def _insert_document_nifty_sector(documents: list[NiftySector]):
"""REST API for for performing MongoDB CRUD Ops - Insert/Add document nifty-index collection"""
# Initialize beanie with the Product document class
await init_beanie(
database=client["stock-repo-db"],
document_models=[NiftySector],
)
await AsyncNiftySector.insert_many(
[AsyncNiftySector(**document.dict()) for document in documents]
)
return {"message": "OK"}
@app.delete("/api/db/{collection}/remove")
async def _delete_document(collection: str, query: dict):
"""REST API for for performing MongoDB CRUD Ops - Delete a document based on query"""
# Initialize beanie with the Product document class
await init_beanie(
database=client["stock-repo-db"],
document_models=[AsyncNiftyIndex, AsyncNiftySector],
)
if collection == "nifty-index":
await AsyncNiftyIndex.find(query).delete()
if collection == "nifty-sector":
await AsyncNiftySector.find(query).delete()
return {"message": "OK"}
@app.patch("/api/db/{collection}/update")
async def _update_document(collection: str, find_query: dict, update_query: dict):
"""REST API for for performing MongoDB CRUD Ops - Update desired document"""
# Initialize beanie with the Product document class
await init_beanie(
database=client["stock-repo-db"],
document_models=[AsyncNiftyIndex, AsyncNiftySector],
)
if collection == "nifty-index":
await AsyncNiftyIndex.find(find_query).update(Set(update_query))
if collection == "nifty-sector":
await AsyncNiftySector.find(find_query).update(Set(update_query))
return {"message": "OK"}