"""Poll a news-flash page for "hot" items and e-mail the ones not seen before.

Every five minutes the page at ``url`` is loaded in headless Chrome, the
"hot" filter button is clicked, and the matching items are compared against
``last_records``.  Items that are new (or whose text changed) are e-mailed
and written to ``news.csv``.
"""
import csv
import hashlib
import os
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from html import escape

import schedule
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Mail configuration.
# NOTE(review): an SMTP credential is committed in source.  It can now be
# overridden through the QQ_SMTP_PASSWORD environment variable; the literal
# fallback is kept only for backward compatibility and should be rotated
# and removed.
from_email = "240884432@qq.com"
from_password = os.environ.get("QQ_SMTP_PASSWORD", "osjyjmbqrzxtbjbf")
to_email = "240884432@qq.com"

# Target URL.
url = 'https://www.jjin10.com/'

# Browser-like request headers.
# NOTE(review): not referenced anywhere in the visible code (Selenium drives
# a real browser); kept in case other code reads it.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36'
}

# Already-notified items: {item id: (md5 of item text, time.time() when recorded)}
last_records = {}


def send_email(subject, content, to_email):
    """Send ``content`` as an HTML mail through smtp.qq.com over SSL.

    Args:
        subject: Value of the Subject header.
        content: HTML body of the message.
        to_email: Recipient address.

    Sending is best-effort: any failure is printed, never raised.
    """
    msg = MIMEMultipart('alternative')
    msg['Subject'] = subject
    msg['From'] = from_email
    msg['To'] = to_email
    msg.attach(MIMEText(content, 'html'))

    try:
        # The context manager closes the connection even when login or
        # sendmail raises, so a failed attempt does not leak the socket.
        with smtplib.SMTP_SSL('smtp.qq.com', 465) as server:
            server.login(from_email, from_password)
            server.sendmail(from_email, to_email, msg.as_string())
        print("邮件发送成功")
    except Exception as e:
        print(f"邮件发送失败: {e}")


def _div_text(parent, css_class):
    """Return the stripped text of the first matching <div>, or None if absent."""
    node = parent.find('div', class_=css_class)
    return node.get_text(strip=True) if node is not None else None


def _scrape_hot_items():
    """Load the page in headless Chrome and return the item elements marked hot."""
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        # Click the "hot" filter button once it becomes clickable.
        button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, '.flash-hot_text.is-bao'))
        )
        driver.execute_script("arguments[0].click();", button)
        time.sleep(3)  # give the filtered list time to load
        page_html = driver.page_source
    finally:
        driver.quit()

    soup = BeautifulSoup(page_html, 'html.parser')
    containers = soup.find_all('div', class_='jin-flash-item-container is-normal')
    return [c for c in containers if c.find('i', class_='flash-hot_text is-bao')]


def _collect_new_items(elements):
    """Return a dict for each element that is unseen or whose text hash changed."""
    fresh = []
    for element in elements:
        content = _div_text(element, 'flash-text')
        if content is None:
            # Without a text node there is nothing to hash or report.
            continue
        item_id = element.get('id')
        content_hash = hashlib.md5(content.encode()).hexdigest()
        known = last_records.get(item_id)
        # Double check: skip only when the id is known AND its hash matches.
        if known is not None and known[0] == content_hash:
            continue
        fresh.append({
            'id': item_id,
            'time': _div_text(element, 'item-time') or '',
            'content': content,
            'hash': content_hash,
        })
    return fresh


def _render_email(items):
    """Build the HTML body listing ``items``, numbered from 1."""
    # NOTE(review): the body is sent as text/html, where bare newlines
    # collapse, so line breaks are expressed as tags.  The exact original
    # markup was not recoverable from the source; confirm the layout.
    parts = []
    for i, item in enumerate(items, 1):
        # Scraped text is untrusted, so escape it before embedding in HTML.
        parts.append(
            f"<p>消息 {i}:<br>"
            f"时间: {escape(item['time'])}<br>"
            f"内容: {escape(item['content'])}</p>"
        )
        parts.append('<hr>')
    return "".join(parts)


def fetch_and_notify():
    """Scrape the hot items, e-mail the new ones and write them to news.csv.

    Returns without sending anything when scraping fails or no hot items are
    found.  When hot items exist but none is new, a short "nothing new" mail
    is sent instead.
    """
    global last_records

    try:
        hot_elements = _scrape_hot_items()
    except WebDriverException as e:
        # Includes the wait timeout.  Skip this run instead of letting the
        # exception propagate through schedule.run_pending() and end the loop.
        print(f"抓取失败: {e}")
        return

    if not hot_elements:
        return

    new_items = _collect_new_items(hot_elements)
    if not new_items:
        # Send a notice mail when there is nothing new.
        send_email("金十数据市场快讯", "<p>没有新的市场快讯信息。</p>", to_email)
        return

    # Remember what is about to be reported.
    for item in new_items:
        last_records[item['id']] = (item['hash'], time.time())

    send_email("金十数据市场快讯", _render_email(new_items), to_email)

    # Drop records older than 24 hours.
    expire_time = time.time() - 86400
    last_records = {k: v for k, v in last_records.items() if v[1] > expire_time}

    # Write the new items to CSV.
    # NOTE(review): mode 'w' truncates the file, so it only ever holds the
    # latest batch; switch to append if a running history is wanted.
    with open('news.csv', 'w', newline='', encoding='utf-8-sig') as csvfile:
        fieldnames = ['id', 'time', 'content', 'hash']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(new_items)


def main():
    """Schedule fetch_and_notify every five minutes and run the scheduler loop."""
    schedule.every(5).minutes.do(fetch_and_notify)
    print("开始监控市场快讯信息...")
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()