自动化 25 分钟阅读

实时搜索数据采集自动化:SERP API实战 | SearchCans

用SERP API构建实时搜索数据采集自动化。分布式调度、数据存储、监控告警、重试机制。Python/Node.js代码。

9,754 字

在数字营销、SEO优化和市场研究领域,持续监控搜索在数据驱动的商业环境中,实时获取搜索引擎数据已成为企业竞争的关键。无论是竞品分析、市场调研还是SEO优化,实时搜索数据收集自动化都能带来巨大价值。SERP API为这一需求提供了完美解决方案。

相关教程Python教程 | Node.js教程 | API文档要自动化采集?

手动采集的痛点

  1. 时间成本高:每天手动搜索几十个关键词需要数小时
  2. 数据不连续:无法保证每天同一时间采集
  3. 容易遗漏:人工操作难免出错
  4. 无法规模化:监控关键词增加时工作量成倍增长

自动化的价值

通过自动化系统,你可以:

  • ✅ 24/7不间断监控搜索数据
  • ✅ 精确到分钟级的数据采集
  • ✅ 轻松扩展到数千个关键词
  • ✅ 自动存储和分析历史数据
  • ✅ 异常情况实时告警

系统架构设计

核心组件

定时调度器 → SERP API → 数据处理 → 数据存储 → 监控告警
     ↓           ↓          ↓          ↓          ↓
  Cron/PM2   SearchCans   清洗/解析   MySQL/CSV   邮件/钉钉

技术选型

调度层

  • Node-cron(Node.js)
  • APScheduler(Python)
  • PM2(进程管理)

数据采集

数据存储

  • MySQL(结构化数据)
  • MongoDB(灵活schema)
  • CSV/Excel(简单场景)

监控告警

  • 邮件通知
  • 钉钉/企业微信机器人
  • Prometheus + Grafana

Python实现方案

1. 基础采集脚本

import requests
import schedule
import time
from datetime import datetime
import pandas as pd

class SERPCollector:
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://searchcans.youxikuang.cn/api/search"
    
    def search(self, keyword):
        """执行单次搜索"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "s": keyword,
            "t": "bing",
            "p": 1
        }
        
        try:
            response = requests.post(self.base_url, headers=headers, json=payload)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"搜索失败 {keyword}: {e}")
            return None
    
    def collect_batch(self, keywords):
        """批量采集"""
        results = []
        for keyword in keywords:
            print(f"[{datetime.now()}] 采集: {keyword}")
            data = self.search(keyword)
            if data:
                results.append({
                    'keyword': keyword,
                    'timestamp': datetime.now(),
                    'results': data.get('organic', [])
                })
            time.sleep(1)  # 避免请求过快
        return results
    
    def save_to_csv(self, results, filename):
        """保存到CSV"""
        flat_data = []
        for item in results:
            for i, result in enumerate(item['results'][:10]):
                flat_data.append({
                    'timestamp': item['timestamp'],
                    'keyword': item['keyword'],
                    'position': i + 1,
                    'title': result.get('title'),
                    'url': result.get('link'),
                    'snippet': result.get('snippet')
                })
        
        df = pd.DataFrame(flat_data)
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        print(f"数据已保存: {filename}")

# 使用示例
collector = SERPCollector("your_api_key")

keywords = [
    "Python教程",
    "机器学习",
    "数据分析"
]

# 立即执行一次
results = collector.collect_batch(keywords)
collector.save_to_csv(results, f"serp_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")

2. 定时任务配置

def daily_collection():
    """每日采集任务"""
    print(f"\n{'='*50}")
    print(f"开始每日采集: {datetime.now()}")
    print(f"{'='*50}\n")
    
    results = collector.collect_batch(keywords)
    filename = f"daily_serp_{datetime.now().strftime('%Y%m%d')}.csv"
    collector.save_to_csv(results, filename)
    
    print(f"\n采集完成,共 {len(results)} 个关键词")

# 设置定时任务
schedule.every().day.at("09:00").do(daily_collection)  # 每天9点
schedule.every().hour.do(daily_collection)  # 每小时
schedule.every(30).minutes.do(daily_collection)  # 每30分钟

print("定时任务已启动...")
while True:
    schedule.run_pending()
    time.sleep(60)

3. 数据库存储

import mysql.connector
from mysql.connector import Error

class DatabaseManager:
    def __init__(self, host, database, user, password):
        self.connection = mysql.connector.connect(
            host=host,
            database=database,
            user=user,
            password=password
        )
        self.cursor = self.connection.cursor()
        self.create_tables()
    
    def create_tables(self):
        """创建数据表"""
        create_table_query = """
        CREATE TABLE IF NOT EXISTS serp_results (
            id INT AUTO_INCREMENT PRIMARY KEY,
            keyword VARCHAR(255),
            position INT,
            title TEXT,
            url TEXT,
            snippet TEXT,
            collected_at DATETIME,
            INDEX idx_keyword (keyword),
            INDEX idx_collected_at (collected_at)
        )
        """
        self.cursor.execute(create_table_query)
        self.connection.commit()
    
    def save_results(self, results):
        """保存搜索结果"""
        insert_query = """
        INSERT INTO serp_results 
        (keyword, position, title, url, snippet, collected_at)
        VALUES (%s, %s, %s, %s, %s, %s)
        """
        
        for item in results:
            for i, result in enumerate(item['results'][:10]):
                values = (
                    item['keyword'],
                    i + 1,
                    result.get('title'),
                    result.get('link'),
                    result.get('snippet'),
                    item['timestamp']
                )
                self.cursor.execute(insert_query, values)
        
        self.connection.commit()
        print(f"已保存 {len(results)} 条记录到数据库")

# 使用数据库
db = DatabaseManager('localhost', 'serp_db', 'user', 'password')
db.save_results(results)

Node.js实现方案

完整自动化系统

const axios = require('axios');
const cron = require('node-cron');
const nodemailer = require('nodemailer');
const fs = require('fs');

class AutomatedSERPCollector {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.baseUrl = 'https://searchcans.youxikuang.cn/api/search';
  }
  
  async search(keyword) {
    try {
      const response = await axios.post(this.baseUrl, {
        s: keyword,
        t: 'bing',
        p: 1
      }, {
        headers: {
          'Authorization': `Bearer ${this.apiKey}`,
          'Content-Type': 'application/json'
        }
      });
      return response.data;
    } catch (error) {
      console.error(`搜索失败 ${keyword}:`, error.message);
      return null;
    }
  }
  
  async collectBatch(keywords) {
    const results = [];
    for (const keyword of keywords) {
      console.log(`[${new Date().toISOString()}] 采集: ${keyword}`);
      const data = await this.search(keyword);
      if (data) {
        results.push({
          keyword,
          timestamp: new Date(),
          results: data.organic || []
        });
      }
      await this.sleep(1000);
    }
    return results;
  }
  
  saveToJSON(results, filename) {
    fs.writeFileSync(filename, JSON.stringify(results, null, 2));
    console.log(`数据已保存: ${filename}`);
  }
  
  async sendAlert(subject, message) {
    // 邮件告警配置
    const transporter = nodemailer.createTransporter({
      service: 'gmail',
      auth: {
        user: process.env.EMAIL_USER,
        pass: process.env.EMAIL_PASS
      }
    });
    
    await transporter.sendMail({
      from: process.env.EMAIL_USER,
      to: process.env.ALERT_EMAIL,
      subject,
      text: message
    });
  }
  
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// 初始化
const collector = new AutomatedSERPCollector(process.env.SEARCHCANS_API_KEY);

const keywords = ['Node.js教程', 'JavaScript框架', 'React开发'];

// 定时任务
cron.schedule('0 9 * * *', async () => {
  console.log('开始每日采集...');
  const results = await collector.collectBatch(keywords);
  const filename = `serp_${new Date().toISOString().split('T')[0]}.json`;
  collector.saveToJSON(results, filename);
  await collector.sendAlert('采集完成', `已采集 ${results.length} 个关键词`);
});

console.log('自动化系统已启动');

监控和告警

异常检测

class MonitoringSystem:
    def __init__(self, threshold=0.3):
        self.threshold = threshold
        self.history = {}
    
    def check_ranking_change(self, keyword, current_position, historical_avg):
        """检测排名变化"""
        if historical_avg is None:
            return False
        
        change_rate = abs(current_position - historical_avg) / historical_avg
        return change_rate > self.threshold
    
    def alert(self, keyword, message):
        """发送告警"""
        print(f"⚠️ 告警: {keyword} - {message}")
        # 这里可以集成钉钉、邮件等通知方式

monitor = MonitoringSystem()

# 检测排名变化
if monitor.check_ranking_change("Python教程", 15, 5):
    monitor.alert("Python教程", "排名下降超过30%")

成本优化

使用SearchCans可以大幅降低成本:

场景:监控100个关键词,每小时采集一次

  • 每天采集:100 × 24 = 2,400次
  • 每月采集:2,400 × 30 = 72,000次
  • 月度成本:72,000 ÷ 1,000 × ¥4.03 = ¥290

相比其他服务商节省90%以上成本!

生产环境部署

使用PM2管理

# 安装PM2
npm install -g pm2

# 启动应用
pm2 start collector.js --name serp-collector

# 设置开机自启
pm2 startup
pm2 save

# 查看日志
pm2 logs serp-collector

Docker部署

FROM python:3.9-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["python", "collector.py"]

最佳实践

  1. 错误重试:网络异常时自动重试3次
  2. 速率控制:避免请求过快被限制
  3. 数据备份:定期备份历史数据
  4. 日志记录:详细记录每次采集情况
  5. 监控告警:异常情况及时通知

总结

通过本文的方案,你可以快速搭建一个完整的实时搜索数据采集自动化系统。使用SearchCans SERP API,以极低成本(¥4.03/1000次)实现:

✅ 24/7自动化监控
✅ 数千关键词并发采集
✅ 历史数据完整存储
✅ 异常情况实时告警

立即注册SearchCans,获取100积分免费体验,开始构建你的自动化采集系统!


相关阅读:

标签:

自动化 数据采集 SERP API 定时任务

准备好用 SearchCans 构建你的 AI 应用了吗?

立即体验我们的 SERP API 和 Reader API。每千次调用仅需 ¥0.56 起,无需信用卡即可免费试用。