Web Content Processing
Extract, analyze, and leverage web content at scale with intelligent scraping and processing. This guide covers web scraping use cases, techniques, and best practices for automated content intelligence.
Overview
Web Content Processing Capabilities:
- URL Scraping: Extract content from any public website
- Batch Processing: Process hundreds of URLs simultaneously
- Content Analysis: AI-powered summarization and insights
- Entity Extraction: Identify key people, companies, topics
- Sentiment Analysis: Gauge tone and sentiment
- Trend Detection: Identify patterns across multiple sources
- Competitive Intelligence: Monitor competitor content
- News Monitoring: Track industry developments
Common Use Cases
Competitive Intelligence
Challenge:
- Track 50+ competitor websites
- Monitor product launches, pricing changes, press releases
- Manual monitoring: Time-consuming and incomplete
- Miss important updates
Solution with Alactic:
Step 1: Define monitoring targets
competitors = {
    "CompanyA": {
        "blog": "https://companya.com/blog",
        "press": "https://companya.com/press-releases",
        "pricing": "https://companya.com/pricing",
        "products": "https://companya.com/products"
    },
    "CompanyB": {
        "blog": "https://companyb.com/blog",
        "press": "https://companyb.com/news",
        "pricing": "https://companyb.com/plans"
    },
    # ... 50+ competitors
}
Step 2: Schedule daily scraping
import schedule
import time

def daily_competitor_monitoring():
    all_urls = []
    for company, urls in competitors.items():
        all_urls.extend(urls.values())

    # Process in batch
    results = process_url_batch(
        urls=all_urls,
        model="gpt-4o-mini",
        analysis_depth="standard"
    )

    # Detect changes
    for result in results:
        changes = detect_changes(result)
        if changes:
            notify_team(changes)

# Schedule
schedule.every().day.at("09:00").do(daily_competitor_monitoring)

while True:
    schedule.run_pending()
    time.sleep(3600)
Step 3: Change detection
def detect_changes(current_result):
    company = current_result["source_company"]
    url = current_result["url"]

    # Get previous version
    previous = get_previous_snapshot(company, url)
    if not previous:
        # First time seeing this page: store a baseline and report no changes
        store_snapshot(company, url, current_result)
        return None

    changes = {
        "content_changed": content_similarity(previous, current_result) < 0.95,
        "new_products": detect_new_products(previous, current_result),
        "pricing_changes": detect_pricing_changes(previous, current_result),
        "new_features": detect_new_features(previous, current_result),
        "sentiment_shift": abs(previous["sentiment"]["score"] -
                               current_result["sentiment"]["score"]) > 0.2
    }

    # Store current as previous
    store_snapshot(company, url, current_result)

    return {k: v for k, v in changes.items() if v}
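The change-detection step relies on a few helpers that are not part of the Alactic API. Below is a minimal sketch of content_similarity and a JSON-file snapshot store, assuming scraped results are plain dictionaries with a "text" field; the SNAPSHOT_DIR location and file naming are illustrative choices, not requirements.
import json
import os
from difflib import SequenceMatcher

SNAPSHOT_DIR = "snapshots"  # hypothetical local storage location

def content_similarity(previous, current):
    # Ratio between 0 and 1; 1.0 means the page text is unchanged
    return SequenceMatcher(None, previous.get("text", ""), current.get("text", "")).ratio()

def _snapshot_path(company, url):
    safe_name = url.replace("://", "_").replace("/", "_")
    return os.path.join(SNAPSHOT_DIR, f"{company}_{safe_name}.json")

def get_previous_snapshot(company, url):
    path = _snapshot_path(company, url)
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f)

def store_snapshot(company, url, result):
    os.makedirs(SNAPSHOT_DIR, exist_ok=True)
    with open(_snapshot_path(company, url), "w") as f:
        json.dump(result, f)
A database or object store works just as well; the only requirement is that yesterday's result can be retrieved for comparison.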
Step 4: Generate intelligence report
def generate_competitive_intelligence_report(changes_this_week):
    report = {
        "summary": summarize_key_changes(changes_this_week),
        "product_launches": [c for c in changes_this_week if "new_products" in c],
        "pricing_changes": [c for c in changes_this_week if "pricing_changes" in c],
        "feature_updates": [c for c in changes_this_week if "new_features" in c],
        "sentiment_analysis": analyze_competitive_sentiment(changes_this_week),
        "recommendations": generate_recommendations(changes_this_week)
    }

    # Send to stakeholders
    send_report(report, recipients=["product@company.com", "marketing@company.com"])

    return report
Results:
- Monitoring: Continuous vs ad-hoc
- Coverage: 50+ competitors vs 5-10 manually
- Response time: Hours vs weeks
- Cost: $150/month vs $5,000/month for analyst
Best Practices:
- Schedule scraping during off-peak hours
- Use GPT-4o mini for cost efficiency
- Implement change detection algorithms
- Store historical snapshots for trend analysis
Content Aggregation
Challenge:
- Curate content from 200+ industry sources
- Manual curation: 20 hours/week
- Inconsistent coverage
- Difficult to identify best content
Solution with Alactic:
Step 1: Define content sources
content_sources = [
    "https://techcrunch.com/category/artificial-intelligence/",
    "https://www.theverge.com/ai",
    "https://venturebeat.com/category/ai/",
    # ... 200+ sources
]
Step 2: Daily content collection
def collect_daily_content():
    # Scrape all sources
    articles = []
    for source in content_sources:
        # Get article links from the source page
        article_links = extract_article_links(source)
        articles.extend(article_links[:5])  # Top 5 per source

    # Process articles
    results = process_url_batch(
        urls=articles,
        model="gpt-4o-mini",
        analysis_depth="standard"
    )

    return results
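extract_article_links is not part of the Alactic API. A minimal sketch using requests and BeautifulSoup, under the assumption that articles are linked from the source page on the same domain:
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup

def extract_article_links(source_url):
    # Fetch the listing page and collect same-domain links as candidate articles
    response = requests.get(source_url, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    source_domain = urlparse(source_url).netloc

    links = []
    for anchor in soup.find_all("a", href=True):
        absolute = urljoin(source_url, anchor["href"])
        if urlparse(absolute).netloc == source_domain and absolute not in links:
            links.append(absolute)
    return links
This will also pick up navigation links, so in practice you would filter by URL pattern (for example, paths containing a date or a slug) or parse the site's RSS feed instead.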
Step 3: Content scoring
def score_content_quality(article_analysis):
    factors = {
        "relevance": calculate_relevance(article_analysis, target_keywords),
        "authority": check_source_authority(article_analysis["url"]),
        "freshness": calculate_freshness(article_analysis["publish_date"]),
        "engagement": estimate_engagement(article_analysis),
        "uniqueness": check_uniqueness(article_analysis["summary"])
    }

    quality_score = (
        factors["relevance"] * 0.30 +
        factors["authority"] * 0.25 +
        factors["freshness"] * 0.20 +
        factors["engagement"] * 0.15 +
        factors["uniqueness"] * 0.10
    )

    return quality_score
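The scoring factors are placeholders for whatever signals you have available. As one example, calculate_freshness and calculate_relevance (matching the calls in score_content_quality) could be sketched as follows; the keyword-overlap measure, the seven-day half-life, and the ISO-8601 date assumption are illustrative choices, not part of Alactic.
from datetime import datetime

def calculate_freshness(publish_date, half_life_days=7):
    # Score decays from 1.0 toward 0 as the article ages
    # (assumes publish_date is an ISO-8601 date string)
    published = datetime.fromisoformat(publish_date)
    age_days = (datetime.now() - published).days
    return 0.5 ** (age_days / half_life_days)

def calculate_relevance(article_analysis, keywords):
    # Fraction of target keywords mentioned in the summary and key points
    text = " ".join(
        [article_analysis.get("summary", "")] + article_analysis.get("key_points", [])
    ).lower()
    hits = sum(1 for kw in keywords if kw.lower() in text)
    return hits / len(keywords) if keywords else 0.0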
Step 4: Generate curated digest
from datetime import datetime

def generate_content_digest(articles_data):
    # Score all articles
    scored_articles = [
        {
            "article": article,
            "score": score_content_quality(article)
        }
        for article in articles_data
    ]

    # Sort by score
    scored_articles.sort(key=lambda x: x["score"], reverse=True)

    # Top 20 articles
    top_articles = scored_articles[:20]

    # Generate digest
    digest = {
        "date": datetime.now().strftime("%Y-%m-%d"),
        "top_articles": [
            {
                "title": a["article"]["title"],
                "url": a["article"]["url"],
                "summary": a["article"]["summary"],
                "key_points": a["article"]["key_points"],
                "quality_score": a["score"]
            }
            for a in top_articles
        ],
        "topics": extract_trending_topics(top_articles),
        "sentiment": analyze_overall_sentiment(top_articles)
    }

    return digest
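extract_trending_topics is another assumed helper. A simple sketch counts recurring key points across the selected articles; more sophisticated versions would cluster near-duplicate phrasings first.
from collections import Counter

def extract_trending_topics(scored_articles, top_n=10):
    # Count how often each key point appears across the top articles
    counts = Counter()
    for item in scored_articles:
        counts.update(item["article"].get("key_points", []))
    return [topic for topic, _ in counts.most_common(top_n)]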
Results:
- Curation time: 2 hours vs 20 hours weekly
- Coverage: 200+ sources vs 20-30 manually
- Quality: Data-driven scoring vs subjective
- Consistency: Daily automated vs sporadic
Best Practices:
- Use quality scoring algorithms
- Diversify content sources
- Remove duplicates
- Track engagement metrics
Market Research
Challenge:
- Research emerging market trends
- Analyze 500+ articles, reports, blog posts
- Manual research: 2-4 weeks
- Difficult to identify patterns
Solution with Alactic:
Step 1: Define research query
research_query = {
    "topic": "AI in healthcare",
    "timeframe": "last 12 months",
    "sources": [
        "academic_papers",
        "industry_reports",
        "news_articles",
        "blog_posts",
        "conference_proceedings"
    ],
    "focus_areas": [
        "diagnostic_imaging",
        "drug_discovery",
        "patient_monitoring",
        "clinical_decision_support"
    ]
}
Step 2: Collect content
def collect_research_content(query):
    urls = []

    # Academic sources
    urls.extend(search_pubmed(query["topic"], query["timeframe"]))
    urls.extend(search_arxiv(query["topic"], query["timeframe"]))

    # Industry sources
    urls.extend(search_gartner(query["topic"]))
    urls.extend(search_forrester(query["topic"]))

    # News and blogs
    urls.extend(search_google_news(query["topic"], query["timeframe"]))
    urls.extend(search_medium(query["topic"], query["timeframe"]))

    return urls
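The search helpers above (PubMed, Gartner, Google News, and so on) are stand-ins for whatever discovery tools you use. As one concrete example, here is a minimal search_arxiv sketch against the public arXiv API at export.arxiv.org; the timeframe argument is accepted but only approximated by sorting newest-first, so filter dates yourself if you need a strict cutoff.
import feedparser
from urllib.parse import urlencode

def search_arxiv(topic, timeframe=None, max_results=50):
    # Query the public arXiv Atom API and return entry links, newest first
    params = urlencode({
        "search_query": f'all:"{topic}"',
        "start": 0,
        "max_results": max_results,
        "sortBy": "submittedDate",
        "sortOrder": "descending",
    })
    feed = feedparser.parse("http://export.arxiv.org/api/query?" + params)
    return [entry.link for entry in feed.entries]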
Step 3: Process and analyze
def analyze_market_research(urls):
    # Process all content
    results = process_url_batch(
        urls=urls,
        model="gpt-4o",  # Use GPT-4o for research quality
        analysis_depth="deep"
    )

    # Extract insights
    insights = []
    for result in results:
        insights.append({
            "url": result["url"],
            "source_type": classify_source(result["url"]),
            "key_insights": result["key_points"],
            "companies_mentioned": result["entities"]["company"],
            "technologies": result["entities"]["technology"],
            "market_size": extract_market_size(result),
            "growth_rate": extract_growth_rate(result),
            "challenges": extract_challenges(result),
            "opportunities": extract_opportunities(result)
        })

    return insights
Step 4: Synthesize findings
def synthesize_market_research(insights_data):
    synthesis = {
        "market_size": {
            "current": aggregate_market_size(insights_data, "current"),
            "projected": aggregate_market_size(insights_data, "projected"),
            "cagr": calculate_average_cagr(insights_data)
        },
        "key_players": rank_companies(insights_data),
        "technologies": rank_technologies(insights_data),
        "trends": [
            {
                "trend": trend,
                "mentions": count_mentions(insights_data, trend),
                "sentiment": average_sentiment(insights_data, trend)
            }
            for trend in identify_trends(insights_data)
        ],
        "opportunities": prioritize_opportunities(insights_data),
        "challenges": categorize_challenges(insights_data),
        "recommendations": generate_recommendations(insights_data)
    }

    return synthesis
Results:
- Research time: 3 days vs 4 weeks
- Coverage: 500+ sources vs 50-100 manually
- Quantitative insights: Aggregated metrics vs anecdotal
- Confidence: Data-backed vs gut feeling
Best Practices:
- Use GPT-4o for research content (quality critical)
- Enable Deep Analysis for comprehensive extraction
- Cross-reference findings across multiple sources
- Validate key statistics manually
SEO and Content Strategy
Challenge:
- Analyze top-performing content in industry
- Identify content gaps
- Manual analysis: Time-consuming
- Difficult to scale
Solution with Alactic:
Step 1: Identify target keywords
target_keywords = [
    "AI document processing",
    "automated document analysis",
    "intelligent document processing",
    "document AI",
    # ... 50+ keywords
]
Step 2: Scrape top-ranking content
def scrape_serp_content(keywords):
    all_urls = []
    for keyword in keywords:
        # Get top 10 results
        search_results = google_search(keyword, num_results=10)
        all_urls.extend([r["url"] for r in search_results])

    # Remove duplicates
    unique_urls = list(set(all_urls))

    # Process content
    results = process_url_batch(
        urls=unique_urls,
        model="gpt-4o-mini",
        analysis_depth="standard"
    )

    return results
Step 3: Content analysis
def analyze_content_performance(content_results):
    analysis = []
    for content in content_results:
        analysis.append({
            "url": content["url"],
            "word_count": len(content["text"].split()),
            "headings": extract_headings(content["text"]),
            "topics_covered": content["key_points"],
            "entities": content["entities"],
            "readability": calculate_readability(content["text"]),
            "content_depth": assess_content_depth(content),
            "unique_angle": identify_unique_angle(content),
            "backlinks": get_backlink_count(content["url"]),
            "social_shares": get_social_shares(content["url"])
        })

    return analysis
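calculate_readability is assumed rather than provided. A rough sketch using the Flesch reading-ease formula with a heuristic syllable count is shown below; dedicated libraries give more accurate scores, but this is enough for relative comparisons.
import re

def _count_syllables(word):
    # Crude heuristic: count groups of consecutive vowels
    return max(1, len(re.findall(r"[aeiouy]+", word.lower())))

def calculate_readability(text):
    # Flesch reading ease: 206.835 - 1.015 * (words/sentences) - 84.6 * (syllables/words)
    sentences = max(1, len(re.findall(r"[.!?]+", text)))
    words = re.findall(r"[A-Za-z']+", text)
    if not words:
        return 0.0
    syllables = sum(_count_syllables(w) for w in words)
    return 206.835 - 1.015 * (len(words) / sentences) - 84.6 * (syllables / len(words))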
Step 4: Identify content gaps
def identify_content_gaps(analyzed_content, our_content):
    # Topics covered by competitors
    competitor_topics = set()
    for content in analyzed_content:
        competitor_topics.update(content["topics_covered"])

    # Topics we cover
    our_topics = set()
    for content in our_content:
        our_topics.update(content["topics_covered"])

    # Gaps
    gaps = competitor_topics - our_topics

    # Prioritize gaps
    prioritized_gaps = [
        {
            "topic": gap,
            "opportunity_score": calculate_opportunity_score(gap, analyzed_content),
            "competition_level": assess_competition(gap, analyzed_content),
            "search_volume": get_search_volume(gap)
        }
        for gap in gaps
    ]
    prioritized_gaps.sort(key=lambda x: x["opportunity_score"], reverse=True)

    return prioritized_gaps
Step 5: Content recommendations
def generate_content_recommendations(gaps, analyzed_content):
    recommendations = []

    for gap in gaps[:10]:  # Top 10 gaps
        # Find best examples
        best_examples = find_best_examples(gap["topic"], analyzed_content)

        # Generate recommendation
        recommendation = {
            "topic": gap["topic"],
            "opportunity_score": gap["opportunity_score"],
            "target_word_count": calculate_ideal_word_count(best_examples),
            "recommended_headings": extract_common_headings(best_examples),
            "key_points_to_cover": extract_essential_points(best_examples),
            "unique_angle": suggest_unique_angle(gap["topic"], analyzed_content),
            "internal_links": suggest_internal_links(gap["topic"]),
            "cta_suggestions": suggest_ctas(gap["topic"])
        }
        recommendations.append(recommendation)

    return recommendations
Results:
- Analysis time: 1 day vs 2 weeks
- Content gaps identified: 50+ vs 10-15 manually
- Data-driven strategy: Backed by competitor analysis
- Competitive advantage: Faster content production
Best Practices:
- Analyze top 10 results for each target keyword
- Track content performance over time
- Focus on content gaps with high opportunity scores
- Create better, more comprehensive content than competitors
News Monitoring
Challenge:
- Track industry news across 100+ sources
- Identify relevant news quickly
- Manual monitoring: Overwhelming
- Miss critical developments
Solution with Alactic:
Step 1: Define news sources
news_sources = {
    "tech": [
        "https://techcrunch.com/feed/",
        "https://www.theverge.com/rss/index.xml",
        "https://venturebeat.com/feed/",
        # ... 50+ tech news sources
    ],
    "business": [
        "https://www.wsj.com/xml/rss/",
        "https://www.ft.com/rss/",
        # ... 30+ business news sources
    ],
    "industry_specific": [
        # ... 20+ industry-specific sources
    ]
}
Step 2: Continuous monitoring
import feedparser
from datetime import datetime, timedelta

def monitor_news_feeds():
    new_articles = []

    for category, feeds in news_sources.items():
        for feed_url in feeds:
            # Parse RSS feed
            feed = feedparser.parse(feed_url)

            # Get articles from the last hour
            cutoff_time = datetime.now() - timedelta(hours=1)
            for entry in feed.entries:
                if not entry.get("published_parsed"):
                    continue  # some feeds omit a parseable publish date
                published = datetime(*entry.published_parsed[:6])
                if published > cutoff_time:
                    new_articles.append({
                        "url": entry.link,
                        "title": entry.title,
                        "category": category,
                        "source": feed.feed.title,
                        "published": published
                    })

    # Process new articles
    if new_articles:
        process_news_articles(new_articles)

    return new_articles
Step 3: Relevance filtering
def filter_relevant_news(articles_data):
    relevant_articles = []

    for article in articles_data:
        relevance_score = calculate_relevance(
            article,
            keywords=target_keywords,
            companies=tracked_companies,
            topics=important_topics
        )

        if relevance_score > 0.7:
            relevant_articles.append({
                "article": article,
                "relevance": relevance_score,
                "why_relevant": explain_relevance(article)
            })

    return relevant_articles
Step 4: Alert routing
def route_news_alerts(relevant_articles):
    for article in relevant_articles:
        # Determine urgency
        urgency = assess_urgency(article)

        # Determine recipients
        recipients = determine_recipients(article)

        # Send alert
        if urgency == "high":
            send_immediate_alert(article, recipients, channel="slack")
        elif urgency == "medium":
            add_to_daily_digest(article, recipients)
        else:
            add_to_weekly_summary(article, recipients)
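assess_urgency and determine_recipients encode routing decisions specific to your organization. A keyword-based sketch for urgency might look like this; the trigger keywords and the relevance threshold are illustrative assumptions.
HIGH_URGENCY_KEYWORDS = ["acquisition", "data breach", "outage", "lawsuit", "recall"]  # example triggers, adjust to your domain

def assess_urgency(scored_article):
    # scored_article is an item from filter_relevant_news: {"article": ..., "relevance": ...}
    title = scored_article["article"]["title"].lower()
    if any(keyword in title for keyword in HIGH_URGENCY_KEYWORDS):
        return "high"
    if scored_article["relevance"] > 0.85:
        return "medium"
    return "low"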
Results:
- Coverage: 100+ sources vs 10-15 manually
- Latency: Minutes vs hours/days
- Relevance: Filtered vs everything
- Actionability: Routed to right people immediately
Best Practices:
- Monitor RSS feeds every 15-30 minutes (see the polling sketch after this list)
- Use relevance scoring to filter noise
- Route urgent alerts immediately
- Aggregate less urgent news into digests
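A minimal polling loop for the 15-30 minute cadence, using the same schedule library as the competitive-intelligence example and the monitor_news_feeds function defined above:
import schedule
import time

# Poll all feeds every 15 minutes
schedule.every(15).minutes.do(monitor_news_feeds)

while True:
    schedule.run_pending()
    time.sleep(60)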
Advanced Techniques
JavaScript-Heavy Websites
Challenge: Many modern websites use JavaScript to render content, making traditional scraping difficult.
Solution:
Use the headless browser option (Enterprise plan):
def scrape_javascript_website(url):
    result = process_url(
        url=url,
        model="gpt-4o-mini",
        options={
            "render_javascript": True,
            "wait_for_selector": ".content-loaded",
            "screenshot": True
        }
    )
    return result
When to use:
- Single-page applications (SPAs)
- Dynamic content loading
- Content behind login (with credentials)
- Infinite scroll pages
Rate Limiting and Politeness
Best practices for responsible scraping:
import time
from urllib.parse import urlparse

class PoliteScraper:
    def __init__(self):
        self.last_request_time = {}
        self.min_delay = 2  # seconds between requests to the same domain

    def scrape_url(self, url):
        domain = urlparse(url).netloc

        # Check if we've scraped this domain recently
        if domain in self.last_request_time:
            elapsed = time.time() - self.last_request_time[domain]
            if elapsed < self.min_delay:
                time.sleep(self.min_delay - elapsed)

        # Scrape
        result = process_url(url)

        # Update last request time
        self.last_request_time[domain] = time.time()

        return result
robots.txt Compliance
Always respect robots.txt:
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

def check_robots_txt(url):
    parsed = urlparse(url)
    rp = RobotFileParser()
    rp.set_url(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
    rp.read()

    can_fetch = rp.can_fetch("AlacticBot", url)
    if not can_fetch:
        print(f"Scraping not allowed by robots.txt: {url}")
        return False
    return True

def scrape_url_politely(url):
    if not check_robots_txt(url):
        return None
    return process_url(url)
Duplicate Detection
Avoid processing duplicate content:
import hashlib

def detect_duplicates(articles):
    seen_hashes = set()
    unique_articles = []

    for article in articles:
        # Create content hash
        content_hash = hashlib.md5(
            article["text"].encode()
        ).hexdigest()

        if content_hash not in seen_hashes:
            seen_hashes.add(content_hash)
            unique_articles.append(article)

    duplicate_count = len(articles) - len(unique_articles)
    print(f"Removed {duplicate_count} duplicates")

    return unique_articles
Best Practices
Scraping Ethics
1. Respect Terms of Service
- Read and comply with website ToS
- Don't scrape if explicitly prohibited
- Consider contacting site owner for permission
2. Rate Limiting
- Don't overload target servers
- Space out requests (2-5 seconds)
- Use exponential backoff on errors
3. Identify Your Bot
- Use descriptive User-Agent
- Provide contact information
- Example: "AlacticBot/1.0 (contact@yourcompany.com)" (see the sketch after this list)
4. Respect robots.txt
- Always check and honor robots.txt
- Don't scrape disallowed paths
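For pages you fetch directly with the requests library (for example, RSS or listing pages collected before handing URLs to Alactic), a minimal sketch of identifying your bot with a descriptive User-Agent; the fetch_page helper name is illustrative.
import requests

HEADERS = {
    # Identify the bot and give site owners a way to reach you
    "User-Agent": "AlacticBot/1.0 (contact@yourcompany.com)"
}

def fetch_page(url):
    response = requests.get(url, headers=HEADERS, timeout=30)
    response.raise_for_status()
    return response.text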
Error Handling
Handle common scraping errors:
import time
from requests.exceptions import RequestException, Timeout

def robust_scraping(url):
    max_retries = 3
    retry_delay = 5

    for attempt in range(max_retries):
        try:
            result = process_url(url, timeout=30)
            return result
        except Timeout:
            print(f"Timeout scraping {url}, attempt {attempt + 1}/{max_retries}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay * (attempt + 1))
            else:
                log_failed_scrape(url, "timeout")
                return None
        except RequestException as e:
            print(f"Error scraping {url}: {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay * (attempt + 1))
            else:
                log_failed_scrape(url, str(e))
                return None
Content Quality Checks
Validate scraped content:
def validate_scraped_content(result):
    checks = {
        "has_content": len(result["text"]) > 100,
        "not_error_page": "404" not in result["text"].lower(),
        "not_captcha": "captcha" not in result["text"].lower(),
        "has_meaningful_content": len(result["text"].split()) > 50,
        "proper_encoding": check_encoding(result["text"])
    }

    is_valid = all(checks.values())
    if not is_valid:
        print(f"Content validation failed: {checks}")

    return is_valid
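check_encoding is assumed above. One simple sketch flags the Unicode replacement character and null bytes, which usually indicate the page was decoded with the wrong charset:
def check_encoding(text):
    # Reject text containing the replacement character or null bytes
    return "\ufffd" not in text and "\x00" not in text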
Monitoring and Analytics
Track Scraping Performance
def track_scraping_metrics():
    metrics = {
        "urls_scraped_today": count_scraped_urls(today),
        "success_rate": calculate_success_rate(today),
        "average_processing_time": calculate_avg_time(today),
        "errors": get_error_breakdown(today),
        "content_quality_score": assess_content_quality(today)
    }

    # Alert if metrics degraded
    if metrics["success_rate"] < 0.95:
        alert_team("Scraping success rate below threshold")

    return metrics
Cost Tracking
def track_scraping_costs():
    usage = get_monthly_usage()

    costs = {
        "urls_scraped": usage["urls_processed"],
        "quota_used_percent": (usage["urls_processed"] / usage["url_quota"]) * 100,
        "cost_per_url": usage["total_cost"] / usage["urls_processed"],
        "projected_monthly_urls": project_monthly_usage(usage)
    }

    # Alert if approaching quota
    if costs["quota_used_percent"] > 80:
        alert_team("Approaching URL quota limit")

    return costs
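project_monthly_usage is a placeholder. A simple linear projection based on how far into the month you are could look like this, assuming the usage dictionary covers the current calendar month:
import calendar
from datetime import date

def project_monthly_usage(usage):
    # Extrapolate the URLs processed so far to a full-month estimate
    today = date.today()
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    daily_rate = usage["urls_processed"] / max(1, today.day)
    return int(daily_rate * days_in_month)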