
Python SERP API Complete Tutorial: From Getting Started to Real-World Projects

A complete tutorial on calling a SERP API from Python, covering requests basics, async requests, data parsing, error handling, and hands-on project ideas. Code and best practices from beginner to advanced to help you master API development quickly.


Python is the go-to language for data collection and automation, and a SERP API gives Python developers a convenient way to access search-engine data. This tutorial covers how to build SERP API applications with Python, from the basics through advanced techniques.

Related tutorials: What Is a SERP API | Node.js Tutorial | AI Agent Development

Why Choose Python for SERP Applications?

Python's Advantages

Python has distinct advantages for data processing and API development:

  1. Concise and readable: clear code and high development efficiency
  2. Rich library ecosystem: powerful tools such as requests, pandas, and beautifulsoup
  3. Data processing: a natural fit for data analysis and manipulation
  4. Community support: plenty of tutorials and open-source projects
  5. Cross-platform: runs on Windows, Linux, and macOS

SERP API Use Cases

With Python + a SERP API you can build:

  • 🔍 SEO keyword ranking monitoring
  • 📊 Competitor analysis and market research
  • 🤖 Real-time search for AI Agents
  • 💰 E-commerce price monitoring
  • 📰 News aggregation and content collection
  • 🎯 Ad campaign performance analysis

Environment Setup

Install Python

Make sure Python 3.7 or later is installed:

python --version

Install Dependencies

pip install requests pandas python-dotenv

The async, Excel-export, and scheduling examples later in this tutorial also use aiohttp, openpyxl, and schedule:

pip install aiohttp openpyxl schedule

Get an API Key

  1. Visit the SearchCans registration page and create an account
  2. Generate an API key in the console
  3. New users receive 100 free credits to try the service

Basic Tutorial

Your First SERP API Request

Create basic_search.py:

import requests
import json

def search_bing(query, api_key):
    """
    Basic Bing search API call
    """
    url = "https://searchcans.youxikuang.cn/api/search"
    
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "s": query,      # search keyword
        "t": "bing",     # search engine type
        "p": 1,          # page number
        "d": 3000        # delay (milliseconds)
    }
    
    try:
        response = requests.post(url, headers=headers, json=payload)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return None

# Usage example
if __name__ == "__main__":
    API_KEY = "your_api_key_here"
    query = "Python tutorial"
    
    results = search_bing(query, API_KEY)
    
    if results:
        print(f"Results for '{query}':\n")
        for item in results.get('organic', [])[:5]:
            print(f"Title: {item['title']}")
            print(f"Link: {item['link']}")
            print(f"Snippet: {item['snippet']}\n")

Run the script:

python basic_search.py
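
For reference, a successful response is a JSON object whose organic field holds the result list used above. The sketch below is illustrative only, inferred from the fields this tutorial's code accesses (title, link, snippet, position); consult the official API documentation for the authoritative schema:

# Illustrative response shape (not the official schema)
{
    "organic": [
        {
            "position": 1,
            "title": "Example result title",
            "link": "https://example.com/page",
            "snippet": "Example snippet text"
        }
    ]
}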

Managing Your Key with Environment Variables

Create a .env file:

SEARCHCANS_API_KEY=your_actual_api_key

Update the code to read the key from an environment variable:

import os
from dotenv import load_dotenv

# Load environment variables from .env
load_dotenv()

API_KEY = os.getenv('SEARCHCANS_API_KEY')

if not API_KEY:
    raise ValueError("Please set the SEARCHCANS_API_KEY environment variable")
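
With the key loaded from the environment, the earlier search_bing function no longer needs a hard-coded secret:

# Reuse the search_bing function from basic_search.py
results = search_bing("Python tutorial", API_KEY)

if results:
    for item in results.get('organic', [])[:3]:
        print(item['title'], item['link'])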

Creating a Reusable Search Class

# serp_client.py
import os
import requests
from typing import Dict, List, Optional
import time

class SERPClient:
    """SearchCans SERP API client"""
    
    def __init__(self, api_key: str, base_url: str = None):
        self.api_key = api_key
        self.base_url = base_url or "https://searchcans.youxikuang.cn/api/search"
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        })
    
    def search(
        self, 
        query: str, 
        engine: str = "bing",
        page: int = 1,
        delay: int = 3000
    ) -> Optional[Dict]:
        """
        Run a search
        
        Args:
            query: search keyword
            engine: search engine (bing/google)
            page: page number
            delay: delay in milliseconds
            
        Returns:
            Dict with the search results
        """
        payload = {
            "s": query,
            "t": engine,
            "p": page,
            "d": delay
        }
        
        try:
            response = self.session.post(self.base_url, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            print(f"HTTP error: {e}")
            return None
        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}")
            return None
    
    def get_organic_results(self, query: str, **kwargs) -> List[Dict]:
        """Get organic search results"""
        data = self.search(query, **kwargs)
        return data.get('organic', []) if data else []
    
    def get_top_results(self, query: str, count: int = 10, **kwargs) -> List[Dict]:
        """Get the top N search results"""
        results = self.get_organic_results(query, **kwargs)
        return results[:count]
    
    def batch_search(self, queries: List[str], delay_between: float = 1.0, **kwargs) -> Dict[str, List]:
        """
        Search multiple keywords in batch
        
        Args:
            queries: list of keywords
            delay_between: interval between requests (seconds)
            
        Returns:
            {query: results} dict
        """
        results = {}
        
        for query in queries:
            print(f"Searching: {query}")
            results[query] = self.get_organic_results(query, **kwargs)
            time.sleep(delay_between)
        
        return results

# Usage example
if __name__ == "__main__":
    client = SERPClient(os.getenv('SEARCHCANS_API_KEY'))
    
    # Single search
    results = client.get_top_results("Python data analysis", count=5)
    for r in results:
        print(f"{r['title']} - {r['link']}")
    
    # Batch search
    keywords = ["machine learning", "deep learning", "natural language processing"]
    batch_results = client.batch_search(keywords)
    
    for keyword, results in batch_results.items():
        print(f"\n{keyword}: {len(results)} results")

Advanced Features

1. Data Parsing and Cleaning

import re
from urllib.parse import urlparse
from typing import Dict, List, Optional

class ResultParser:
    """Parser for search results"""
    
    @staticmethod
    def extract_domain(url: str) -> str:
        """Extract the domain from a URL"""
        try:
            return urlparse(url).netloc
        except Exception:
            return ""
    
    @staticmethod
    def clean_snippet(snippet: str) -> str:
        """Clean up snippet text"""
        # Collapse extra whitespace
        snippet = re.sub(r'\s+', ' ', snippet)
        # Remove special characters (keep word characters, whitespace, CJK, and common punctuation)
        snippet = re.sub(r'[^\w\s\u4e00-\u9fff,。!?、;:""''()]', '', snippet)
        return snippet.strip()
    
    @staticmethod
    def extract_price(text: str) -> Optional[float]:
        """Extract a price from a text string"""
        patterns = [
            r'¥\s*(\d+(?:,\d{3})*(?:\.\d{2})?)',
            r'(\d+(?:,\d{3})*(?:\.\d{2})?)\s*元',
        ]
        
        for pattern in patterns:
            match = re.search(pattern, text)
            if match:
                price_str = match.group(1).replace(',', '')
                return float(price_str)
        return None
    
    def parse_result(self, result: Dict) -> Dict:
        """Parse a single search result"""
        return {
            'title': result.get('title', ''),
            'url': result.get('link', ''),
            'domain': self.extract_domain(result.get('link', '')),
            'snippet': self.clean_snippet(result.get('snippet', '')),
            'position': result.get('position', 0),
            'price': self.extract_price(result.get('snippet', ''))
        }
    
    def parse_results(self, results: List[Dict]) -> List[Dict]:
        """Parse a list of search results"""
        return [self.parse_result(r) for r in results]

# Usage example
parser = ResultParser()
client = SERPClient(os.getenv('SEARCHCANS_API_KEY'))

results = client.get_organic_results("iPhone 15 Pro price")
parsed = parser.parse_results(results)

for item in parsed:
    if item['price']:
        print(f"{item['title']}: ¥{item['price']}")

2. Data Storage

Save data with pandas:

import pandas as pd
from datetime import datetime
from typing import Dict, List

class DataStorage:
    """Data storage helpers"""
    
    @staticmethod
    def save_to_csv(data: List[Dict], filename: str):
        """Save to a CSV file"""
        df = pd.DataFrame(data)
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        print(f"Data saved to {filename}")
    
    @staticmethod
    def save_to_excel(data: List[Dict], filename: str):
        """Save to an Excel file"""
        df = pd.DataFrame(data)
        df.to_excel(filename, index=False, engine='openpyxl')
        print(f"Data saved to {filename}")
    
    @staticmethod
    def append_to_csv(data: List[Dict], filename: str):
        """Append data to an existing CSV file"""
        df = pd.DataFrame(data)
        df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig')
    
    @staticmethod
    def load_from_csv(filename: str) -> pd.DataFrame:
        """Load data from a CSV file"""
        return pd.read_csv(filename, encoding='utf-8-sig')

# Usage example
storage = DataStorage()

# Search and save
results = client.get_organic_results("Python tutorial")
parsed = parser.parse_results(results)

# Add a timestamp
for item in parsed:
    item['timestamp'] = datetime.now().isoformat()

# Save the data
storage.save_to_csv(parsed, 'search_results.csv')
storage.save_to_excel(parsed, 'search_results.xlsx')
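
For recurring collection jobs, the append_to_csv helper can accumulate results over time instead of overwriting the file — a minimal sketch, assuming the file was first created with save_to_csv so the header row exists:

# Append today's parsed results to a running history file
storage.append_to_csv(parsed, 'search_history.csv')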

3. Async Concurrent Search

Use asyncio and aiohttp to speed things up:

import asyncio
import os
import time

import aiohttp
from typing import List, Dict

class AsyncSERPClient:
    """Async SERP API client"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://searchcans.youxikuang.cn/api/search"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    async def search(self, session: aiohttp.ClientSession, query: str, **kwargs) -> Dict:
        """Run a single async search"""
        payload = {
            "s": query,
            "t": kwargs.get('engine', 'bing'),
            "p": kwargs.get('page', 1),
            "d": kwargs.get('delay', 3000)
        }
        
        try:
            async with session.post(self.base_url, headers=self.headers, json=payload) as response:
                response.raise_for_status()
                return await response.json()
        except Exception as e:
            print(f"Search for '{query}' failed: {e}")
            return {}
    
    async def batch_search(self, queries: List[str], **kwargs) -> Dict[str, List]:
        """Search multiple keywords concurrently"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.search(session, query, **kwargs) for query in queries]
            results = await asyncio.gather(*tasks)
            
            return {
                query: result.get('organic', [])
                for query, result in zip(queries, results)
            }

# Usage example
async def main():
    client = AsyncSERPClient(os.getenv('SEARCHCANS_API_KEY'))
    
    keywords = [
        "Python tutorial",
        "machine learning",
        "data analysis",
        "web development",
        "artificial intelligence"
    ]
    
    print("Starting batch search...")
    start_time = time.time()
    
    results = await client.batch_search(keywords)
    
    elapsed = time.time() - start_time
    print(f"\nDone! Elapsed time: {elapsed:.2f}s")
    print(f"Average per keyword: {elapsed/len(keywords):.2f}s")
    
    for keyword, items in results.items():
        print(f"\n{keyword}: {len(items)} results")

# Run the async program
if __name__ == "__main__":
    asyncio.run(main())

Hands-On Projects

Project 1: SEO Keyword Ranking Monitor

A complete SEO monitoring tool:

# seo_monitor.py
import os
import time
from datetime import datetime
from typing import List, Optional

import pandas as pd
import schedule

from serp_client import SERPClient
# ResultParser and DataStorage are the helper classes from the "Advanced Features"
# section above; place them in a module you can import (or paste them into this file).

class SEOMonitor:
    """SEO keyword ranking monitor"""
    
    def __init__(self, api_key: str, target_domain: str):
        self.client = SERPClient(api_key)
        self.parser = ResultParser()
        self.storage = DataStorage()
        self.target_domain = target_domain
    
    def check_ranking(self, keyword: str, max_results: int = 50) -> Optional[int]:
        """
        Check the target domain's ranking for a given keyword
        
        Returns:
            The ranking position, or None if not found
        """
        results = self.client.get_top_results(keyword, count=max_results)
        
        for i, result in enumerate(results, 1):
            domain = self.parser.extract_domain(result.get('link', ''))
            if self.target_domain in domain:
                return i
        
        return None
    
    def monitor_keywords(self, keywords: List[str]) -> pd.DataFrame:
        """Monitor rankings for multiple keywords"""
        data = []
        timestamp = datetime.now()
        
        for keyword in keywords:
            print(f"Checking keyword: {keyword}")
            ranking = self.check_ranking(keyword)
            
            data.append({
                'timestamp': timestamp,
                'keyword': keyword,
                'ranking': ranking if ranking else '>50',
                'found': ranking is not None
            })
            
            time.sleep(1)  # avoid sending requests too quickly
        
        return pd.DataFrame(data)
    
    def save_report(self, df: pd.DataFrame, filename: str = None):
        """Save the monitoring report"""
        if filename is None:
            filename = f"seo_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        
        self.storage.save_to_csv(df.to_dict('records'), filename)
        return filename
    
    def daily_monitor(self, keywords: List[str]):
        """Daily monitoring task"""
        print(f"\n[{datetime.now()}] Starting daily SEO monitoring")
        
        df = self.monitor_keywords(keywords)
        filename = self.save_report(df)
        
        # Print a summary
        found_count = df['found'].sum()
        print(f"\nMonitoring complete:")
        print(f"- Total keywords: {len(keywords)}")
        print(f"- Ranked: {found_count}")
        print(f"- Not ranked: {len(keywords) - found_count}")
        print(f"- Report saved to: {filename}")

# Usage example
if __name__ == "__main__":
    monitor = SEOMonitor(
        api_key=os.getenv('SEARCHCANS_API_KEY'),
        target_domain="yourwebsite.com"
    )
    
    keywords = [
        "Python SERP API",
        "search API",
        "Bing search API",
        "data collection API"
    ]
    
    # Run once immediately
    monitor.daily_monitor(keywords)
    
    # Schedule a run every day at 09:00
    schedule.every().day.at("09:00").do(monitor.daily_monitor, keywords)
    
    print("\nSEO monitoring started; it will run automatically at 09:00 every day")
    
    while True:
        schedule.run_pending()
        time.sleep(60)

Project 2: Competitor Price Monitoring

# price_monitor.py
import os
import time
from datetime import datetime
from typing import Dict, List

import pandas as pd

from serp_client import SERPClient
# ResultParser is the helper class from the "Data Parsing and Cleaning" section above.

class PriceMonitor:
    """E-commerce price monitor"""
    
    def __init__(self, api_key: str):
        self.client = SERPClient(api_key)
        self.parser = ResultParser()
    
    def monitor_product(self, product_name: str) -> List[Dict]:
        """Monitor prices for a product"""
        query = f"{product_name} price"
        results = self.client.get_organic_results(query)
        
        prices = []
        for result in results:
            parsed = self.parser.parse_result(result)
            if parsed['price']:
                prices.append({
                    'product': product_name,
                    'title': parsed['title'],
                    'price': parsed['price'],
                    'url': parsed['url'],
                    'domain': parsed['domain'],
                    'timestamp': datetime.now()
                })
        
        return prices
    
    def compare_prices(self, products: List[str]) -> pd.DataFrame:
        """Compare prices across multiple products"""
        all_prices = []
        
        for product in products:
            print(f"Querying: {product}")
            prices = self.monitor_product(product)
            all_prices.extend(prices)
            time.sleep(1)
        
        df = pd.DataFrame(all_prices)
        
        # Compute summary statistics
        if not df.empty:
            summary = df.groupby('product')['price'].agg(['min', 'max', 'mean', 'count'])
            print("\nPrice statistics:")
            print(summary)
        
        return df

# Usage example
monitor = PriceMonitor(os.getenv('SEARCHCANS_API_KEY'))

products = [
    "iPhone 15 Pro Max 256GB",
    "MacBook Pro M3",
    "AirPods Pro 2"
]

df = monitor.compare_prices(products)
df.to_excel('price_comparison.xlsx', index=False)

Best Practices

1. Error Handling

from functools import wraps
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def retry_on_failure(max_retries=3, delay=2):
    """Retry decorator"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    logger.warning(f"Attempt {attempt + 1}/{max_retries} failed: {e}")
                    if attempt < max_retries - 1:
                        time.sleep(delay)
                    else:
                        logger.error("All retries failed")
                        raise
        return wrapper
    return decorator

class RobustSERPClient(SERPClient):
    @retry_on_failure(max_retries=3)
    def search(self, query: str, **kwargs):
        return super().search(query, **kwargs)
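
A quick usage sketch of the hardened client:

client = RobustSERPClient(os.getenv('SEARCHCANS_API_KEY'))

# Each call is now retried up to 3 times before the exception is re-raised
data = client.search("Python tutorial")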

2. Rate Limiting

import time
import logging
from functools import wraps

logger = logging.getLogger(__name__)

class RateLimiter:
    """Rate limiter"""
    
    def __init__(self, max_calls: int, time_window: int):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []
    
    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            
            # Drop call records that have fallen outside the time window
            self.calls = [t for t in self.calls if now - t < self.time_window]
            
            if len(self.calls) >= self.max_calls:
                sleep_time = self.time_window - (now - self.calls[0])
                if sleep_time > 0:
                    logger.info(f"Rate limit reached, waiting {sleep_time:.2f}s")
                    time.sleep(sleep_time)
                    self.calls = []
            
            self.calls.append(now)
            return func(*args, **kwargs)
        
        return wrapper
    
# Usage example: at most 30 requests per minute
@RateLimiter(max_calls=30, time_window=60)
def search_with_limit(query):
    return client.search(query)

3. Cost Tracking

from typing import Dict

class CostTracker:
    """API cost tracker"""
    
    def __init__(self, cost_per_1k: float = 4.03):
        self.cost_per_1k = cost_per_1k
        self.call_count = 0
    
    def track_call(self):
        """Record one API call"""
        self.call_count += 1
    
    def get_cost(self) -> float:
        """Compute the total estimated cost"""
        return (self.call_count / 1000) * self.cost_per_1k
    
    def get_stats(self) -> Dict:
        """Return usage statistics"""
        return {
            'total_calls': self.call_count,
            'estimated_cost': f"¥{self.get_cost():.2f}",
            'cost_per_call': f"¥{self.cost_per_1k/1000:.4f}"
        }
    
    def reset(self):
        """Reset the counter"""
        self.call_count = 0

# Integrate the tracker into the client
class TrackedSERPClient(SERPClient):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.tracker = CostTracker()
    
    def search(self, query: str, **kwargs):
        result = super().search(query, **kwargs)
        self.tracker.track_call()
        return result
    
    def print_stats(self):
        stats = self.tracker.get_stats()
        print("\nAPI usage statistics:")
        for key, value in stats.items():
            print(f"  {key}: {value}")
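
A minimal usage sketch — every call made through the client is counted and costed:

client = TrackedSERPClient(os.getenv('SEARCHCANS_API_KEY'))

for kw in ["Python SERP API", "search API"]:
    client.search(kw)

client.print_stats()  # prints total_calls, estimated_cost and cost_per_call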

Summary

In this tutorial you have learned:

✅ The basics of calling a SERP API from Python
✅ Building a reusable client class
✅ Data parsing and storage
✅ Async concurrent search
✅ Hands-on project development
✅ Best practices and optimization techniques

With the SearchCans SERP API you can build powerful data collection and analysis applications at very low cost (¥4.03 per 1,000 calls).

Get started now:

  1. Register an account to get 100 free credits
  2. Check the API documentation to learn more about the endpoints
  3. Test your code in the API console
  4. Use this tutorial as a reference to build your application quickly

Related Resources

Development Tutorials

Technical Documentation

Tags:

Python, SERP API, Tutorial, Development Guide

Ready to build your AI application with SearchCans?

Try our SERP API and Reader API now. Pricing starts at just ¥0.56 per 1,000 calls, and you can try it for free with no credit card required.