-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathuseful_decorators.py
113 lines (94 loc) · 3.89 KB
/
useful_decorators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from rich.console import Console
from functools import wraps
from pathlib import Path
import aiohttp
import asyncio
import random
from loguru import logger
from typing import Any, Callable, Coroutine
from useful_tools import read_statejson_and_get_cookie_headers
# Module-wide Rich console; the decorators below print their styled status
# and error messages through it.
console = Console()
def semaphore_decorator(semaphore: asyncio.Semaphore = asyncio.Semaphore(10)):
    """Build a decorator that runs a coroutine function under ``semaphore``.

    Args:
        semaphore: Semaphore held for the duration of every call of the
            decorated coroutine function. NOTE: the default value is created
            once, when this function is defined, so every decorator built
            without an explicit argument shares the same 10-slot semaphore.

    Returns:
        A decorator. The coroutine function it produces acquires
        ``semaphore``, awaits the wrapped function with the same arguments,
        releases the semaphore and returns the wrapped function's result.
    """

    def semaphore_decorator_wrapper(func):
        # Keep the wrapped coroutine's __name__/__doc__ so that logging and
        # other decorators stacked on top report the real function name.
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with semaphore:
                return await func(*args, **kwargs)

        return wrapper

    return semaphore_decorator_wrapper
def async_download_retry_decorator(
    retry_times: int = 10,
    sleep_interval_min: int = 1,
    sleep_interval_max: int = 2,
    reset_session_interval: int = 2,
) -> Callable[
    [Callable[..., Coroutine[Any, Any, Any]]],
    Callable[..., Coroutine[Any, Any, Any]],
]:
    """Build a decorator that retries a coroutine function when it raises.

    Args:
        retry_times: Total number of times the wrapped function is awaited
            before giving up. With a value <= 0 the function is never called
            and the retry-limit handling runs immediately.
        sleep_interval_min: Lower bound (inclusive, whole seconds) of the
            random pause between two attempts.
        sleep_interval_max: Upper bound (inclusive, whole seconds) of the
            random pause between two attempts.
        reset_session_interval: Currently unused; the only code that reads
            it is the commented-out session reset below.

    Returns:
        A decorator. The coroutine function it produces returns the wrapped
        function's result on the first successful attempt. If every attempt
        fails it awaits ``handle_retry_limit`` and returns ``0``.

    Raises:
        AssertionError: Raised by the wrapped function; reported on the
            console and re-raised immediately without any retry.
    """

    def wrapper2(
        func: Callable[..., Coroutine[Any, Any, Any]]
    ) -> Callable[..., Coroutine[Any, Any, Any]]:
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            last_exception = None
            for i in range(retry_times):
                try:
                    return await func(*args, **kwargs)
                except AssertionError as e:
                    # markup=False: exception text is arbitrary and must be
                    # printed literally, not parsed as Rich console markup.
                    console.print(
                        f"\nYou have an assertion error: {str(e)}\n",
                        style="bold red",
                        markup=False,
                    )
                    # Bare raise re-raises the active exception unchanged.
                    raise
                except Exception as e:
                    last_exception = e
                    console.print(
                        f"\nError type: {type(e)}: {str(e)}\n",
                        style="bold red",
                        markup=False,
                    )
                    if i + 1 >= retry_times:
                        # Final attempt failed: no retry follows, so do not
                        # announce one or waste time sleeping.
                        break
                    console.print(
                        f"\nRetrying {func.__name__} for the {i+1}/{retry_times} time\n",
                        style="bold yellow",
                    )
                    await asyncio.sleep(
                        random.randint(sleep_interval_min, sleep_interval_max)
                    )
                    # Periodic session reset is disabled:
                    # if (i + 1) % reset_session_interval == 0:
                    #     await reset_session(kwargs, func.__name__)
            # Every attempt failed (or none was made): report, clean up and
            # signal failure to the caller with 0.
            await handle_retry_limit(kwargs, func.__name__, last_exception)
            return 0

        return wrapper

    return wrapper2
# async def reset_session(kwargs: dict, func_name: str) -> None:
# if "session" in kwargs and isinstance(kwargs["session"], aiohttp.ClientSession):
# if not kwargs["session"].closed:
# await kwargs["session"].close()
# # session = aiohttp.ClientSession()
# # local_addr = random.choice(["10.193.2.171", "192.168.0.103"])
# # connector = aiohttp.TCPConnector(local_addr=(local_addr, 0))
# connector = None
# cookies, headers = read_statejson_and_get_cookie_headers()
# kwargs["session"] = aiohttp.ClientSession(
# timeout=aiohttp.ClientTimeout(connect=6),
# # headers=headers,
# cookies=cookies,
# connector=connector
# )
# console.print(f"\n{func_name} session has been reset\n", style="bold green")
async def handle_retry_limit(
    kwargs: dict, func_name: str, last_exception: Exception
) -> None:
    """Clean up and report after ``func_name`` has used up all its retries.

    Args:
        kwargs: Keyword arguments of the failed call. If it holds an
            ``aiohttp.ClientSession`` under the key ``"session"``, that
            session is closed.
        func_name: Name of the failed function, used in console messages.
        last_exception: Exception raised by the last attempt; its text is
            printed. May be ``None`` when no attempt was made.
    """
    # Disabled: removal of a partially written download.
    # if "file_save_path" in kwargs and isinstance(kwargs["file_save_path"], (str, Path)):
    #     file_path = (
    #         Path(kwargs["file_save_path"])
    #         if isinstance(kwargs["file_save_path"], str)
    #         else kwargs["file_save_path"]
    #     )
    #     if file_path.exists():
    #         logger.error(f"Deleting {file_path.name} because of reached retry limit")
    #         file_path.unlink()
    if (
        "session" in kwargs
        and isinstance(kwargs["session"], aiohttp.ClientSession)
        # The "already closed" check is disabled, so close() is awaited even
        # for a session that has been closed before.
        # and not kwargs["session"].closed
    ):
        await kwargs["session"].close()
        console.print(f"\n{func_name} session has been closed\n", style="bold green")
    # markup=False: the exception text is arbitrary and must be printed
    # literally instead of being parsed as Rich console markup.
    console.print(
        f"\nRetry limit reached for {func_name}: {str(last_exception)}\n",
        style="bold red",
        markup=False,
    )