
A Complete Guide to Implementing a URL Content Extraction API

A hands-on implementation tutorial for URL content extraction APIs, covering code examples, best practices, error handling, and performance optimization to give developers a complete integration blueprint.


Building applications that extract content from URLs is easier than ever. This comprehensive guide covers everything you need to know to implement URL content extraction in your applications.

Quick start: API documentation | Get an API key | Test online

What Is URL Content Extraction?

A URL content extraction API automatically fetches web pages and returns clean, structured content:

# Input: any URL
url = "https://example.com/article"

# Output: structured data
{
  "title": "Article Title",
  "content": "Clean article text...",
  "author": "Zhang San",
  "published_date": "2025-01-15",
  "images": ["image1.jpg", "image2.jpg"],
  "metadata": {...}
}

Key benefits:

  • No HTML parsing required
  • Handles JavaScript-heavy websites
  • Automatic metadata extraction
  • Works with every type of website

Getting Started

1. API Setup

First, obtain your API credentials:

# Install the required package
pip install requests

# Basic setup
import requests

API_KEY = "your-api-key"
API_ENDPOINT = "https://searchcans.youxikuang.cn/api/url"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

2. 基础实现

def extract_content(url, enable_js=True):
    """从URL提取内容"""
    payload = {
        "url": url,
        "b": enable_js  # 启用浏览器渲染
    }
    
    response = requests.post(
        API_ENDPOINT,
        json=payload,
        headers=headers
    )
    
    response.raise_for_status()
    return response.json()

# Usage example
data = extract_content("https://example.com/article")
print(f"Title: {data['title']}")
print(f"Content: {data['content'][:200]}...")

Advanced Features

JavaScript Rendering

Many modern websites require JavaScript execution:

# Static HTML sites (faster)
data = extract_content(url, enable_js=False)

# JavaScript-heavy sites (better compatibility)
data = extract_content(url, enable_js=True)
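
To keep latency and cost down, a common pattern is to try the faster static fetch first and fall back to browser rendering only when the result looks empty. A minimal sketch, assuming a JavaScript-only page comes back with little or no extracted content:

def extract_with_fallback(url, min_length=100):
    """Try static extraction first; retry with JS rendering if the result is thin."""
    data = extract_content(url, enable_js=False)

    # Assumption: pages that need JS return an empty or very short content field
    if len(data.get('content', '')) < min_length:
        data = extract_content(url, enable_js=True)

    return data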

Batch Processing

Process multiple URLs efficiently:

from concurrent.futures import ThreadPoolExecutor

def extract_batch(urls, max_workers=5):
    """Extract content from multiple URLs in parallel"""
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(extract_content, urls))
    
    return results

# Process 100 URLs in parallel
urls = ["https://example.com/page1", "https://example.com/page2", ...]
results = extract_batch(urls)
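
Note that executor.map propagates the first exception it hits and discards the remaining results. For per-URL error isolation, a sketch using as_completed:

from concurrent.futures import ThreadPoolExecutor, as_completed

def extract_batch_safe(urls, max_workers=5):
    """Batch extraction that records failures instead of aborting the whole run."""
    results, errors = {}, {}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its URL so failures can be attributed
        futures = {executor.submit(extract_content, url): url for url in urls}

        for future in as_completed(futures):
            url = futures[future]
            try:
                results[url] = future.result()
            except Exception as e:
                errors[url] = str(e)  # keep going; report this failure separately

    return results, errors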

Error Handling

Robust error handling for production environments:

import time

def safe_extract_content(url, max_retries=3):
    """Content extraction with retry logic"""
    for attempt in range(max_retries):
        try:
            return extract_content(url)
        except requests.HTTPError as e:
            if e.response.status_code == 429:  # rate limited
                time.sleep(2 ** attempt)  # exponential backoff
                continue
            elif e.response.status_code == 404:
                return None  # URL not found
            else:
                raise
        except requests.RequestException:
            if attempt == max_retries - 1:
                raise
            time.sleep(1)
    
    return None

Use Case Examples

News Aggregation

class NewsAggregator:
    def __init__(self, api_key):
        self.api_key = api_key
        
    def fetch_article(self, url):
        data = extract_content(url)
        return {
            'headline': data['title'],
            'body': data['content'],
            'author': data.get('author'),
            'published': data.get('published_date'),
            'source_url': url,
            'word_count': len(data['content'].split())
        }
    
    def aggregate_from_sources(self, rss_feeds):
        articles = []
        for feed in rss_feeds:
            urls = self.parse_rss(feed)  # parse_rss is sketched below
            
            for url in urls:
                article = self.fetch_article(url)
                articles.append(article)
                
        return articles
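
The parse_rss method is left undefined above. A minimal sketch of an implementation you could add to NewsAggregator, using the feedparser package (an assumption here, not part of the extraction API):

# pip install feedparser
import feedparser

def parse_rss(self, feed_url):
    """Return the article URLs listed in an RSS/Atom feed."""
    feed = feedparser.parse(feed_url)
    return [entry.link for entry in feed.entries if hasattr(entry, 'link')]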

Content Analysis

def analyze_content(url):
    """分析提取的内容"""
    data = extract_content(url)
    
    content = data['content']
    
    analysis = {
        'word_count': len(content.split()),
        'reading_time': len(content.split()) // 200,  # 200 words per minute
        'has_author': bool(data.get('author')),
        'has_date': bool(data.get('published_date')),
        'image_count': len(data.get('images', [])),
        'content_quality': 'high' if len(content) > 1000 else 'low'
    }
    
    return analysis

Collecting AI Training Data

import json

def collect_training_data(urls, output_file):
    """Collect clean text data for AI training"""
    training_data = []
    
    for url in urls:
        try:
            data = extract_content(url)
            
            sample = {
                'text': data['content'],
                'metadata': {
                    'title': data['title'],
                    'source': url,
                    'length': len(data['content']),
                    'language': 'zh'
                }
            }
            
            training_data.append(sample)
            
        except Exception as e:
            print(f"提取失败 {url}: {e}")
            continue
    
    # Save as JSONL for training
    with open(output_file, 'w', encoding='utf-8') as f:
        for sample in training_data:
            f.write(json.dumps(sample, ensure_ascii=False) + '\n')
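
To sanity-check the collected file afterwards, you can load the JSONL back line by line (a minimal sketch):

def load_training_data(input_file):
    """Load JSONL training samples back into memory."""
    samples = []
    with open(input_file, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():  # skip any blank lines
                samples.append(json.loads(line))
    return samples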

Performance Optimization

Caching Strategy

import redis
import json
from hashlib import md5

class CachedExtractor:
    def __init__(self, api_key, redis_client):
        self.api_key = api_key
        self.redis = redis_client
        self.cache_ttl = 24 * 60 * 60  # 24 hours
    
    def extract_with_cache(self, url):
        # Build the cache key
        cache_key = f"extract:{md5(url.encode()).hexdigest()}"
        
        # Try the cache first
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # Extract and cache the result
        data = extract_content(url)
        self.redis.setex(
            cache_key, 
            self.cache_ttl, 
            json.dumps(data, ensure_ascii=False)
        )
        
        return data
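
Wiring this up against a local Redis instance might look like the following (host, port, and db are assumptions for a default setup):

# Assumes a Redis server running on the default local port
redis_client = redis.Redis(host='localhost', port=6379, db=0)
extractor = CachedExtractor(API_KEY, redis_client)

data = extractor.extract_with_cache("https://example.com/article")
data = extractor.extract_with_cache("https://example.com/article")  # served from cache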

Rate Limiting

import time
from collections import defaultdict

class RateLimitedExtractor:
    def __init__(self, api_key, requests_per_second=10):
        self.api_key = api_key
        self.rps = requests_per_second
        self.last_request = defaultdict(float)
    
    def extract_with_rate_limit(self, url):
        # Enforce the rate limit
        current_time = time.time()
        time_since_last = current_time - self.last_request['default']
        
        if time_since_last < (1.0 / self.rps):
            sleep_time = (1.0 / self.rps) - time_since_last
            time.sleep(sleep_time)
        
        self.last_request['default'] = time.time()
        
        return extract_content(url)
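
The limiter above is not thread-safe, which matters if you combine it with the ThreadPoolExecutor batching shown earlier. A sketch that guards the shared timestamp with a lock:

import threading
import time

class ThreadSafeRateLimiter:
    def __init__(self, requests_per_second=10):
        self.min_interval = 1.0 / requests_per_second
        self.last_request = 0.0
        self.lock = threading.Lock()

    def wait(self):
        """Block until the next request is allowed, one caller at a time."""
        with self.lock:
            sleep_time = self.min_interval - (time.time() - self.last_request)
            if sleep_time > 0:
                time.sleep(sleep_time)
            self.last_request = time.time()

# Call limiter.wait() before each extract_content(url)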

Integration Patterns

Webhook Handling

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/extract', methods=['POST'])
def extract_endpoint():
    """内容提取的Webhook端点"""
    data = request.json
    url = data.get('url')
    
    if not url:
        return jsonify({'error': 'A URL parameter is required'}), 400
    
    try:
        result = extract_content(url)
        return jsonify({
            'status': 'success',
            'data': result
        })
    except Exception as e:
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

# Usage: POST /extract with {"url": "https://example.com"}

Queue-Based Processing

import celery

app = celery.Celery('content_extractor')

@app.task(bind=True, max_retries=3)
def extract_content_task(self, url, callback_url=None):
    """Background task for content extraction"""
    try:
        data = extract_content(url)
        
        # Optional: send the result to a callback URL
        if callback_url:
            requests.post(callback_url, json=data)
        
        return data
        
    except Exception as e:
        # Retry the task on failure
        raise self.retry(exc=e)

# Usage
result = extract_content_task.delay('https://example.com/article')

Best Practices

1. URL Validation

from urllib.parse import urlparse

def is_valid_url(url):
    """验证URL格式"""
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except ValueError:
        return False

def normalize_url(url):
    """标准化URL格式"""
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url
    return url.rstrip('/')
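
Combining both helpers before calling the API (a usage sketch):

def prepare_and_extract(raw_url):
    """Normalize and validate a URL before extraction."""
    url = normalize_url(raw_url)

    if not is_valid_url(url):
        raise ValueError(f"Invalid URL: {raw_url}")

    return extract_content(url)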

2. Content Quality Checks

def validate_extracted_content(data):
    """验证提取内容的质量"""
    content = data.get('content', '')
    
    checks = {
        'has_content': len(content.strip()) > 100,
        'has_title': bool(data.get('title', '').strip()),
        'reasonable_length': 100 < len(content) < 100000,
        'not_error_page': 'error' not in content.lower()[:200]
    }
    
    return all(checks.values()), checks
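
In practice you can gate downstream processing on the check results, for example:

data = extract_content("https://example.com/article")
is_ok, checks = validate_extracted_content(data)

if not is_ok:
    # Report which specific checks failed
    failed = [name for name, passed in checks.items() if not passed]
    print(f"Low-quality extraction, failed checks: {failed}")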

3. Monitoring and Logging

import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def extract_with_logging(url):
    """带全面日志的内容提取"""
    start_time = datetime.now()
    
    logger.info(f"开始提取: {url}")
    
    try:
        data = extract_content(url)
        
        duration = (datetime.now() - start_time).total_seconds()
        content_length = len(data.get('content', ''))
        
        logger.info(
            f"Extraction succeeded: {url} "
            f"({duration:.2f}s, {content_length} chars)"
        )
        
        return data
        
    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        logger.error(f"提取失败: {url} ({duration:.2f}秒) - {e}")
        raise

Troubleshooting

Common Issues

1. Empty content returned

# Solution: enable JavaScript rendering
data = extract_content(url, enable_js=True)

2. Rate limit errors

# Solution: implement exponential backoff
import random
import time

def extract_with_backoff(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            return extract_content(url)
        except requests.HTTPError as e:
            if e.response.status_code == 429:
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
            else:
                raise

3. Timeout issues

# Solution: increase the timeout or switch to asynchronous processing
def extract_content(url, timeout=30):
    payload = {"url": url, "b": True}
    
    response = requests.post(
        API_ENDPOINT,
        json=payload,
        headers=headers,
        timeout=timeout
    )
    
    response.raise_for_status()
    return response.json()

Next Steps

  1. Sign up for API access: get 100 free credits
  2. Test online: try extraction in your browser
  3. Read the full documentation: the complete API reference
  4. Check the pricing: transparent usage-based billing

SearchCans provides reliable, cost-effective URL content extraction at just ¥0.0039 per extraction. Start a free trial →

Tags:

URL Extraction, API Guide, Content Extraction, Developer Tutorial

Ready to build your AI application with SearchCans?

Try our SERP API and Reader API today. Pricing starts at just ¥0.56 per thousand calls, and you can start a free trial with no credit card required.