import requests
import pandas as pd
import numpy as np
from datetime import datetime
import re
from bs4 import BeautifulSoup
import os
import time
import concurrent.futures
import json


# Request utilities
class RequestUtils:
    """Networking helpers with retry and error handling."""

    @staticmethod
    def fetch_with_retry(url, headers, method="get", json_data=None, data=None, max_retries=3, timeout=10):
        """Generic JSON request with a simple retry loop."""
        for attempt in range(max_retries):
            try:
                if method.lower() == "get":
                    response = requests.get(url, headers=headers, timeout=timeout)
                else:
                    response = requests.post(url, headers=headers, json=json_data, data=data, timeout=timeout)
                response.raise_for_status()
                return response.json()
            except (requests.RequestException, ValueError):
                if attempt == max_retries - 1:
                    return None
                time.sleep(1)
        return None

    @staticmethod
    def fetch_html_with_retry(url, headers, max_retries=3, timeout=10, encoding=None):
        """Fetch raw HTML/text content with a simple retry loop."""
        for attempt in range(max_retries):
            try:
                response = requests.get(url, headers=headers, timeout=timeout)
                response.raise_for_status()
                if encoding:
                    response.encoding = encoding
                return response.text
            except requests.RequestException:
                if attempt == max_retries - 1:
                    return None
                time.sleep(1)
        return None


# Stock data fetching
class StockDataFetcher:
    """Fetches hot-stock lists and per-stock details from each data source."""

    def __init__(self):
        self.base_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }

    def fetch_cls(self):
        """Fetch the CLS (财联社) hot-stock list."""
        url = "https://api3.cls.cn/v1/hot_stock?app=cailianpress&os=ios&sv=800&sign=f7f970ee36fc102317eeea2e5a6eb178"
        headers = {
            **self.base_headers,
            "Referer": "https://www.cls.cn/",
            "Accept": "application/json"
        }
        resp = RequestUtils.fetch_with_retry(url, headers)
        if not resp:
            return pd.DataFrame(columns=["代码", "cls_rank"])

        data = resp.get("data", [])
        rows = []
        for i, item in enumerate(data):
            stock = item["stock"]
            code = stock["StockID"][2:]  # strip the "sh"/"sz" market prefix
            rows.append({
                "代码": code,
                "cls_rank": i + 1
            })
        return pd.DataFrame(rows)

    def fetch_em(self):
        """Fetch the Eastmoney (东方财富) hot-stock ranking."""
        url = "https://emappdata.eastmoney.com/stockrank/getAllCurrentList"
        headers = {
            **self.base_headers,
            "Referer": "https://vipmoney.eastmoney.com/",
            "Accept": "application/json",
            "Content-Type": "application/json"
        }
        payload = {
            "appId": "appId01",
            "globalId": "786e4c21-70dc-435a-93bb-38",
            "marketType": "",
            "pageNo": 1,
            "pageSize": 100
        }
        resp = RequestUtils.fetch_with_retry(url, headers, method="post", json_data=payload)
        if not resp:
            return pd.DataFrame(columns=["代码", "em_rank"])

        data = resp.get("data", [])
        df = pd.DataFrame(data)
        if df.empty:
            return pd.DataFrame(columns=["代码", "em_rank"])

        # Extract the bare stock code and attach the ranking
        df["代码"] = df["sc"].str[2:]
        df["em_rank"] = df.index + 1
        return df[["代码", "em_rank"]]

    def fetch_ths(self):
        """Fetch the THS (同花顺) hot-stock list."""
        url = "https://dq.10jqka.com.cn/fuyao/hot_list_data/out/hot_list/v1/stock?stock_type=a&type=hour&list_type=normal"
        headers = {
            **self.base_headers,
            "Referer": "https://eq.10jqka.com.cn/frontend/thsTopRank/index.html#/",
            "Accept": "application/json"
        }
        resp = RequestUtils.fetch_with_retry(url, headers)
        if not resp:
            return pd.DataFrame(columns=["代码", "名称_ths", "涨幅_ths", "涨幅值", "概念_ths", "概念列表", "ths_rank"])

        data = resp.get("data", {}).get("stock_list", [])[:100]
        rows = []
        for i, item in enumerate(data):
            code = item.get("code")
            name = item.get("name")

            # Parse the change percentage defensively (the field may be missing or non-numeric)
            raw_change = item.get("rise_and_fall")
            try:
                change_value = float(raw_change)  # keep the numeric value for later averaging
                change = f"{change_value:.2f}%"
            except (TypeError, ValueError):
                change = "--"
                change_value = np.nan

            tag_data = item.get("tag", {})
            concept_tags = tag_data.get("concept_tag", [])
            popularity = tag_data.get("popularity_tag", "")
            all_tags = (concept_tags + [popularity]) if popularity else concept_tags
            tag_text = " | ".join(all_tags) if all_tags else "--"

            rows.append({
                "代码": code,
                "名称_ths": name,
                "涨幅_ths": change,
                "涨幅值": change_value,
                "概念_ths": tag_text,
                "概念列表": concept_tags,
                "ths_rank": i + 1
            })
        return pd.DataFrame(rows)
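    # Each fetch_* method returns a DataFrame keyed by "代码" (stock code) plus a
    # per-source rank column (cls_rank / em_rank / ths_rank / tdx_rank), so the
    # per-source results can be outer-merged on the code and averaged later in
    # StockDataProcessor.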
tag_data.get("concept_tag", []) popularity = tag_data.get("popularity_tag", "") all_tags = concept_tags + [popularity] if popularity else concept_tags tag_text = " | ".join(all_tags) if all_tags else "--" rows.append({ "代码": code, "名称_ths": name, "涨幅_ths": change, "涨幅值": change_value, "概念_ths": tag_text, "概念列表": concept_tags, "ths_rank": i + 1 }) return pd.DataFrame(rows) def fetch_tdx(self): """获取通达信热门股票数据""" url = "https://pul.tdx.com.cn/TQLEX?Entry=JNLPSE.hotStockList&RI=" headers = { **self.base_headers, 'Content-Type': 'text/plain;charset=UTF-8', 'Origin': 'https://pul.tdx.com.cn', 'Referer': 'https://pul.tdx.com.cn/site/app/gzhbd/tdx-topsearch/page-main.html', } payload = '{"listType":"0","cycle":"0"}' # 使用带有字符串 data 的请求 resp = RequestUtils.fetch_with_retry(url, headers, method="post", data=payload.encode('utf-8')) if not resp or not isinstance(resp, list) or len(resp) < 4: return pd.DataFrame(columns=["代码", "tdx_rank"]) data_rows = resp[3:] rows = [] for i, row in enumerate(data_rows): if row and len(row) > 1: code = row[1] rows.append({ "代码": code, "tdx_rank": i + 1 }) return pd.DataFrame(rows) def fetch_stock_info(self, code): """从新浪财经API获取股票信息""" try: # 确定市场代码 market = "sh" if code.startswith('6') else "sz" full_code = f"{market}{code}" # 使用新浪接口获取股票信息 url = f"https://hq.sinajs.cn/list={full_code}" headers = { **self.base_headers, "Referer": "https://finance.sina.com.cn/" } html_content = RequestUtils.fetch_html_with_retry(url, headers) if not html_content or "FAILED" in html_content or len(html_content) < 30: return None, "--", np.nan # 解析返回的数据 data = html_content # 新浪接口返回格式:var hq_str_sh600000="浦发银行,10.210,10.220,10.390,10.410,10.190,10.380,10.390,..." content = data.split('=')[1].strip('";\n').split(',') if len(content) < 4: return None, "--", np.nan name = content[0] current_price = float(content[3]) yesterday_close = float(content[2]) # 计算涨跌幅 change_percent = (current_price - yesterday_close) / yesterday_close * 100 change_formatted = f"{change_percent:.2f}%" return name, change_formatted, change_percent except Exception as e: return None, "--", np.nan def fetch_ths_concepts(self, code): """从同花顺F10页获取核心概念""" try: url = f'http://basic.10jqka.com.cn/{code}/' headers = self.base_headers html_content = RequestUtils.fetch_html_with_retry(url, headers, encoding='GBK') if not html_content: return [] concepts_found = [] soup = BeautifulSoup(html_content, 'html.parser') # 查找所有带title属性的a标签 all_a_tags = soup.find_all('a', {'title': True}) # 使用字典确保顺序并提取概念 ranked_concepts = {} for tag in all_a_tags: title = tag.get('title', '') text = tag.text.strip() if "排名第一" in title: ranked_concepts[1] = text elif "排名第二" in title: ranked_concepts[2] = text elif "排名第三" in title: ranked_concepts[3] = text # 按排名顺序1, 2, 3排序 concepts_found = [ranked_concepts[k] for k in sorted(ranked_concepts.keys())] return concepts_found except Exception as e: return [] # 数据处理类 class StockDataProcessor: """股票数据处理类,负责合并和处理从不同来源获取的数据""" def __init__(self): self.fetcher = StockDataFetcher() def fetch_all_sources(self): """获取所有数据源的数据""" print("正在获取各数据源热门股票数据...") df_cls = self.fetcher.fetch_cls() df_em = self.fetcher.fetch_em() df_ths = self.fetcher.fetch_ths() df_tdx = self.fetcher.fetch_tdx() # 新增通达信数据源 # 在合并前,为每个数据源添加标记列 df_cls['来源_C'] = 'C' df_em['来源_D'] = 'D' df_ths['来源_T'] = 'T' df_tdx['来源_X'] = 'X' # 新增通达信来源标记 return df_cls, df_em, df_ths, df_tdx def merge_data(self, df_cls, df_em, df_ths, df_tdx): """合并各数据源数据""" df_all = pd.merge(df_cls, df_em, on="代码", how="outer") df_all = pd.merge(df_all, df_ths, on="代码", 
how="outer") df_all = pd.merge(df_all, df_tdx, on="代码", how="outer") # 合并通达信数据 # 初始化名称和涨幅列,以便后续填充 df_all["名称"] = df_all["名称_ths"] df_all["涨幅"] = df_all["涨幅_ths"] df_all["概念"] = df_all["概念_ths"] return df_all def process_missing_info(self, df_all): """处理缺失的股票信息""" missing_info_indices = df_all[df_all["名称"].isna()].index missing_count = len(missing_info_indices) if missing_count > 0: print(f"正在补充 {missing_count} 只股票的信息...") if missing_count == 0: return df_all # 使用并行处理获取缺失信息 with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: futures = {} # 提交所有任务 for idx in missing_info_indices: code = df_all.loc[idx, "代码"] if not isinstance(code, str) or len(code) != 6: continue # 提交获取股票基本信息的任务 info_future = executor.submit(self.fetcher.fetch_stock_info, code) futures[info_future] = (idx, code, "info") # 提交获取概念信息的任务 concept_future = executor.submit(self.fetcher.fetch_ths_concepts, code) futures[concept_future] = (idx, code, "concept") # 处理完成的任务 for future in concurrent.futures.as_completed(futures): idx, code, task_type = futures[future] try: result = future.result() if task_type == "info": name, zdf_formatted, zdf_value = result if name: df_all.loc[idx, "名称"] = name df_all.loc[idx, "涨幅"] = zdf_formatted df_all.loc[idx, "涨幅值"] = zdf_value elif task_type == "concept": concepts = result if concepts: df_all.loc[idx, "概念"] = " | ".join(concepts) df_all.at[idx, "概念列表"] = concepts else: df_all.loc[idx, "概念"] = "--" df_all.at[idx, "概念列表"] = [] except Exception: pass return df_all def create_result_dataframe(self, df_all): """创建最终结果DataFrame""" # 创建来源列,合并T、D、C、X标记 df_all["来源"] = "" df_all.loc[~df_all["来源_T"].isna(), "来源"] += "T" df_all.loc[~df_all["来源_D"].isna(), "来源"] += "D" df_all.loc[~df_all["来源_C"].isna(), "来源"] += "C" df_all.loc[~df_all["来源_X"].isna(), "来源"] += "X" # 新增来源X # 计算平均排名 df_all["平均排名"] = df_all[["cls_rank", "em_rank", "ths_rank", "tdx_rank"]].mean(axis=1, skipna=True) # 确保概念列有默认值 df_all["概念"] = df_all["概念"].fillna("--") # 修复概念列表处理 df_all["概念列表"] = df_all["概念列表"].apply(lambda x: [] if isinstance(x, float) and pd.isna(x) else x) # 选择最终列并排序 final_cols = ["代码", "名称", "涨幅", "概念", "来源", "平均排名", "涨幅值", "概念列表"] df_result = df_all[final_cols].sort_values("平均排名").reset_index(drop=True) df_result.index += 1 return df_result def analyze_concepts(self, df_result): """分析概念标签出现频次和平均涨幅""" # 统计概念标签出现频次和平均涨幅 concept_stocks = {} for idx, row in df_result.iterrows(): concepts = row["概念列表"] if isinstance(concepts, list): for concept in concepts: if concept not in concept_stocks: concept_stocks[concept] = [] concept_stocks[concept].append((row["代码"], row["涨幅值"])) # 计算每个概念的出现频次和平均涨幅 concept_stats = [] for concept, stocks in concept_stocks.items(): count = len(stocks) # 计算平均涨幅,跳过NaN值 changes = [change for _, change in stocks if not pd.isna(change)] avg_change = np.mean(changes) if changes else np.nan concept_stats.append({ "概念": concept, "出现次数": count, "平均涨幅": avg_change }) # 创建概念统计DataFrame并排序 df_concepts = pd.DataFrame(concept_stats) if not df_concepts.empty: # 确保有概念数据 df_concepts = df_concepts.sort_values(["出现次数", "平均涨幅"], ascending=[False, False]).reset_index(drop=True) df_concepts.index += 1 # 格式化平均涨幅 df_concepts["平均涨幅"] = df_concepts["平均涨幅"].apply(lambda x: f"{x:.2f}%" if not pd.isna(x) else "--") return df_concepts # 输出生成类 class OutputGenerator: """输出生成类,负责生成HTML和CSV输出""" @staticmethod def generate_html(df_result, df_concepts, now): """生成HTML输出""" # 生成股票名称链接 df_result["名称链接"] = df_result.apply( lambda row: f'{row["名称"]}', axis=1 ) # 生成HTML表格 html_table = df_result[["代码", "名称链接", "涨幅", "概念", 
"来源", "平均排名"]].rename( columns={"名称链接": "名称"} ).to_html(index=True, justify="center", border=0, escape=False, table_id="stockTable") # 修改表格行,添加数据源属性 soup = BeautifulSoup(html_table, 'html.parser') tbody = soup.find('tbody') if tbody: for i, row in enumerate(tbody.find_all('tr')): # 获取当前行的索引 index_cell = row.find('th') if index_cell and index_cell.text.isdigit(): row_index = int(index_cell.text) - 1 if 0 <= row_index < len(df_result): # 添加data-source属性 row['data-source'] = df_result.iloc[row_index]['来源'] # 将修改后的HTML转换回字符串 html_table = str(soup) # 概念表格 if not df_concepts.empty: html_concepts_table = df_concepts.to_html(index=True, justify="center", border=0) else: html_concepts_table = "

没有概念数据

" # 代码列表每行20个 code_list = df_result["代码"].tolist() grouped_lines = [code_list[i:i + 20] for i in range(0, len(code_list), 20)] html_code_block = "
".join([",".join(group) for group in grouped_lines]) # HTML模板 full_html = f""" 热门个股整合榜 - {now}

🔥 热门个股整合榜 ({now})

⚠️🈹🆘🆘 现在就卖!!我现在卖出,没有任何感情。

⚠️只要稍微走弱,就是卖!放弃了任何格局行为。

⚠️可以说最近卖飞了大量大牛股,很多几十个点的股票我只赚了一点点就跑了,但胜率直线上升,曲线一路向上。

⚠️控制回撒,复利的核心是控制回撒,而不是抓住多少机会,不需要抓住每一次大涨,仅仅靠着复利,就可以实现很好的收益。

热门个股排名

{html_table}

概念热度排行

{html_concepts_table}

📋 可复制的代码列表(每行20个):

{html_code_block}
""" with open("热门个股整合榜.html", "w", encoding="utf-8") as f: f.write(full_html) print("已成功导出为:热门个股整合榜.html,请用浏览器打开查看。") @staticmethod def print_terminal_output(df_result, df_concepts): """打印终端输出""" print("\n=== 🔥 热门个股整合榜(按平均排名排序) ===\n") print(df_result[["代码", "名称", "涨幅", "概念", "来源", "平均排名"]].to_string(index=True)) if not df_concepts.empty: print("\n=== 📊 概念热度排行榜 ===\n") print(df_concepts.to_string(index=True)) # 代码列表每行20个 code_list = df_result["代码"].tolist() grouped_lines = [code_list[i:i + 20] for i in range(0, len(code_list), 20)] code_text_multiline = "\n".join([",".join(group) for group in grouped_lines]) print("\n📋 可复制的代码列表(每行20个):\n") print(code_text_multiline) # 主程序类 class HotStockIntegrator: """热门股票整合主程序类""" def __init__(self): self.processor = StockDataProcessor() self.output_generator = OutputGenerator() def run(self): """运行主程序""" now = datetime.now().strftime("%Y-%m-%d %H-%M") print(f"开始运行热门股票整合程序 - {now}") try: # 1. 获取所有数据源的数据 df_cls, df_em, df_ths, df_tdx = self.processor.fetch_all_sources() # 2. 合并数据 df_all = self.processor.merge_data(df_cls, df_em, df_ths, df_tdx) # 3. 处理缺失信息 df_all = self.processor.process_missing_info(df_all) # 4. 创建结果DataFrame df_result = self.processor.create_result_dataframe(df_all) # 5. 分析概念 df_concepts = self.processor.analyze_concepts(df_result) # 6. 生成输出 self.output_generator.print_terminal_output(df_result, df_concepts) self.output_generator.generate_html(df_result, df_concepts, now) print("程序运行完成") except Exception as e: print(f"程序运行出错: {e}") # 主函数 def main(): integrator = HotStockIntegrator() integrator.run() if __name__ == "__main__": main()