Python是数据采集和自动化的首选语言,而SERP API为Python开发者提供了获取搜索引擎数据的便捷方式。本教程将从基础到进阶,全面讲解如何使用Python开发SERP API应用。
相关教程:什么是SERP API | Node.js教程 | AI Agent开发
为什么选择Python开发SERP应用?
Python的优势
Python在数据处理和API开发方面具有独特优势:
- 简洁易读:代码清晰,开发效率高
- 丰富的库:requests、pandas、beautifulsoup等强大工具
- 数据处理能力:天然适合数据分析和处理
- 社区支持:大量教程和开源项目
- 跨平台:Windows、Linux、Mac都能运行
SERP API的应用场景
使用Python + SERP API可以实现:
- 🔍 SEO关键词排名监控
- 📊 竞品分析和市场研究
- 🤖 AI Agent实时搜索功能
- 💰 电商价格监控
- 📰 新闻聚合和内容采集
- 🎯 广告投放效果分析
环境准备
安装Python
确保安装了Python 3.7或更高版本:
python --version
安装依赖库
pip install requests pandas python-dotenv
获取API密钥
- 访问SearchCans注册页面创建账户
- 在控制台生成API密钥
- 新用户获得100积分免费体验
基础教程
第一个SERP API请求
创建basic_search.py:
import requests
import json


def search_bing(query, api_key):
    """Run a basic Bing search through the SearchCans SERP API.

    Args:
        query: Search keyword string.
        api_key: SearchCans API key, sent as a Bearer token.

    Returns:
        The parsed JSON response dict, or None when the request failed.
    """
    url = "https://searchcans.youxikuang.cn/api/search"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "s": query,   # search keyword
        "t": "bing",  # search engine type
        "p": 1,       # page number
        "d": 3000     # delay in milliseconds
    }
    try:
        # timeout= keeps a stalled connection from hanging the script forever
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        return None


# Usage example
if __name__ == "__main__":
    API_KEY = "your_api_key_here"
    query = "Python教程"

    results = search_bing(query, API_KEY)
    if results:
        print(f"搜索 '{query}' 的结果:\n")
        # .get() guards against result entries that are missing a field
        for item in results.get('organic', [])[:5]:
            print(f"标题: {item.get('title', '')}")
            print(f"链接: {item.get('link', '')}")
            print(f"摘要: {item.get('snippet', '')}\n")
运行程序:
python basic_search.py
使用环境变量管理密钥
创建.env文件:
SEARCHCANS_API_KEY=your_actual_api_key
修改代码使用环境变量:
import os
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment
load_dotenv()

# Fail fast when the key is missing so later API calls don't fail mysteriously
API_KEY = os.getenv('SEARCHCANS_API_KEY')
if not API_KEY:
    raise ValueError("请设置SEARCHCANS_API_KEY环境变量")
创建可复用的搜索类
# serp_client.py
import requests
from typing import Dict, List, Optional
import time
class SERPClient:
    """SearchCans SERP API client.

    Reuses a single requests.Session so TCP connections and the auth
    headers are shared across all calls.
    """

    def __init__(self, api_key: str, base_url: str = None, timeout: float = 30):
        """
        Args:
            api_key: SearchCans API key (sent as a Bearer token).
            base_url: Optional override for the API endpoint.
            timeout: Per-request timeout in seconds; prevents indefinite hangs.
        """
        self.api_key = api_key
        self.base_url = base_url or "https://searchcans.youxikuang.cn/api/search"
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        })

    def search(
        self,
        query: str,
        engine: str = "bing",
        page: int = 1,
        delay: int = 3000
    ) -> Optional[Dict]:
        """Execute one search.

        Args:
            query: Search keyword.
            engine: Search engine (bing/google).
            page: Page number.
            delay: Delay in milliseconds.

        Returns:
            Result dict, or None on any HTTP/request error.
        """
        payload = {
            "s": query,
            "t": engine,
            "p": page,
            "d": delay
        }
        try:
            # timeout= added: without it a stalled server blocks the caller forever
            response = self.session.post(self.base_url, json=payload, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            print(f"HTTP错误: {e}")
            return None
        except requests.exceptions.RequestException as e:
            print(f"请求错误: {e}")
            return None

    def get_organic_results(self, query: str, **kwargs) -> List[Dict]:
        """Return the organic results list (empty list when the call failed)."""
        data = self.search(query, **kwargs)
        return data.get('organic', []) if data else []

    def get_top_results(self, query: str, count: int = 10, **kwargs) -> List[Dict]:
        """Return at most the first *count* organic results."""
        results = self.get_organic_results(query, **kwargs)
        return results[:count]

    def batch_search(self, queries: List[str], delay_between: float = 1.0, **kwargs) -> Dict[str, List]:
        """Search several keywords sequentially.

        Args:
            queries: List of keywords.
            delay_between: Pause between requests, in seconds.

        Returns:
            {query: results} mapping.
        """
        results = {}
        for i, query in enumerate(queries):
            print(f"正在搜索: {query}")
            results[query] = self.get_organic_results(query, **kwargs)
            # only sleep BETWEEN requests, not after the final one
            if i < len(queries) - 1:
                time.sleep(delay_between)
        return results
# Usage example
if __name__ == "__main__":
    import os  # BUG FIX: this snippet used os.getenv without importing os

    client = SERPClient(os.getenv('SEARCHCANS_API_KEY'))

    # Single search
    results = client.get_top_results("Python数据分析", count=5)
    for r in results:
        print(f"{r['title']} - {r['link']}")

    # Batch search over several keywords
    keywords = ["机器学习", "深度学习", "自然语言处理"]
    batch_results = client.batch_search(keywords)
    for keyword, results in batch_results.items():
        print(f"\n{keyword}: {len(results)}个结果")
进阶功能
1. 数据解析和清洗
import re
from urllib.parse import urlparse
class ResultParser:
    """Parse and clean raw SERP result entries."""

    # Price patterns compiled once at class load instead of on every call.
    _PRICE_PATTERNS = [
        re.compile(r'¥\s*(\d+(?:,\d{3})*(?:\.\d{2})?)'),
        re.compile(r'(\d+(?:,\d{3})*(?:\.\d{2})?)\s*元'),
    ]

    @staticmethod
    def extract_domain(url: str) -> str:
        """Return the domain (netloc) of *url*, or '' when it cannot be parsed."""
        try:
            return urlparse(url).netloc
        # narrowed from a bare except: that would also swallow KeyboardInterrupt
        except (ValueError, AttributeError):
            return ""

    @staticmethod
    def clean_snippet(snippet: str) -> str:
        """Normalize whitespace and strip unexpected symbols from snippet text."""
        # Collapse runs of whitespace into a single space
        snippet = re.sub(r'\s+', ' ', snippet)
        # Keep word chars, whitespace, CJK characters, and common CJK punctuation
        snippet = re.sub(r'[^\w\s\u4e00-\u9fff,。!?、;:""''()]', '', snippet)
        return snippet.strip()

    @classmethod
    def extract_price(cls, text: str) -> Optional[float]:
        """Extract the first price found in *text*; None when no price matches."""
        for pattern in cls._PRICE_PATTERNS:
            match = pattern.search(text)
            if match:
                price_str = match.group(1).replace(',', '')
                return float(price_str)
        return None

    def parse_result(self, result: Dict) -> Dict:
        """Parse one raw result entry into a flat, cleaned record."""
        return {
            'title': result.get('title', ''),
            'url': result.get('link', ''),
            'domain': self.extract_domain(result.get('link', '')),
            'snippet': self.clean_snippet(result.get('snippet', '')),
            'position': result.get('position', 0),
            'price': self.extract_price(result.get('snippet', ''))
        }

    def parse_results(self, results: List[Dict]) -> List[Dict]:
        """Parse a list of raw result entries."""
        return [self.parse_result(r) for r in results]
# 使用示例
parser = ResultParser()
client = SERPClient(os.getenv('SEARCHCANS_API_KEY'))
results = client.get_organic_results("iPhone 15 Pro价格")
parsed = parser.parse_results(results)
for item in parsed:
if item['price']:
print(f"{item['title']}: ¥{item['price']}")
2. 数据存储
使用pandas保存数据:
import pandas as pd
from datetime import datetime
class DataStorage:
    """Persist parsed search results to CSV/Excel files."""

    @staticmethod
    def save_to_csv(data: List[Dict], filename: str):
        """Write *data* to a fresh CSV (utf-8 BOM so Excel opens it cleanly)."""
        df = pd.DataFrame(data)
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        # restored: the filename placeholder was lost from this message
        print(f"数据已保存到 {filename}")

    @staticmethod
    def save_to_excel(data: List[Dict], filename: str):
        """Write *data* to an Excel workbook (requires openpyxl)."""
        df = pd.DataFrame(data)
        df.to_excel(filename, index=False, engine='openpyxl')
        print(f"数据已保存到 {filename}")

    @staticmethod
    def append_to_csv(data: List[Dict], filename: str):
        """Append rows to a CSV, writing the header only when the file is new.

        BUG FIX: the original always passed header=False, so the very first
        write produced a headerless file.
        """
        import os
        df = pd.DataFrame(data)
        write_header = not (os.path.exists(filename) and os.path.getsize(filename) > 0)
        df.to_csv(filename, mode='a', header=write_header, index=False, encoding='utf-8-sig')

    @staticmethod
    def load_from_csv(filename: str) -> pd.DataFrame:
        """Load a previously saved CSV back into a DataFrame."""
        return pd.read_csv(filename, encoding='utf-8-sig')
# Usage example
storage = DataStorage()

# Search and save (client/parser come from the earlier snippets — TODO confirm
# they are in scope when this is run standalone)
results = client.get_organic_results("Python教程")
parsed = parser.parse_results(results)

# Stamp each row with the collection time
for item in parsed:
    item['timestamp'] = datetime.now().isoformat()

# Persist both CSV and Excel copies
storage.save_to_csv(parsed, 'search_results.csv')
storage.save_to_excel(parsed, 'search_results.xlsx')
3. 异步并发搜索
使用asyncio和aiohttp提升性能:
import asyncio
import aiohttp
from typing import List, Dict
class AsyncSERPClient:
    """Asynchronous SERP API client built on aiohttp."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://searchcans.youxikuang.cn/api/search"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    async def search(self, session: aiohttp.ClientSession, query: str, **kwargs) -> Dict:
        """Fire one search request; print a notice and return {} on any failure."""
        body = {
            "s": query,
            "t": kwargs.get('engine', 'bing'),
            "p": kwargs.get('page', 1),
            "d": kwargs.get('delay', 3000)
        }
        try:
            async with session.post(self.base_url, headers=self.headers, json=body) as resp:
                resp.raise_for_status()
                return await resp.json()
        except Exception as e:
            print(f"搜索 '{query}' 失败: {e}")
            return {}

    async def batch_search(self, queries: List[str], **kwargs) -> Dict[str, List]:
        """Run all *queries* concurrently and map each to its organic hits."""
        async with aiohttp.ClientSession() as session:
            responses = await asyncio.gather(
                *(self.search(session, q, **kwargs) for q in queries)
            )
        organic = {}
        for q, resp in zip(queries, responses):
            organic[q] = resp.get('organic', [])
        return organic
# Usage example
async def main():
    """Demo: run a concurrent batch search and report wall-clock timing.

    NOTE(review): relies on ``os`` and ``time`` being imported by earlier
    snippets in this tutorial; a standalone script must import them itself.
    """
    client = AsyncSERPClient(os.getenv('SEARCHCANS_API_KEY'))
    keywords = [
        "Python教程",
        "机器学习",
        "数据分析",
        "Web开发",
        "人工智能"
    ]

    print("开始批量搜索...")
    start_time = time.time()

    # All keywords are searched concurrently, so elapsed time is roughly
    # the slowest single request rather than the sum of all of them.
    results = await client.batch_search(keywords)

    elapsed = time.time() - start_time
    print(f"\n完成!耗时: {elapsed:.2f}秒")
    print(f"平均每个关键词: {elapsed/len(keywords):.2f}秒")

    for keyword, items in results.items():
        print(f"\n{keyword}: {len(items)}个结果")

# Entry point for the async demo
if __name__ == "__main__":
    asyncio.run(main())
实战项目
项目1:SEO关键词排名监控
完整的SEO监控工具:
# seo_monitor.py
import schedule
import time
from datetime import datetime
import pandas as pd
class SEOMonitor:
    """SEO keyword-ranking monitor for a single target domain."""

    def __init__(self, api_key: str, target_domain: str):
        self.client = SERPClient(api_key)
        self.parser = ResultParser()
        self.storage = DataStorage()
        self.target_domain = target_domain

    def _matches_target(self, domain: str) -> bool:
        """True when *domain* is the target domain or one of its subdomains.

        BUG FIX: the original substring test (`target in domain`) also
        matched unrelated domains such as 'notyourwebsite.com'.
        """
        return domain == self.target_domain or domain.endswith('.' + self.target_domain)

    def check_ranking(self, keyword: str, max_results: int = 50) -> Optional[int]:
        """Return the 1-based rank of the target domain for *keyword*.

        Returns:
            Position within the top *max_results* hits, or None if absent.
        """
        results = self.client.get_top_results(keyword, count=max_results)
        for i, result in enumerate(results, 1):
            domain = self.parser.extract_domain(result.get('link', ''))
            if self._matches_target(domain):
                return i
        return None

    def monitor_keywords(self, keywords: List[str]) -> pd.DataFrame:
        """Collect one ranking row per keyword into a DataFrame."""
        data = []
        timestamp = datetime.now()

        for keyword in keywords:
            print(f"检查关键词: {keyword}")
            ranking = self.check_ranking(keyword)
            data.append({
                'timestamp': timestamp,
                'keyword': keyword,
                'ranking': ranking if ranking else '>50',
                'found': ranking is not None
            })
            time.sleep(1)  # throttle to avoid hammering the API

        return pd.DataFrame(data)

    def save_report(self, df: pd.DataFrame, filename: str = None):
        """Save the report; auto-name it with a timestamp when none is given."""
        if filename is None:
            filename = f"seo_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        self.storage.save_to_csv(df.to_dict('records'), filename)
        return filename

    def daily_monitor(self, keywords: List[str]):
        """Run one monitoring pass, save a report, and print a summary."""
        print(f"\n[{datetime.now()}] 开始每日SEO监控")
        df = self.monitor_keywords(keywords)
        filename = self.save_report(df)

        found_count = df['found'].sum()
        print(f"\n监控完成:")
        print(f"- 关键词总数: {len(keywords)}")
        print(f"- 有排名: {found_count}")
        print(f"- 无排名: {len(keywords) - found_count}")
        # restored: the filename placeholder was lost from this message
        print(f"- 报告已保存: {filename}")
# Usage example
if __name__ == "__main__":
    import os  # BUG FIX: this snippet used os.getenv without importing os

    monitor = SEOMonitor(
        api_key=os.getenv('SEARCHCANS_API_KEY'),
        target_domain="yourwebsite.com"
    )

    keywords = [
        "Python SERP API",
        "搜索API",
        "Bing搜索API",
        "数据采集API"
    ]

    # Run once immediately
    monitor.daily_monitor(keywords)

    # Then schedule a daily run at 09:00
    schedule.every().day.at("09:00").do(monitor.daily_monitor, keywords)

    print("\nSEO监控已启动,每天9:00自动执行")
    while True:
        schedule.run_pending()
        time.sleep(60)
项目2:竞品价格监控
# price_monitor.py
class PriceMonitor:
    """E-commerce price watcher built on the SERP client and result parser."""

    def __init__(self, api_key: str):
        self.client = SERPClient(api_key)
        self.parser = ResultParser()

    def monitor_product(self, product_name: str) -> List[Dict]:
        """Search "<product> 价格" and collect every hit that has a price."""
        query = f"{product_name} 价格"
        raw_hits = self.client.get_organic_results(query)

        found = []
        for hit in raw_hits:
            record = self.parser.parse_result(hit)
            if not record['price']:
                continue
            found.append({
                'product': product_name,
                'title': record['title'],
                'price': record['price'],
                'url': record['url'],
                'domain': record['domain'],
                'timestamp': datetime.now()
            })
        return found

    def compare_prices(self, products: List[str]) -> pd.DataFrame:
        """Query every product, pool the rows, and print min/max/mean stats."""
        rows = []
        for product in products:
            print(f"查询: {product}")
            rows.extend(self.monitor_product(product))
            time.sleep(1)

        df = pd.DataFrame(rows)

        if not df.empty:
            summary = df.groupby('product')['price'].agg(['min', 'max', 'mean', 'count'])
            print("\n价格统计:")
            print(summary)

        return df
# Usage example
import os  # BUG FIX: this snippet used os.getenv without importing os

monitor = PriceMonitor(os.getenv('SEARCHCANS_API_KEY'))
products = [
    "iPhone 15 Pro Max 256GB",
    "MacBook Pro M3",
    "AirPods Pro 2"
]

df = monitor.compare_prices(products)
df.to_excel('price_comparison.xlsx', index=False)
最佳实践
1. 错误处理
from functools import wraps
import logging
import time  # BUG FIX: time.sleep is used below but this snippet never imported time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def retry_on_failure(max_retries=3, delay=2):
    """Decorator: retry a failing call up to *max_retries* times.

    Args:
        max_retries: Total number of attempts before giving up.
        delay: Seconds to sleep between attempts.

    The final failure is re-raised so callers still see the original error.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    logger.warning(f"尝试 {attempt + 1}/{max_retries} 失败: {e}")
                    if attempt < max_retries - 1:
                        time.sleep(delay)
                    else:
                        # plain string: the original f-string had no placeholders
                        logger.error("所有重试失败")
                        raise
        return wrapper
    return decorator
class RobustSERPClient(SERPClient):
    """SERPClient variant whose search() retries transient failures."""

    @retry_on_failure(max_retries=3)
    def search(self, query: str, **kwargs):
        # Delegate to the base implementation; the decorator adds the retries.
        return super().search(query, **kwargs)
2. 速率限制
import time  # BUG FIX: 'from time import time' shadowed the module, so the
             # time.sleep(...) call below raised AttributeError when the limit hit
from functools import wraps

logger = logging.getLogger(__name__)


class RateLimiter:
    """Sliding-window rate limiter usable as a decorator.

    Allows at most *max_calls* invocations per *time_window* seconds;
    excess calls block (sleep) until the window frees up.
    """

    def __init__(self, max_calls: int, time_window: int):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []  # timestamps of recent invocations

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            # Drop timestamps that have fallen out of the window
            self.calls = [t for t in self.calls if now - t < self.time_window]

            if len(self.calls) >= self.max_calls:
                sleep_time = self.time_window - (now - self.calls[0])
                if sleep_time > 0:
                    logger.info(f"达到速率限制,等待 {sleep_time:.2f}秒")
                    time.sleep(sleep_time)
                self.calls = []

            self.calls.append(now)
            return func(*args, **kwargs)
        return wrapper
# Usage example: at most 30 requests per minute
@RateLimiter(max_calls=30, time_window=60)
def search_with_limit(query):
    # 'client' comes from an earlier snippet — presumably a SERPClient instance
    return client.search(query)
3. 成本监控
class CostTracker:
    """Running tally of API calls and the estimated spend they represent."""

    def __init__(self, cost_per_1k: float = 4.03):
        # Price in yuan per 1000 calls (SearchCans published rate by default)
        self.cost_per_1k = cost_per_1k
        self.call_count = 0

    def track_call(self):
        """Count one more API call."""
        self.call_count += 1

    def get_cost(self) -> float:
        """Estimated total spend so far, in yuan."""
        return (self.call_count / 1000) * self.cost_per_1k

    def get_stats(self) -> Dict:
        """Snapshot of call volume plus formatted cost figures."""
        stats = {'total_calls': self.call_count}
        stats['estimated_cost'] = f"¥{self.get_cost():.2f}"
        stats['cost_per_call'] = f"¥{self.cost_per_1k/1000:.4f}"
        return stats

    def reset(self):
        """Start counting from zero again."""
        self.call_count = 0
# Integrate cost tracking into the client
class TrackedSERPClient(SERPClient):
    """SERPClient that counts every search() call via a CostTracker."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.tracker = CostTracker()

    def search(self, query: str, **kwargs):
        # NOTE(review): failed calls (None result) are counted too — confirm
        # this matches how the API actually bills.
        result = super().search(query, **kwargs)
        self.tracker.track_call()
        return result

    def print_stats(self):
        """Print the tracker's statistics, one line per metric."""
        stats = self.tracker.get_stats()
        print("\nAPI使用统计:")
        for key, value in stats.items():
            print(f"  {key}: {value}")
总结
通过本教程,你已经学会了:
✅ Python SERP API的基础用法
✅ 创建可复用的客户端类
✅ 数据解析和存储
✅ 异步并发搜索
✅ 实战项目开发
✅ 最佳实践和优化技巧
使用SearchCans SERP API,你可以以极低的成本(¥4.03/1000次)构建强大的数据采集和分析应用。
立即开始:访问SearchCans注册页面创建账户并生成API密钥,即可获得100积分免费体验。
相关资源
开发教程:
- Node.js教程 – Node.js开发
- AI Agent开发 – AI应用
- SEO工具开发 – SEO应用
技术文档: