From 2f0a3db230691cf3968738ec56bdb04afcdd5157 Mon Sep 17 00:00:00 2001 From: pixeldeee Date: Thu, 31 Aug 2023 19:15:47 +0300 Subject: [PATCH] Presence and timer() --- pytecord/__init__.py | 1 + pytecord/client.py | 26 ++++++++++++++++++++- pytecord/guild.py | 2 +- pytecord/presence.py | 55 ++++++++++++++++++++++++++++++++++++++++++++ pytecord/timer.py | 28 ++++++++++++++++++++++ 5 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 pytecord/presence.py create mode 100644 pytecord/timer.py diff --git a/pytecord/__init__.py b/pytecord/__init__.py index 16c47c9..534a4fd 100644 --- a/pytecord/__init__.py +++ b/pytecord/__init__.py @@ -4,3 +4,4 @@ from .reaction import Emoji, Sticker from .user import User, GuildMember from .commands import Interaction +from .presence import Presence, Activity diff --git a/pytecord/client.py b/pytecord/client.py index 1b24983..0b00e6b 100644 --- a/pytecord/client.py +++ b/pytecord/client.py @@ -8,6 +8,8 @@ from .user import User from .utils import get_option_type, rget from .web import BaseWebhook +from .presence import Presence +from .timer import TimerLoop if TYPE_CHECKING: from .web import GatewayOutput @@ -16,7 +18,9 @@ class Client: def __init__(self, token: str, debug: bool = False) -> None: self.webhook = BaseWebhook(token, debug) self.token = token + self.timers: list[TimerLoop] = [] self.__intents = GatewayIntents.GUILD_INTEGRATIONS + self.__presence = None @property def user(self) -> User: @@ -25,6 +29,16 @@ def user(self) -> User: @property def guilds(self) -> list[Guild]: return self.webhook.get_current_user_guilds() + + @property + def presence(self) -> Presence: + return self.__presence + + @presence.setter + def presence(self, new: Presence) -> Presence: + self.__presence = new + self.__presence.register(self.webhook) + return self.__presence def listen(self): def decorator(func_to_decorate: Callable[..., Coroutine[Any, Any, Any]]): @@ -32,6 +46,8 @@ def decorator(func_to_decorate: Callable[..., Coroutine[Any, Any, Any]]): match event_name: case 'ready': async def func(data: 'GatewayOutput'): + for i in self.timers: + i.run() await self.webhook.register_app_commands(data) await func_to_decorate() @@ -57,6 +73,12 @@ async def func(data: 'GatewayOutput'): return decorator + def timer(self, *, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0): + def wrapper(func_to_decorate: Callable[..., Coroutine[Any, Any, Any]]): + timer = TimerLoop(func_to_decorate, days, hours, minutes, seconds) + self.timers.append(timer) + return wrapper + def command(self): def wrapper(func_to_decorate: Callable[..., Coroutine[Any, Any, Any]]): name = func_to_decorate.__name__ @@ -84,10 +106,12 @@ def get_channel(self, id: int) -> GuildChannel: def run(self): if not self.webhook.listener.events.get('READY'): async def func(data: 'GatewayOutput'): - self.__user_id = data.d['user']['id'] + for i in self.timers: + i.run() await self.webhook.register_app_commands(data) self.webhook.add_event('READY', func) try: + self.__presence = Presence([]) arun(self.webhook.run(self.__intents)) except KeyboardInterrupt: exit(0) diff --git a/pytecord/guild.py b/pytecord/guild.py index a29d906..0dffe3e 100644 --- a/pytecord/guild.py +++ b/pytecord/guild.py @@ -384,7 +384,7 @@ def __int__(self) -> int: def __str__(self) -> str: return self.name - def __getitem__(self, key: int) + def __getitem__(self, key: int): return self.fetch(key) def eval(self) -> dict[str, Any]: diff --git a/pytecord/presence.py b/pytecord/presence.py new file mode 100644 index 0000000..d6789df --- /dev/null +++ b/pytecord/presence.py @@ -0,0 +1,55 @@ +from typing import Literal, Any +from datetime import datetime +from time import mktime +from asyncio import create_task + +from .web import BaseWebhook, GatewayRequest + +class Activity: + def __init__(self, name: str, type: Literal[0, 1, 2, 3, 4, 5] = 0, state: str = None, *, url: str = None, data: dict[str, Any] = None) -> None: + if data: + self.name = data.get('name') + self.type = data.get('type') + self.url = data.get('url') + self.created_at = data.get('created_at') + self.timestamps = data.get('timestamps') + self.application_id = data.get('application_id') + self.details = data.get('details') + self.state = data.get('state') + self.emoji = data.get('emoji') + self.party = data.get('party') + self.assets = data.get('assets') + self.secrets = data.get('secrets') + self.instance = data.get('instance') + self.flags = data.get('flags') + self.buttons = data.get('buttons') + else: + self.name = name + self.type = type + self.url = url + self.state = state + + def eval(self) -> dict[str, Any]: + return { + 'name': self.name, + 'type': self.type, + 'url': self.url, + 'state': self.state + } + + +class Presence: + def __init__(self, activities: list[Activity], status: Literal['online', 'dnd', 'idle', 'invisible', 'offline'] = 'online', afk: bool = False) -> None: + self.since: int = int(mktime(datetime.now().timetuple()) * 1000) + self.activities = activities + self.status = status + self.afk = afk + + def register(self, webhook: BaseWebhook): + if webhook.stream.running: + create_task(webhook.stream.send_request(GatewayRequest(3, { + 'since': self.since, + 'activities': [i.eval() for i in self.activities], + 'status': self.status, + 'afk': self.afk + }))) diff --git a/pytecord/timer.py b/pytecord/timer.py new file mode 100644 index 0000000..23a3c15 --- /dev/null +++ b/pytecord/timer.py @@ -0,0 +1,28 @@ +from typing import Callable +from asyncio import create_task +from datetime import datetime +from time import mktime + +class TimerLoop: + def __init__(self, callable: Callable, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0) -> None: + self.callable = callable + + self.every_time = seconds + self.every_time += minutes * 60 + self.every_time += hours * 60 * 60 + self.every_time += days * 60 * 60 * 24 + + async def __thread(self): + start_time = int(mktime(datetime.now().timetuple())) + + while True: + time = int(mktime(datetime.now().timetuple())) + + if time - start_time >= self.every_time: + start_time = int(mktime(datetime.now().timetuple())) + + await self.callable() + + + def run(self): + create_task(self.__thread())