"""Poll https://www.jin10.com/ and forward new flash items to a Feishu bot.

Every polling cycle a headless Chrome session loads the page, the rendered
HTML is parsed with BeautifulSoup, and flash items whose text contains the
marker ``默认火热沸爆爆`` are formatted into notification messages.  The first
formatted message is hashed; when the hash differs from the one remembered
from the previous cycle, the message is posted to the Feishu webhook.
"""
import hashlib
import os
import re
import time

import requests
import schedule
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service  # noqa: F401  (kept from the original file)
from selenium.webdriver.common.by import By
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities  # noqa: F401
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# NOTE: an SMTP e-mail notifier (smtp.qq.com, port 465) used to live here as
# commented-out code together with a plaintext mailbox password.  The dead
# code was dropped; if e-mail is ever re-enabled, load the credentials from
# the environment instead of committing them.

# Feishu bot webhook.  Can be overridden through the environment so the
# secret hook id does not have to live in the source; the default keeps the
# original behaviour.
FEISHU_WEBHOOK_URL = os.environ.get(
    "FEISHU_WEBHOOK_URL",
    "https://open.feishu.cn/open-apis/bot/v2/hook/094b85fb-4fc3-46f3-9673-ddb9702f7885",
)
# Upper bound (seconds) for the webhook request so a stalled connection
# cannot block the monitor forever.
FEISHU_TIMEOUT_SECONDS = 10

PAGE_URL = "https://www.jin10.com/"
# Absolute XPath of the filter entry whose CSS class is switched to "active".
TARGET_XPATH = (
    '/html/body/div[1]/div[2]/div[2]/div/main/div[2]/div[2]/div[2]/div[2]'
    '/div[2]/div[4]/span[2]/div[1]/div[3]/div/div[2]/div[3]'
)
# Absolute XPath of the element that is scrolled into view before parsing.
UPDATE_XPATH = '/html/body/div[1]/div[2]/div[2]/div/main/div[2]/div[2]/div[3]'
# Re-resolves the XPath inside the page and overwrites the element's class.
ACTIVATE_FILTER_SCRIPT = '''
var element = document.evaluate(arguments[0], document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
element.className = "hot-filter_item is-active";
'''

ITEM_CSS_CLASS = 'jin-flash-item-container is-normal'
MAX_ITEMS = 40  # only the first 40 parsed items are inspected
# An item qualifies when its flattened text contains this marker ...
LEVEL_FILTER_MARKER = '默认火热沸爆爆'
# ... and the message body is whatever follows this (one character shorter)
# prefix, so the first character of the body is reported as the level.
LEVEL_SPLIT_MARKER = '默认火热沸爆'
# The item's timestamp is searched for after this anchor text.
TIME_ANCHOR = '分享收藏详情复制'
TIME_PATTERN = re.compile(r'\d{2}:\d{2}:\d{2}')
# Used when an item carries no recognisable HH:MM:SS timestamp.
UNKNOWN_TIME = "未知"

# MD5 of the most recent first message; None until the first successful cycle.
last_matched_hash = None


def send_feishu_message(text):
    """Post *text* as a plain-text message to the Feishu bot webhook.

    A non-200 response is reported on stdout.  Network errors (including the
    timeout) are raised as ``requests`` exceptions so the caller can decide
    whether to retry.
    """
    headers = {"Content-Type": "application/json"}
    payload = {
        "msg_type": "text",
        "content": {"text": str(text)},
    }
    response = requests.post(
        FEISHU_WEBHOOK_URL,
        headers=headers,
        json=payload,
        timeout=FEISHU_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        print(f"飞书消息发送失败,状态码: {response.status_code}, 响应内容: {response.text}")


def _build_driver():
    """Create the headless Chrome driver used for one polling cycle."""
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    # Approach 1 from the original file: disable SSL certificate checks.
    options.add_argument('--ignore-certificate-errors')
    options.add_argument('--allow-insecure-localhost')
    options.add_argument('--ssl-protocol=TLSv1.2')
    options.add_argument('--no-proxy-server')
    options.add_argument('--enable-logging')
    options.add_argument('--v=1')  # log verbosity
    # Approach 2 (disabled in the original and marked "not recommended"):
    # trust all certificates via the ``acceptInsecureCerts`` capability, use
    # an explicit chromedriver ``Service`` path, and pass ``excludeSwitches``
    # (``enable-automation`` / ``enable-logging``) to silence console noise.
    return webdriver.Chrome(options=options)


def _format_flash_item(text_content):
    """Turn the flattened text of one flash item into a notification message.

    Returns the formatted message, or ``None`` when nothing follows the
    level prefix.
    """
    # Start from a per-item default: previously the variable could be
    # unbound (aborting the whole cycle) or carry over the previous item's
    # time when no timestamp was found.
    specific_time = UNKNOWN_TIME
    anchor_index = text_content.find(TIME_ANCHOR)
    if anchor_index != -1:
        time_match = TIME_PATTERN.search(text_content[anchor_index:])
        if time_match:
            specific_time = time_match.group()

    parts = text_content.split(LEVEL_SPLIT_MARKER, 1)
    if len(parts) < 2:
        return None
    body = parts[1].strip()
    if not body:
        return None
    return "消息时间:" + specific_time + "\n" + "消息等级:" + body[0] + "\n" + body[1:]


def fetch_news():
    """Run one polling cycle: scrape the page and notify on a new first item.

    Updates the module-level ``last_matched_hash``.  While it is still
    ``None`` (first successful cycle) the first message only establishes the
    baseline and is not sent.  Any exception raised while scraping or
    sending is printed and swallowed so the scheduler keeps running; the
    browser is always closed.
    """
    global last_matched_hash

    driver = _build_driver()
    try:
        driver.get(PAGE_URL)

        # Wait until the filter entry exists, then mark it active via JS.
        WebDriverWait(driver, 40).until(
            EC.presence_of_element_located((By.XPATH, TARGET_XPATH))
        )
        driver.execute_script(ACTIVATE_FILTER_SCRIPT, TARGET_XPATH)

        update_element = WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.XPATH, UPDATE_XPATH))
        )
        driver.execute_script('arguments[0].scrollIntoView(true);', update_element)

        # Fixed pause before the rendered page source is captured.
        time.sleep(5)

        soup = BeautifulSoup(driver.page_source, 'lxml')
        items = soup.find_all('div', class_=ITEM_CSS_CLASS)

        matched_count = 0
        text_list = []
        for item in items[:MAX_ITEMS]:
            text_content = item.get_text(strip=True)
            if LEVEL_FILTER_MARKER not in text_content:
                continue
            matched_count += 1
            message = _format_flash_item(text_content)
            if message:
                text_list.append(message)

        if text_list:
            newest = text_list[0]
            print("最新一条消息:", newest)
            current_hash = hashlib.md5(newest.encode()).hexdigest()
            if last_matched_hash and current_hash != last_matched_hash:
                # If sending raises, the hash below is not updated, so the
                # same message is attempted again on the next cycle.
                send_feishu_message(newest)
            last_matched_hash = current_hash

        print(f"\n共找到 {matched_count} 条匹配'{LEVEL_FILTER_MARKER}'的信息")
    except Exception as e:
        print(f"执行出错: {e}")
    finally:
        driver.quit()


# Scheduler setup.  The job is registered as due every second, but the loop
# below only polls every 50 seconds, so fetch_news effectively runs once per
# poll (roughly every 50 s plus the time one cycle takes).
schedule.every(1).seconds.do(fetch_news)

print("开始定时监控...")
while True:
    schedule.run_pending()
    time.sleep(50)