from selenium import webdriver
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.firefox import GeckoDriverManager
from bs4 import BeautifulSoup
import pandas as pd
import re
import os
import random
import time
from io import StringIO
from openpyxl import load_workbook
import requests
from requests.exceptions import ConnectionError
# Adjust the column widths of every sheet in the workbook
def auto_adjust_column_width(excel_file_path):
    # Open the Excel file
    workbook = load_workbook(excel_file_path)
    # Walk every sheet
    for sheet_name in workbook.sheetnames:
        sheet = workbook[sheet_name]
        # Walk every column and adjust its width
        for col in sheet.columns:
            max_length = 0
            column = col[0].column_letter  # column letter
            for cell in col:
                try:
                    # Track the longest cell value in the column
                    if cell.value:
                        max_length = max(max_length, len(str(cell.value)))
                except Exception:
                    pass
            # Column width = longest value plus some extra padding
            adjusted_width = max_length + 20
            sheet.column_dimensions[column].width = adjusted_width
    # Save the adjusted workbook
    workbook.save(excel_file_path)
# Build the announcement-list page URLs for a company
def build_announcement_urls(stock_id, reportTypes):
    urls = []
    for currentType in reportTypes:
        url = f'https://vip.stock.finance.sina.com.cn/corp/go.php/vCB_Bulletin/stockid/{stock_id}/page_type/{currentType}.phtml'
        urls.append(url)
    return urls
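# Illustrative example (the stock id below is hypothetical, only to show the URL shape):
# build_announcement_urls('600000', ['ndbg']) would return
# ['https://vip.stock.finance.sina.com.cn/corp/go.php/vCB_Bulletin/stockid/600000/page_type/ndbg.phtml']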
# Configure the Firefox browser options
options = Options()
options.set_preference('permissions.default.image', 2)  # disable images
options.add_argument('--headless')  # run headless (e.g. in a Colab environment)
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
# Point Service at the geckodriver binary
service = Service('/usr/local/bin/geckodriver')
# service = Service('D:\\work\\GitHub\\files\\geckodriver.exe')
# Alternatively fetch the driver dynamically, so there is no need to special-case macOS vs. Windows:
# service = Service(GeckoDriverManager().install(), port=0)
# Create the WebDriver instance
driver = webdriver.Firefox(service=service, options=options)
# Or let webdriver_manager download and configure GeckoDriver on the fly:
# driver = webdriver.Firefox(service=Service(GeckoDriverManager().install()), options=options)
def get_reports_urls(soup, years):
    # Links that pass the filter
    links = []
    # Find the div with class="datelist"
    datelist_div = soup.find('div', class_='datelist')
    if datelist_div:
        # Collect every <a> tag inside that div
        a_tags = datelist_div.find_all('a', href=True)
        # Walk the <a> tags
        for a_tag in a_tags:
            # Link text
            link_text = a_tag.text.strip()
            # Keep the link if its text contains "半年度报告" (semi-annual report) or
            # "年度报告" (annual report) and mentions one of the requested years
            for year in years:
                if ('半年度报告' in link_text or '年度报告' in link_text) and str(year) in link_text:
                    # Matching link, add it to the list
                    links.append(a_tag)
    return links
# Get the latest annual and semi-annual report links for a company
def get_latest_reports_urls(stock_id, years, reportTypes):
    urls = build_announcement_urls(stock_id, reportTypes)
    report_urls = {}  # initialise the dict so it is defined even if an exception occurs
    for url in urls:
        driver.get(url)
        try:
            # Wait for the page to finish loading
            WebDriverWait(driver, 5).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'div.datelist'))
            )
            # Parse the page source
            page_source = driver.page_source
            soup = BeautifulSoup(page_source, 'html.parser')
            # Find the report links that pass the filter
            links = get_reports_urls(soup, years)
            # links should already be BeautifulSoup tags
            for link in links:
                if isinstance(link, str):
                    # If the link is a plain string, parse it into an <a> tag
                    link = BeautifulSoup(link, 'html.parser').a
                text = link.text.strip()  # text of the <a> tag
                href = 'https://vip.stock.finance.sina.com.cn/' + link['href']  # href attribute
                report_urls[text] = href  # text as key, href as value
        except Exception as e:
            print(f"Error while fetching report links: {e}")
            # Fall back to an empty dict instead of None
            report_urls = {}
    return report_urls
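# For reference: the returned dict maps link text to an absolute URL, e.g. (hypothetical entry)
# {'XX公司2023年年度报告': 'https://vip.stock.finance.sina.com.cn/corp/view/...'}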
# Target directory for the exported workbooks
directory = './dist'
# Numbers parsed from existing report file names
reports_files = []
# Match files that start with 'reports', have an optional number, and end with '.xlsx'
pattern = r'^reports(\d*)\.xlsx$'
# Walk the files in the dist directory
for filename in os.listdir(directory):
    match = re.match(pattern, filename)
    if match:
        # Numeric part of the file name
        num_str = match.group(1)
        # No digits means 0
        number = int(num_str) if num_str else 0
        reports_files.append(number)
# Current maximum number
if reports_files:
    max_number = max(reports_files)
else:
    max_number = 0
# Next file name
new_filename = f'reports{max_number + 1}.xlsx'
# Full export path
output_file = os.path.join(directory, new_filename)
# Continue with the Excel export, for example:
# df.to_excel(output_file)
# Main entry point: crawl the reports of several companies and extract the R&D expense figures
# Also an example of how reports from different companies end up in the same Excel file
def crawl_reports_for_companies(companies, years, target_tables, reportTypes=['zqbg', 'ndbg']):
    results = []
    # ExcelWriter object used to write several sheets into one workbook
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        for company in companies:
            stock_id = get_stock_code_by_company_name(company)
            if not stock_id:
                print(f"Could not find a stock code for {company}")
                continue
            report_urls = get_latest_reports_urls(stock_id, years, reportTypes)
            print(f"stock_id {stock_id} report_urls {report_urls}")
            if not report_urls:
                print(f"No report links found for {company}")
                continue
            for report_type, report_url in report_urls.items():
                if report_url:
                    print(f"Crawling the {report_type} of {company}...")
                    # Crawl with retries and write the tables into separate Excel sheets
                    report = get_report_content_selenium(report_url, writer, target_tables)
                    if report:
                        print(f"Successfully crawled the {report_type} of {company}")
                        results.append(report)
                    else:
                        print(f"Failed to crawl the {report_type} of {company}")
                    # Random delay between requests
                    time.sleep(random.uniform(3, 7))  # sleep 3 to 7 seconds to avoid being flagged as a bot
                else:
                    print(f"No link found for the {report_type} of {company}")
    driver.quit()  # make sure the browser is closed
    # Adjust the column widths after the workbook has been written
    auto_adjust_column_width(output_file)
    return results
# Crawl the report page with Selenium, with a retry mechanism
def get_report_content_selenium(report_url, writer, target_tables=['合并利润表'], retries=3):
    """
    Crawl a report page and write its tables into separate sheets of the Excel workbook.
    :param report_url: URL of the report
    :param writer: pd.ExcelWriter object used for writing
    :param target_tables: table titles to look for
    :param retries: number of retries
    """
    if not report_url:
        return None
    for attempt in range(retries):
        try:
            # Open the URL
            driver.get(report_url)
            report_content = None
            # Wait for the page to load
            WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'div.table-wrap'))
            )
            # Parse the page source
            page_source = driver.page_source
            soup = BeautifulSoup(page_source, 'html.parser')
            for target_table_name in target_tables:
                # Look for the <p> tag that names the target table
                target_p_list = soup.find_all('p', string=lambda s: s and target_table_name in s and len(s) < 50)
                if not target_p_list:
                    print(f"No matching table title found: {target_table_name}")
                    continue  # skip this table name and move on to the next one
                target_p = target_p_list[0]  # take the first match
                combined_df = pd.DataFrame()
                # Walk the table-wrap divs that follow the title
                next_div = target_p.find_next_sibling('div', class_='table-wrap')
                while next_div and 'table-wrap' in next_div.get('class', []):
                    tables = next_div.find_all('table')
                    for table in tables:
                        extracted_html = str(table)
                        html_io = StringIO(extracted_html)
                        df = pd.read_html(html_io)[0]
                        combined_df = pd.concat([combined_df, df], ignore_index=True)
                    next_div = next_div.find_next_sibling()
                    if not (next_div and next_div.name == 'div' and 'table-wrap' in next_div.get('class', [])):
                        break
                # Build the report title
                content_div = soup.find('div', id='content')
                p_tags = content_div.find_all('p') if content_div else []
                # Locate the <th class="head"> tag
                th_tag = soup.find('th', class_='head')
                report_title = None
                # Take the text of the <th> tag, excluding any <font> children
                if th_tag:
                    # Remove the <font> tags with decompose()
                    for font_tag in th_tag.find_all('font'):
                        font_tag.decompose()
                    # Remaining text of the <th> tag
                    report_title = th_tag.get_text(strip=True).replace("：", "") + f"-{target_table_name}"
                    # Keep everything up to and including the first "表" so the sheet name stays short
                    report_title = report_title.split("表", 1)[0] + "表"
                print(f"report_title: {report_title}")
                # Write the data into its own Excel sheet
                sheet_name = report_title if report_title else f"Sheet_{random.randint(1000, 9999)}"
                combined_df.to_excel(writer, sheet_name=sheet_name, index=False)
            # Success: return the result
            return {'title': report_title, 'content': combined_df}
        except ConnectionError as e:
            print(f"Connection refused, retrying {attempt + 1}/{retries} ... error: {e}")
            time.sleep(5)  # wait 5 seconds before retrying
        except Exception as e:
            print(f"Error while crawling the report content: {e}")
            break  # stop retrying on other errors
        # finally:
        #     if driver:
        #         driver.quit()  # make sure the browser is closed
    return None  # all retries failed, return None
# Parse the R&D expense and its year-over-year change out of a report's text
def extract_r_d_expenses(report_content):
    if not report_content:
        return None
    # Regex that matches "研发费用" (R&D expense) plus the related figures
    pattern = re.compile(r"研发费用.*?([\d,.]+).*?同比(增长|下降).*?([\d,.]+)%", re.S)
    match = pattern.search(report_content)
    if match:
        r_d_expense = match.group(1)       # R&D expense
        comparison_type = match.group(2)   # "增长" (increase) or "下降" (decrease)
        comparison_value = match.group(3)  # year-over-year percentage change
        return {
            'r_d_expense': r_d_expense,
            'comparison_type': comparison_type,
            'comparison_value': comparison_value
        }
    else:
        return None
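# Note: extract_r_d_expenses is not called anywhere else in this script. A minimal,
# hypothetical usage sketch (the sample sentence below is made up for illustration):
#
#   sample = "报告期内研发费用为 1,234.56 万元,同比增长 12.34%"
#   print(extract_r_d_expenses(sample))
#   # -> {'r_d_expense': '1,234.56', 'comparison_type': '增长', 'comparison_value': '12.34'}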
# Look up a company's stock code (via the Eastmoney search page)
def get_stock_code_by_company_name(company_name):
    search_url = f"https://so.eastmoney.com/web/s?keyword={company_name}"
    driver.get(search_url)
    try:
        # Wait for the page to finish loading
        WebDriverWait(driver, 5).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, 'a.exstock_t_l'))
        )
        # Fetch and parse the page content
        page_source = driver.page_source
        soup = BeautifulSoup(page_source, 'html.parser')
        # Extract the stock code from the page content
        # (the selector may need adjusting if the page structure changes)
        a_tags = soup.find_all('a', class_='exstock_t_l')
        for a_tag in a_tags:
            text = a_tag.text.strip()
            # The stock code is the run of digits inside the parentheses
            match = re.search(r'\((\d+)\)', text)
            if match:
                stock_code = match.group(1)  # extracted stock code
                print(f"Company: {company_name}, stock code: {stock_code}")
                return stock_code
        print(f"Company: {company_name}, no stock code found")
        return None
    except Exception as e:
        print(f"Error while looking up the stock code for {company_name}: {e}")
        return None
# Example invocation
# companies = ['艾为电子','圣邦股份','恒玄科技','杰理科技','纳芯微','中科蓝讯','杰华特','晶丰明源','思瑞浦','芯朋微','力芯微','博通集成','必易微','富满微','炬芯科技','微源股份']
companies = ['艾为电子', '圣邦股份']
years = [2023, 2024]
target_tables = ['合并资产负债表2023年6月30日', '合并利润表', '合并现金流量表']
# 'zqbg' is the semi-annual report, 'ndbg' is the annual report
reportTypes = ['zqbg', 'ndbg']
# Crawl the reports and extract the R&D expense information
results = crawl_reports_for_companies(companies, years, target_tables, reportTypes)