diff --git a/changelog.md b/changelog.md index 7407c64..278c36d 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,11 @@ # Changelog +## [Unreleased] + +### Changed + +- 用 `gevent` 替代 `multitasking` + ## v0.5.0(2023-01-08) ### Added diff --git a/efinance/__init__.py b/efinance/__init__.py index 2a37add..89e1ee3 100644 --- a/efinance/__init__.py +++ b/efinance/__init__.py @@ -2,11 +2,6 @@ .. include:: ../README.md """ __docformat__ = "restructuredtext" -from efinance.api import ( - bond, - fund, - futures, - stock, -) +from efinance.api import bond, fund, futures, stock __all__ = ['stock', 'fund', 'bond', 'futures'] diff --git a/efinance/bond/getter.py b/efinance/bond/getter.py index 955e890..6debda3 100644 --- a/efinance/bond/getter.py +++ b/efinance/bond/getter.py @@ -1,8 +1,9 @@ from typing import Dict, List, Union -import multitasking +import gevent import pandas as pd import requests +from tqdm import tqdm from ..common import get_deal_detail as get_deal_detail_for_bond from ..common import get_history_bill as get_history_bill_for_bond @@ -10,7 +11,7 @@ from ..common import get_realtime_quotes_by_fs from ..common import get_today_bill as get_today_bill_for_bond from ..common.config import EASTMONEY_REQUEST_HEADERS, FS_DICT, MagicConfig -from ..utils import get_quote_id, process_dataframe_and_series, to_numeric +from ..utils import get_quote_id, gevent_task, process_dataframe_and_series, to_numeric from .config import EASTMONEY_BOND_BASE_INFO_FIELDS @@ -65,15 +66,17 @@ def get_base_info_multi(bond_codes: List[str]) -> pd.DataFrame: 多只债券信息 """ ss = [] + bar = tqdm(total=len(bond_codes)) - @multitasking.task + @gevent_task def start(bond_code: str) -> None: s = get_base_info_single(bond_code) ss.append(s) + bar.set_description(f'Processing => {bond_code}') + bar.update(1) - for bond_code in bond_codes: - start(bond_code) - multitasking.wait_for_tasks() + gevent.joinall([start(bond_code) for bond_code in bond_codes]) + bar.close() df = pd.DataFrame(ss) return 
df @@ -291,7 +294,6 @@ def get_quote_history( ) if isinstance(df, pd.DataFrame): - df.rename(columns={'代码': '债券代码', '名称': '债券名称'}, inplace=True) elif isinstance(df, dict): for bond_code in df.keys(): diff --git a/efinance/common/getter.py b/efinance/common/getter.py index c8e4196..2a88fd2 100644 --- a/efinance/common/getter.py +++ b/efinance/common/getter.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import Dict, List, Union -import multitasking +import gevent import pandas as pd from jsonpath import jsonpath from retry import retry @@ -9,7 +9,7 @@ from ..common.config import MARKET_NUMBER_DICT from ..shared import BASE_INFO_CACHE, session -from ..utils import get_quote_id, to_numeric +from ..utils import get_quote_id, gevent_task, to_numeric from .config import ( EASTMONEY_BASE_INFO_FIELDS, EASTMONEY_HISTORY_BILL_FIELDS, @@ -134,7 +134,7 @@ def get_quote_history_multi( dfs: Dict[str, pd.DataFrame] = {} total = len(codes) - @multitasking.task + @gevent_task @retry(tries=tries, delay=1) def start(code: str): _df = get_quote_history_single( @@ -145,9 +145,7 @@ def start(code: str): pbar.set_description_str(f'Processing => {code}') pbar = tqdm(total=total) - for code in codes: - start(code) - multitasking.wait_for_tasks() + gevent.joinall([start(code) for code in codes]) pbar.close() if kwargs.get(MagicConfig.RETURN_DF): return pd.concat(dfs, axis=0, ignore_index=True) @@ -308,7 +306,6 @@ def get_today_bill(code: str) -> pd.DataFrame: @to_numeric def get_base_info(quote_id: str) -> pd.Series: - fields = ",".join(EASTMONEY_BASE_INFO_FIELDS.keys()) params = ( ('ut', 'fa5fd1943c7b386f172d6893dbfba10b'), diff --git a/efinance/fund/getter.py b/efinance/fund/getter.py index 003c98c..16d0381 100644 --- a/efinance/fund/getter.py +++ b/efinance/fund/getter.py @@ -1,9 +1,8 @@ import os import re -import signal from typing import List, Union -import multitasking +import gevent import pandas as pd import requests import rich @@ -11,11 +10,9 @@ from retry import 
retry from tqdm import tqdm -from ..utils import to_numeric +from ..utils import gevent_task, to_numeric from .config import EastmoneyFundHeaders -signal.signal(signal.SIGINT, multitasking.killall) - @retry(tries=3) @to_numeric @@ -608,7 +605,7 @@ def get_base_info_muliti(fund_codes: List[str]) -> pd.Series: ss = [] - @multitasking.task + @gevent_task @retry(tries=3, delay=1) def start(fund_code: str) -> None: s = get_base_info_single(fund_code) @@ -617,9 +614,7 @@ def start(fund_code: str) -> None: pbar.set_description(f'Processing => {fund_code}') pbar = tqdm(total=len(fund_codes)) - for fund_code in fund_codes: - start(fund_code) - multitasking.wait_for_tasks() + gevent.joinall([start(fund_code) for fund_code in fund_codes]) df = pd.DataFrame(ss) return df @@ -737,7 +732,6 @@ def get_industry_distribution( elif dates is None: dates = [None] for date in dates: - params = [ ('FCODE', fund_code), ('OSVersion', '14.4'), @@ -791,7 +785,7 @@ def get_pdf_reports(fund_code: str, max_count: int = 12, save_dir: str = 'pdf') 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', } - @multitasking.task + @gevent_task @retry(tries=3, delay=1) def download_file( fund_code: str, url: str, filename: str, file_type='.pdf' @@ -840,11 +834,11 @@ def download_file( pbar = tqdm(total=min(max_count, len(json_response['Data']))) if not os.path.exists(save_dir): os.mkdir(save_dir) - for item in json_response['Data'][-max_count:]: - - title = item['TITLE'] - download_url = base_link.format(item['ID']) - download_file(fund_code, download_url, title) - multitasking.wait_for_tasks() + gevent.joinall( + [ + download_file(fund_code, base_link.format(item['ID']), item['TITLE']) + for item in json_response['Data'][-max_count:] + ] + ) pbar.close() print(f'{fund_code} 的 pdf 文件已存储到文件夹 {save_dir}/{fund_code} 中') diff --git a/efinance/futures/getter.py b/efinance/futures/getter.py index cfad4e1..3e0462b 100644 --- a/efinance/futures/getter.py +++ b/efinance/futures/getter.py @@ 
-143,7 +143,6 @@ def get_quote_history( quote_ids, beg=beg, end=end, klt=klt, fqt=fqt, quote_id_mode=True ) if isinstance(df, pd.DataFrame): - df.rename(columns={'代码': '期货代码', '名称': '期货名称'}, inplace=True) elif isinstance(df, dict): for stock_code in df.keys(): diff --git a/efinance/stock/getter.py b/efinance/stock/getter.py index 85f65f9..ed948f6 100644 --- a/efinance/stock/getter.py +++ b/efinance/stock/getter.py @@ -1,11 +1,10 @@ import calendar import json -import signal import sys from datetime import datetime, timedelta from typing import Dict, List, Union -import multitasking +import gevent import numpy as np import pandas as pd import requests @@ -25,6 +24,7 @@ from ..shared import session from ..utils import ( get_quote_id, + gevent_task, process_dataframe_and_series, search_quote, to_numeric, @@ -35,7 +35,6 @@ EASTMONEY_STOCK_DAILY_BILL_BOARD_FIELDS, ) -signal.signal(signal.SIGINT, multitasking.killall) python_version = sys.version_info.major, sys.version_info.minor # * 适配 pythn 3.10 及其以上版本 if python_version >= (3, 10): @@ -81,7 +80,7 @@ def get_base_info_muliti(stock_codes: List[str]) -> pd.DataFrame: 多只股票基本信息 """ - @multitasking.task + @gevent_task @retry(tries=3, delay=1) def start(stock_code: str): s = get_base_info_single(stock_code) @@ -91,9 +90,7 @@ def start(stock_code: str): series: List[pd.Series] = [] pbar = tqdm(total=len(stock_codes)) - for stock_code in stock_codes: - start(stock_code) - multitasking.wait_for_tasks() + gevent.joinall([start(stock_code) for stock_code in stock_codes]) df = pd.DataFrame(series) df = df.dropna(subset=['股票代码']) return df @@ -243,7 +240,6 @@ def get_quote_history( stock_codes, beg=beg, end=end, klt=klt, fqt=fqt, **kwargs ) if isinstance(df, pd.DataFrame): - df.rename(columns={'代码': '股票代码', '名称': '股票名称'}, inplace=True) elif isinstance(df, dict): for stock_code in df.keys(): @@ -355,7 +351,6 @@ def get_realtime_quotes(fs: Union[str, List[str]] = None, **kwargs) -> pd.DataFr fs = [fs] if isinstance(fs, list): - for f 
def gevent_task(f: Callable[..., Any]) -> Callable[..., 'gevent.Greenlet']:
    """
    Decorate ``f`` so that calling it schedules the call with ``gevent.spawn``.

    The decorated function does not run ``f`` inline. Each call forwards
    ``f`` together with the positional and keyword arguments to
    ``gevent.spawn`` and returns that call's result, so callers can gather
    the returned objects in a list and pass them to ``gevent.joinall``.

    Parameters
    ----------
    f : Callable
        The function to schedule.

    Returns
    -------
    Callable
        A wrapper accepting the same arguments as ``f``. The wrapper keeps
        the metadata of ``f`` (``__name__``, ``__doc__``, ``__wrapped__``),
        which matters when it is stacked on top of other decorators.
    """

    # ``wraps`` keeps the identity of the decorated function visible in
    # tracebacks, logs and introspection instead of a generic ``wrapper``.
    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> 'gevent.Greenlet':
        return gevent.spawn(f, *args, **kwargs)

    return wrapper