From fe2e12c046724c95d930f6fbaa218ee863d57cfa Mon Sep 17 00:00:00 2001 From: timercrack Date: Mon, 13 May 2024 17:39:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=B8=8A=E6=9C=9F=E6=89=80?= =?UTF-8?q?=E8=A1=8C=E6=83=85=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 更新上期所行情接口 --- test/test.py | 53 ++++++++++++++-------------------------- test/test_api.py | 6 ++--- trader/main.py | 2 +- trader/utils/__init__.py | 6 ++--- 4 files changed, 26 insertions(+), 41 deletions(-) diff --git a/test/test.py b/test/test.py index f12f684..258b9dd 100644 --- a/test/test.py +++ b/test/test.py @@ -1,44 +1,29 @@ -import aioredis +import sys +import os +import django +if sys.platform == 'darwin': + sys.path.append('/Users/jeffchen/Documents/gitdir/dashboard') +elif sys.platform == 'win32': + sys.path.append(r'E:\github\dashboard') +else: + sys.path.append('/root/gitee/dashboard') +os.environ["DJANGO_SETTINGS_MODULE"] = "dashboard.settings" +os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" +django.setup() import asyncio +import datetime +import pytz +from trader.utils import update_from_czce -async def reader(message): - print(f"in reader: {message}, type={type(message)}") - - -async def monitor1(pb: aioredis.client.PubSub): - async for msg in pb.listen(): - print(f"in monitor1: {msg}, type={type(msg)}") - asyncio.create_task(reader(msg)) - if msg['type'] == 'punsubscribe': - print('quit monitor1') - break - - -async def main(): - redis = await aioredis.from_url("redis://192.168.123.142", decode_responses=True) - pubsub1 = redis.pubsub() - pubsub2 = redis.pubsub() - await pubsub1.psubscribe('channel:*') - await pubsub2.psubscribe('channel*') - await redis.publish("channel:1", "1") - await redis.publish("channel:2", "6") - await redis.publish("channel:3", "3") - await pubsub1.punsubscribe() - await pubsub2.punsubscribe() - loop = asyncio.get_running_loop() - asyncio.run_coroutine_threadsafe(monitor1(pubsub1), loop) - asyncio.run_coroutine_threadsafe(monitor1(pubsub2), loop) - # await asyncio.gather(monitor1(pubsub1), monitor1(pubsub2)) - # await pubsub2.run() - print('main done!') - if __name__ == "__main__": try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - loop.create_task(main()) - loop.run_forever() + day = datetime.datetime.now().replace(tzinfo=pytz.FixedOffset(480)) + day = day - datetime.timedelta(days=1) + loop.run_until_complete(update_from_czce(day)) + print("DONE!") except KeyboardInterrupt: pass diff --git a/test/test_api.py b/test/test_api.py index 382a0f5..fbb9837 100644 --- a/test/test_api.py +++ b/test/test_api.py @@ -19,7 +19,7 @@ if sys.platform == 'darwin': sys.path.append('/Users/jeffchen/Documents/gitdir/dashboard') elif sys.platform == 'win32': - sys.path.append(r'D:\GitHub\dashboard') + sys.path.append(r'E:\GitHub\dashboard') else: sys.path.append('/root/dashboard') os.environ["DJANGO_SETTINGS_MODULE"] = "dashboard.settings" @@ -45,7 +45,7 @@ def tearDown(self) -> None: async def test_get_shfe_data(self): self.assertTrue(await update_from_shfe(self.trading_day)) - @asynctest.skipIf(True, 'no need') + @asynctest.skipIf(False, 'no need') async def test_get_dce_data(self): self.assertTrue(await update_from_dce(self.trading_day)) @@ -74,7 +74,7 @@ async def test_get_all(self): result = await asyncio.gather(*tasks, return_exceptions=True) self.assertEqual(result, [True, True, True, True, True]) - @asynctest.skipIf(False, 'no need') + @asynctest.skipIf(True, 'no need') async def test_load_from_kt(self): self.assertTrue(load_kt_data(r'D:\test')) diff --git a/trader/main.py b/trader/main.py index 3c11935..0b3d21b 100644 --- a/trader/main.py +++ b/trader/main.py @@ -19,7 +19,7 @@ if sys.platform == 'darwin': sys.path.append('/Users/jeffchen/Documents/gitdir/dashboard') elif sys.platform == 'win32': - sys.path.append(r'D:\github\dashboard') + sys.path.append(r'E:\github\dashboard') else: sys.path.append('/root/gitee/dashboard') os.environ["DJANGO_SETTINGS_MODULE"] = "dashboard.settings" diff --git a/trader/utils/__init__.py b/trader/utils/__init__.py index 9ce52a0..91f1320 100644 --- a/trader/utils/__init__.py +++ b/trader/utils/__init__.py @@ -226,7 +226,7 @@ async def update_from_dce(day: datetime.datetime) -> bool: 'year': day.year, 'month': day.month-1, 'day': day.day}) as response: rst = await response.text() max_conn_dce.release() - for lines in rst.split('\r\n')[1:-3]: + for lines in rst.split('\r\n')[3:-3]: if '小计' in lines or '品种' in lines: continue inst_data_raw = [x.strip() for x in lines.split('\t')] @@ -297,7 +297,7 @@ async def update_from_cffex(day: datetime.datetime) -> bool: try: async with aiohttp.ClientSession() as session: await max_conn_cffex.acquire() - async with session.get(f"http://{cffex_ip}/fzjy/mrhq/{day.strftime('%Y%m/%d')}/index.xml") as response: + async with session.get(f"http://{cffex_ip}/sj/hqsj/rtj/{day.strftime('%Y%m/%d')}/index.xml?id=7") as response: rst = await response.text() max_conn_cffex.release() tree = ET.fromstring(rst) @@ -693,7 +693,7 @@ async def get_contracts_argument(day: datetime.datetime = None) -> bool: async with aiohttp.ClientSession() as session: # 上期所 async with session.get( - f'http://{shfe_ip}/data/instrument/ContractDailyTradeArgument{day_str}.dat') as response: + f'http://{shfe_ip}/data/busiparamdata/future/ContractDailyTradeArgument{day_str}.dat') as response: rst = await response.read() rst_json = json.loads(rst) for inst_data in rst_json['ContractDailyTradeArgument']: