In an era of information overload, companies need to continuously track market movements, competitor strategy, and industry trends. Traditional manual market research is slow, expensive, and quickly outdated. An AI-driven market intelligence platform can collect, analyze, and report on key information 24/7, upgrading market research from a "monthly report" to "real-time monitoring." This article walks you through building such a platform step by step.
System Architecture
Core Functional Modules
1. Data Collection Layer
- SERP API: search engine result collection
- Reader API: deep extraction of web page content
- Scheduled task orchestration
2. Data Processing Layer
- Content cleaning and normalization
- Entity recognition (companies, products, people)
- Sentiment analysis
- Topic classification
3. Data Storage Layer
- Time-series database (monitoring data)
- Document database (raw content)
- Vector database (semantic retrieval)
4. Analysis & Insight Layer
- Trend analysis
- Competitor comparison
- Anomaly detection
- Report generation
5. Presentation Layer
- Dashboards
- Alerting
- API endpoints
Tech Stack
- Backend: Python + FastAPI
- Task scheduling: Celery + Redis
- Databases: PostgreSQL + TimescaleDB + Pinecone
- LLM: GPT-4 / Claude 3
- Frontend: React + ECharts
- Deployment: Docker + Kubernetes
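To make this stack concrete, here is a minimal configuration sketch that wires the API keys and data stores together. The module name, variable names, and defaults are illustrative assumptions, not part of the original article.

# config.py -- illustrative settings module (names and defaults are assumptions)
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class Settings:
    serp_api_key: str = os.environ.get("SERP_API_KEY", "")
    reader_api_key: str = os.environ.get("READER_API_KEY", "")
    postgres_dsn: str = os.environ.get("POSTGRES_DSN", "postgresql://localhost/market_intel")
    redis_url: str = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
    pinecone_api_key: str = os.environ.get("PINECONE_API_KEY", "")

settings = Settings()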
Module 1: Data Collection Engine
Core Collector
import requests
from typing import List, Dict
from datetime import datetime

class MarketDataCollector:
    def __init__(self, serp_api_key: str, reader_api_key: str):
        self.serp_api_key = serp_api_key
        self.reader_api_key = reader_api_key
        self.serp_url = "https://searchcans.youxikuang.cn/api/search"
        self.reader_url = "https://searchcans.youxikuang.cn/api/url"

    def search_news(self, query: str, num_results: int = 10) -> List[Dict]:
        """Search for relevant news."""
        response = requests.get(
            self.serp_url,
            headers={"Authorization": f"Bearer {self.serp_api_key}"},
            params={
                "q": query,
                "num": num_results,
                "tbm": "nws"  # news search
            },
            timeout=30
        )
        results = response.json().get("news_results", [])
        return [
            {
                "title": item["title"],
                "url": item["link"],
                "source": item["source"],
                "date": item["date"],
                "snippet": item["snippet"]
            }
            for item in results
        ]

    def extract_content(self, url: str) -> Dict:
        """Extract the full content of a web page."""
        response = requests.get(
            self.reader_url,
            headers={"Authorization": f"Bearer {self.reader_api_key}"},
            params={"url": url},
            timeout=30
        )
        data = response.json()
        return {
            "content": data.get("content", ""),
            "title": data.get("title", ""),
            "author": data.get("author", ""),
            "published_date": data.get("published_date")
        }

    def collect_competitor_intel(self, competitor_name: str) -> Dict:
        """Collect intelligence on a single competitor."""
        # Search along several dimensions
        queries = [
            f"{competitor_name} new product",
            f"{competitor_name} funding",
            f"{competitor_name} market share",
            f"{competitor_name} customer reviews"
        ]
        all_results = []
        for query in queries:
            results = self.search_news(query, num_results=5)
            all_results.extend(results)
        # Deep-extract the key articles
        detailed_articles = []
        for result in all_results[:5]:  # extract the first 5 articles
            try:
                content = self.extract_content(result["url"])
                detailed_articles.append({
                    **result,
                    **content
                })
            except Exception as e:
                print(f"Extraction failed: {result['url']} - {e}")
        return {
            "competitor": competitor_name,
            "collected_at": datetime.now().isoformat(),
            "news_count": len(all_results),
            "articles": detailed_articles
        }
Scheduled Monitoring Tasks
from celery import Celery
from celery.schedules import crontab

app = Celery('market_intelligence', broker='redis://localhost:6379')

@app.task
def daily_competitor_monitoring():
    """Daily competitor monitoring task."""
    # SERP_API_KEY, READER_API_KEY, and `db` come from your settings / persistence layer
    competitors = ["Competitor A", "Competitor B", "Competitor C"]
    collector = MarketDataCollector(SERP_API_KEY, READER_API_KEY)
    for competitor in competitors:
        intel = collector.collect_competitor_intel(competitor)
        # Persist to the database
        db.market_intel.insert_one(intel)
        # Analyze and raise alerts if needed
        analyze_and_alert(intel)

@app.task
def hourly_trend_tracking():
    """Hourly industry trend tracking."""
    keywords = ["AI Agent", "large language models", "RAG systems"]
    collector = MarketDataCollector(SERP_API_KEY, READER_API_KEY)
    for keyword in keywords:
        results = collector.search_news(keyword, num_results=20)
        # Write each hit to the time-series store
        for result in results:
            db.timeseries.insert_one({
                "keyword": keyword,
                "title": result["title"],
                "timestamp": datetime.now(),
                "source": result["source"]
            })

# Configure the periodic schedule
app.conf.beat_schedule = {
    'daily-competitor-monitoring': {
        'task': 'tasks.daily_competitor_monitoring',
        'schedule': crontab(hour=8, minute=0)  # every day at 8:00 AM
    },
    'hourly-trend-tracking': {
        'task': 'tasks.hourly_trend_tracking',
        'schedule': crontab(minute=0)  # every hour on the hour
    }
}
Module 2: Intelligent Analysis Engine
Entity Recognition and Relation Extraction
from typing import Dict
from transformers import pipeline

class IntelligenceAnalyzer:
    def __init__(self):
        # In practice, point these at checkpoints fine-tuned for Chinese NER and sentiment
        self.ner = pipeline("ner", model="bert-base-chinese")
        self.sentiment = pipeline("sentiment-analysis")

    def extract_entities(self, text: str) -> Dict:
        """Extract entities (companies, products, people) from text."""
        entities = self.ner(text)
        # Group entities by type
        companies = [e["word"] for e in entities if e["entity"] == "ORG"]
        products = [e["word"] for e in entities if e["entity"] == "PRODUCT"]
        persons = [e["word"] for e in entities if e["entity"] == "PER"]
        return {
            "companies": list(set(companies)),
            "products": list(set(products)),
            "persons": list(set(persons))
        }

    def analyze_sentiment(self, text: str) -> Dict:
        """Analyze the sentiment of a piece of text."""
        result = self.sentiment(text)[0]
        return {
            "label": result["label"],  # POSITIVE / NEGATIVE
            "score": result["score"]
        }

    def generate_summary(self, text: str) -> str:
        """Generate a summary with an LLM."""
        # Truncate the input to keep the prompt within context limits
        prompt = f"""
Summarize the core content of the following news article in 2-3 sentences:

{text[:2000]}

Summary:
"""
        response = llm.generate(prompt, max_tokens=150)  # `llm` is your LLM client wrapper
        return response
Trend Analysis
from typing import Dict, List

import pandas as pd
from scipy import stats

class TrendAnalyzer:
    def analyze_keyword_trend(self, keyword: str, days: int = 30) -> Dict:
        """Analyze how a keyword's mention volume trends over time."""
        # Query history from the time-series store
        # (in production, prefer parameterized queries over string interpolation)
        df = pd.read_sql(
            f"""
            SELECT DATE(timestamp) AS date, COUNT(*) AS mentions
            FROM market_intel
            WHERE keyword = '{keyword}'
              AND timestamp >= NOW() - INTERVAL '{days} days'
            GROUP BY DATE(timestamp)
            ORDER BY date
            """,
            con=db_engine
        )
        # Fit a linear trend
        if len(df) < 2:
            return {"trend": "insufficient_data"}
        x = range(len(df))
        y = df["mentions"].values
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
        # Classify the trend
        if p_value < 0.05:  # statistically significant
            if slope > 0:
                trend = "rising"
                change_rate = (y[-1] - y[0]) / y[0] * 100
            else:
                trend = "falling"
                change_rate = (y[0] - y[-1]) / y[0] * 100
        else:
            trend = "stable"
            change_rate = 0
        return {
            "keyword": keyword,
            "trend": trend,
            "change_rate": f"{change_rate:.1f}%",
            "confidence": 1 - p_value,
            "data_points": len(df)
        }

    def detect_anomalies(self, keyword: str) -> List[Dict]:
        """Detect abnormal spikes (often a sign of a major event)."""
        df = pd.read_sql(
            f"""
            SELECT DATE_TRUNC('hour', timestamp) AS timestamp, COUNT(*) AS mentions
            FROM market_intel
            WHERE keyword = '{keyword}'
              AND timestamp >= NOW() - INTERVAL '7 days'
            GROUP BY DATE_TRUNC('hour', timestamp)
            ORDER BY timestamp
            """,
            con=db_engine
        )
        # Flag points above the 3-sigma threshold
        mean = df["mentions"].mean()
        std = df["mentions"].std()
        threshold = mean + 3 * std
        anomalies = df[df["mentions"] > threshold]
        return [
            {
                "timestamp": row["timestamp"],
                "mentions": row["mentions"],
                "deviation": (row["mentions"] - mean) / std
            }
            for _, row in anomalies.iterrows()
        ]
Competitor Comparison
class CompetitorAnalyzer:
    def compare_competitors(self, competitors: List[str]) -> Dict:
        """Compare competitors across several dimensions."""
        analyzer = IntelligenceAnalyzer()
        comparison = {}
        for competitor in competitors:
            # Media exposure over the last 30 days
            exposure = db.market_intel.count_documents({
                "competitor": competitor,
                "collected_at": {"$gte": thirty_days_ago()}
            })
            # Sentiment analysis (materialize the cursor so it can be iterated twice)
            articles = list(db.market_intel.find({"competitor": competitor}).limit(50))
            sentiments = [
                analyzer.analyze_sentiment(article["content"])
                for article in articles
            ]
            positive_rate = (
                sum(1 for s in sentiments if s["label"] == "POSITIVE") / len(sentiments)
                if sentiments else 0.0
            )
            # Products and technologies mentioned
            all_entities = []
            for article in articles:
                entities = analyzer.extract_entities(article["content"])
                all_entities.extend(entities["products"])
            top_products = pd.Series(all_entities).value_counts().head(5).to_dict()
            comparison[competitor] = {
                "exposure_score": exposure,
                "sentiment_score": positive_rate,
                "top_products": top_products
            }
        return comparison
Module 3: Alerting and Reporting System
Smart Alerting
class AlertSystem:
    def __init__(self):
        self.alert_rules = []

    def add_rule(self, rule: Dict):
        """Register an alert rule."""
        self.alert_rules.append(rule)

    def check_alerts(self, intel_data: Dict):
        """Check whether any alert rules are triggered."""
        alerts = []
        for rule in self.alert_rules:
            if rule["type"] == "keyword_spike":
                # Spike in keyword mentions
                trend = TrendAnalyzer().analyze_keyword_trend(rule["keyword"], days=1)
                if trend["trend"] == "rising" and float(trend["change_rate"].rstrip('%')) > rule["threshold"]:
                    alerts.append({
                        "severity": "high",
                        "message": f"Mentions of '{rule['keyword']}' spiked by {trend['change_rate']}"
                    })
            elif rule["type"] == "competitor_news":
                # Major competitor news
                if "funding" in intel_data.get("title", "") or "acquisition" in intel_data.get("title", ""):
                    alerts.append({
                        "severity": "medium",
                        "message": f"Major news about competitor {intel_data['competitor']}: {intel_data['title']}"
                    })
            elif rule["type"] == "negative_sentiment":
                # Negative press
                sentiment = IntelligenceAnalyzer().analyze_sentiment(intel_data.get("content", ""))
                if sentiment["label"] == "NEGATIVE" and sentiment["score"] > 0.8:
                    alerts.append({
                        "severity": "high",
                        "message": f"Strongly negative coverage detected: {intel_data['title']}"
                    })
        # Dispatch the alerts
        for alert in alerts:
            self.send_alert(alert)

    def send_alert(self, alert: Dict):
        """Send an alert (email, Slack, WeChat, etc.)."""
        # Email notification (`send_email` is your own mail helper)
        send_email(
            to="team@company.com",
            subject=f"[Market Intel Alert] {alert['severity'].upper()}",
            body=alert["message"]
        )
        # Push to Slack (`slack_webhook` wraps your incoming-webhook client)
        slack_webhook.post(json={"text": f"🚨 {alert['message']}"})

# Configure alert rules
alert_system = AlertSystem()
alert_system.add_rule({
    "type": "keyword_spike",
    "keyword": "AI Agent",
    "threshold": 50  # alert when mentions grow by more than 50%
})
alert_system.add_rule({
    "type": "competitor_news",
    "competitors": ["Competitor A", "Competitor B"]
})
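The `send_email` and `slack_webhook` calls above are placeholders for your own notification layer. As a hedged sketch, Slack delivery can be a plain POST of a {"text": ...} payload to an incoming-webhook URL; the SLACK_WEBHOOK_URL environment variable below is an assumption, not part of the original code.

import os
import requests

SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")  # assumed env var

def push_to_slack(message: str) -> bool:
    """Post an alert message to a Slack incoming webhook."""
    if not SLACK_WEBHOOK_URL:
        return False
    resp = requests.post(SLACK_WEBHOOK_URL, json={"text": f"🚨 {message}"}, timeout=10)
    return resp.status_code == 200  # Slack responds with HTTP 200 ("ok") on success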
Automated Report Generation
import json
from datetime import datetime

class ReportGenerator:
    def generate_weekly_report(self) -> str:
        """Generate the weekly report."""
        # Gather this week's data (`get_this_week_data` is your aggregation helper)
        this_week = get_this_week_data()
        # Have the LLM draft the report
        prompt = f"""
Based on the following market intelligence data, write a weekly report:

## Data Overview
- Competitors monitored: {len(this_week['competitors'])}
- News items collected: {this_week['news_count']}
- Trends detected: {this_week['trends']}

## Competitor Activity
{json.dumps(this_week['competitor_intel'], ensure_ascii=False, indent=2)}

## Industry Trends
{json.dumps(this_week['industry_trends'], ensure_ascii=False, indent=2)}

Please produce a structured weekly report including:
1. Executive summary (about 200 words)
2. Competitor activity analysis
3. Industry trend insights
4. Risks and opportunities
5. Recommended actions

Use Markdown, with a professional, objective tone.
"""
        report = llm.generate(prompt, max_tokens=2000)
        # Save the report
        report_path = f"reports/weekly_{datetime.now().strftime('%Y%m%d')}.md"
        with open(report_path, "w", encoding="utf-8") as f:
            f.write(report)
        return report
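`get_this_week_data()` is left undefined above. A minimal sketch, assuming the MongoDB-style `db` handle and the `TrendAnalyzer` used earlier (the field names and keyword list are illustrative, not from the original):

from datetime import datetime, timedelta

TRACKED_KEYWORDS = ["AI Agent", "large language models", "RAG systems"]  # mirrors the hourly task

def get_this_week_data() -> dict:
    """Aggregate the last 7 days of collected intelligence (illustrative sketch)."""
    since = (datetime.now() - timedelta(days=7)).isoformat()
    docs = list(db.market_intel.find({"collected_at": {"$gte": since}}))
    analyzer = TrendAnalyzer()
    keyword_trends = {k: analyzer.analyze_keyword_trend(k, days=7) for k in TRACKED_KEYWORDS}
    return {
        "competitors": sorted({d["competitor"] for d in docs}),
        "news_count": sum(d.get("news_count", 0) for d in docs),
        "trends": [f"{k}: {t.get('trend')}" for k, t in keyword_trends.items()],
        "competitor_intel": {d["competitor"]: [a["title"] for a in d.get("articles", [])] for d in docs},
        "industry_trends": keyword_trends,
    }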
Module 4: API and Dashboard
REST API
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Market Intelligence API")

class CompetitorQuery(BaseModel):
    competitor_name: str
    days: int = 7

@app.get("/api/competitors")
def list_competitors():
    """List all monitored competitors."""
    competitors = db.competitors.find({})
    return {"competitors": [c["name"] for c in competitors]}

@app.post("/api/competitor/intel")
def get_competitor_intel(query: CompetitorQuery):
    """Get the latest intelligence for a given competitor."""
    intel = db.market_intel.find(
        {
            "competitor": query.competitor_name,
            "collected_at": {"$gte": days_ago(query.days)}  # `days_ago` is your date helper
        },
        {"_id": 0}  # drop the ObjectId so documents are JSON-serializable
    ).sort("collected_at", -1)
    return {"intel": list(intel)}

@app.get("/api/trends/{keyword}")
def get_keyword_trend(keyword: str, days: int = 30):
    """Get the trend analysis for a keyword."""
    analyzer = TrendAnalyzer()
    trend = analyzer.analyze_keyword_trend(keyword, days)
    anomalies = analyzer.detect_anomalies(keyword)
    return {
        "trend": trend,
        "anomalies": anomalies
    }

@app.get("/api/report/weekly")
def get_weekly_report():
    """Get the latest weekly report."""
    report = ReportGenerator().generate_weekly_report()
    return {"report": report}
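A quick usage example against this API, assuming the service is running locally on port 8000 (the competitor name and keyword below are placeholders):

import requests

BASE = "http://localhost:8000"  # assumed local deployment

# Latest intel for one competitor over the last 14 days
intel = requests.post(
    f"{BASE}/api/competitor/intel",
    json={"competitor_name": "Competitor A", "days": 14},
    timeout=30,
).json()
print(f"{len(intel['intel'])} intel documents")

# 30-day trend and anomalies for a keyword
trend = requests.get(f"{BASE}/api/trends/AI Agent", params={"days": 30}, timeout=30).json()
print(trend["trend"], len(trend["anomalies"]))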
Visualization Dashboard
Build an interactive dashboard with React + ECharts:
import React, { useEffect, useState } from 'react';
import ReactECharts from 'echarts-for-react';

function TrendChart({ keyword }) {
  const [data, setData] = useState(null);

  useEffect(() => {
    // Expects the trend endpoint to return { dates: [...], mentions: [...] }
    fetch(`/api/trends/${keyword}?days=30`)
      .then(res => res.json())
      .then(data => setData(data));
  }, [keyword]);

  if (!data) return <div>Loading...</div>;

  const option = {
    title: { text: `${keyword} Trend Analysis` },
    xAxis: { type: 'category', data: data.dates },
    yAxis: { type: 'value' },
    series: [{
      data: data.mentions,
      type: 'line',
      smooth: true
    }]
  };

  return <ReactECharts option={option} />;
}
Deployment and Operations
Dockerized Deployment
# Dockerfile
FROM python:3.9
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - SERP_API_KEY=${SERP_API_KEY}
      - READER_API_KEY=${READER_API_KEY}
  celery-worker:
    build: .
    command: celery -A tasks worker --loglevel=info
  celery-beat:
    build: .
    command: celery -A tasks beat --loglevel=info
  redis:
    image: redis:7-alpine
  postgres:
    image: postgres:14
    environment:
      - POSTGRES_DB=market_intel
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}  # the official postgres image requires a password
Monitoring and Logging
import logging
from prometheus_client import Counter, Histogram

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('market_intel.log'),
        logging.StreamHandler()
    ]
)

# Prometheus metrics
api_requests = Counter('api_requests_total', 'Total API requests')
collection_duration = Histogram('collection_duration_seconds', 'Collection duration')

@api_requests.count_exceptions()
@collection_duration.time()
def collect_data():
    # ... data collection logic
    pass
Cost and Scalability
Cost Estimate (1,000 competitor-monitoring runs per month)
- SERP API: 1,000 calls × $0.002 = $2
- Reader API: 500 calls × $0.001 = $0.50
- LLM (summary generation): $50
- Servers: $100
- Total: about $150/month
Compared with traditional manual market research ($5,000-$10,000/month), this saves roughly 97%.
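A quick sanity check of the arithmetic, using the unit prices listed above:

def monthly_cost(serp_calls=1000, reader_calls=500, llm_budget=50.0, server_budget=100.0,
                 serp_price=0.002, reader_price=0.001):
    """Back-of-envelope monthly cost from the figures above."""
    return serp_calls * serp_price + reader_calls * reader_price + llm_budget + server_budget

print(monthly_cost())  # 152.5 -> roughly $150/month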
Scalability
- Horizontal scaling: add Celery workers to process more tasks
- Data sharding: shard storage by competitor or region
- Caching layer: cache hot queries in Redis (see the sketch below)
- CDN: serve static reports through a CDN
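For the caching layer, a minimal sketch (assuming a local Redis instance and the `TrendAnalyzer` defined earlier) could wrap the trend query like this:

import json
import redis

cache = redis.Redis(host="localhost", port=6379, db=1)  # assumed local Redis

def cached_keyword_trend(keyword: str, days: int = 30, ttl: int = 3600) -> dict:
    """Serve trend results from Redis while fresh; recompute and cache otherwise."""
    key = f"trend:{keyword}:{days}"
    hit = cache.get(key)
    if hit:
        return json.loads(hit)
    result = TrendAnalyzer().analyze_keyword_trend(keyword, days)
    cache.set(key, json.dumps(result), ex=ttl)  # expire after one hour
    return result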
For a complete technical implementation, see the financial intelligence automation case study and the SearchCans API documentation.
Summary
The core elements of an AI-driven market intelligence platform are:
- Continuous data collection: SERP API + Reader API
- Intelligent analysis: entity recognition, sentiment analysis, trend detection
- Real-time alerting: rule engine + anomaly detection
- Visualization: dashboards + automated reports
Through automation, companies can move from after-the-fact analysis to real-time monitoring, and from reactive responses to proactive decisions, maintaining a competitive edge in a fast-changing market.
Related Resources
Technical implementation:
- SERP API Getting Started – search API
- Financial Intelligence Automation – finance case study
- Real-Time Data Collection – collection approach
Analysis methods:
System architecture:
- Advanced RAG Systems – RAG design
- DeepResearch Business Applications – research applications
- API Documentation – technical docs
SearchCans provides cost-effective Bing Search API and Reader API services built for AI Agents and developers. Try it now →