Documentation Index
Fetch the complete documentation index at: https://docs.altrina.com/llms.txt
Use this file to discover all available pages before exploring further.
Overview
This guide provides comprehensive examples of advanced browser automation scenarios using the Altrina SDK. Each example includes complete code with error handling and best practices.
E-Commerce Automation
Price Monitoring System
Monitor product prices across multiple e-commerce sites with alerts.
import json
import time
from datetime import datetime
from typing import List, Dict
from altrina_sdk import BrowserAgent, AltrinaClient
from altrina_sdk.exceptions import RateLimitError, TimeoutError
class PriceMonitor:
    """Monitor product prices across multiple sites.

    The last successfully parsed price of every product URL is kept in
    ``price_history``. A drop is reported when a newly parsed price is lower
    than the stored one by more than ``alert_threshold`` (a fraction, so
    ``0.1`` means 10%).
    """

    def __init__(self, api_key: str, alert_threshold: float = 0.1):
        self.agent = BrowserAgent(api_key, residential_ip=True)
        self.alert_threshold = alert_threshold  # 10% price drop alert
        self.price_history = {}  # product URL -> last successfully parsed price

    def monitor_product(self, product: Dict) -> Dict:
        """Monitor a single product.

        Args:
            product: Mapping with at least a ``"url"`` key.

        Returns:
            A dict with ``url``, ``name``, ``price``, ``available``,
            ``timestamp`` and ``credits_used``; ``None`` when the extraction
            was not successful or a rate-limit/timeout error was raised.
        """
        url = product["url"]
        # Only the remote call can raise the handled errors, so keep the
        # guarded region that small.
        try:
            result = self.agent.extract(
                url=url,
                data_description="product name, current price, availability, and seller name"
            )
        except (RateLimitError, TimeoutError) as e:
            print(f"Error monitoring {product['url']}: {e}")
            return None

        if not result.is_successful:
            return None

        current_price = self._parse_price(result.output.get("price", ""))
        product_name = result.output.get("name", "Unknown")

        # _parse_price() yields 0.0 when no number was found. Treating that
        # as a real price would raise a bogus "100% drop" alert and erase the
        # last good value, so such readings never touch the history.
        if current_price > 0:
            last_price = self.price_history.get(url)
            if (
                last_price is not None
                and current_price < last_price * (1 - self.alert_threshold)
            ):
                self._send_alert(product_name, last_price, current_price, url)
            self.price_history[url] = current_price

        return {
            "url": url,
            "name": product_name,
            "price": current_price,
            "available": result.output.get("availability", "Unknown"),
            "timestamp": datetime.now().isoformat(),
            "credits_used": result.credits_used
        }

    def _parse_price(self, price_str: str) -> float:
        """Parse price from string.

        Thousands separators (``,``) are dropped and the first number found
        is returned. Non-string values are converted with ``str()`` first.
        Returns ``0.0`` when the text contains no digits.
        """
        import re

        cleaned = str(price_str).replace(',', '')
        match = re.search(r'\d+\.?\d*', cleaned)
        return float(match.group()) if match else 0.0

    def _send_alert(self, product_name: str, old_price: float, new_price: float, url: str):
        """Send price drop alert (printed to stdout)."""
        # Guard the division so a zero reference price cannot crash the alert.
        discount = (old_price - new_price) / old_price * 100 if old_price else 0.0
        print(f"🔔 PRICE ALERT: {product_name}")
        print(f" Price dropped {discount:.1f}% from ${old_price:.2f} to ${new_price:.2f}")
        print(f" URL: {url}")
        # Here you could send email, SMS, or push notification

    def monitor_all(self, products: List[Dict], interval_minutes: int = 60):
        """Continuously monitor all products.

        Runs forever: checks every product, appends one JSON line with the
        results to ``price_monitor_<YYYYMMDD>.json`` and then sleeps for
        ``interval_minutes``.
        """
        while True:
            results = []
            for product in products:
                result = self.monitor_product(product)
                if result:
                    results.append(result)
                time.sleep(2)  # Avoid rate limiting

            # Save results (one JSON document per line, appended)
            log_name = f"price_monitor_{datetime.now().strftime('%Y%m%d')}.json"
            with open(log_name, "a", encoding="utf-8") as f:
                json.dump({"timestamp": datetime.now().isoformat(), "results": results}, f)
                f.write("\n")

            print(f"Monitored {len(results)} products. Next check in {interval_minutes} minutes.")
            time.sleep(interval_minutes * 60)
# Usage: build a monitor that alerts on drops of 5% or more.
monitor = PriceMonitor("YOUR_API_KEY", alert_threshold=0.05)

# Each watched product is a dict with its URL and a display name.
products = [
    {"url": product_url, "name": display_name}
    for product_url, display_name in (
        ("https://amazon.com/dp/B08N5WRWNW", "Echo Dot"),
        ("https://amazon.com/dp/B07FZ8S74R", "Kindle"),
    )
]

# Run monitoring (continuous)
# monitor.monitor_all(products, interval_minutes=30)

# Or single check
result = monitor.monitor_product(products[0])
print(result)
Inventory Tracker
Track product availability and get notified when items are back in stock.
import asyncio
from typing import List, Dict
from altrina_sdk import AsyncAltrinaClient
from datetime import datetime
class InventoryTracker:
    """Track product inventory across multiple sites.

    URLs last seen as unavailable are remembered in ``out_of_stock_items`` so
    that a later successful check can be reported as a restock.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.out_of_stock_items = set()

    async def check_availability(self, product_url: str) -> Dict:
        """Check if a product is available.

        Returns:
            A dict with ``url``, ``in_stock``, ``details`` and ``checked_at``,
            or ``None`` when the agent run was not successful.
        """
        directive = (
            f"Go to {product_url} and check:\n"
            "1. Is the product in stock?\n"
            "2. How many units are available?\n"
            "3. Are there any size/color variants available?\n"
            "4. What's the estimated delivery date?\n"
        )
        async with AsyncAltrinaClient(self.api_key) as client:
            job = await client.run_browser_agent(directive, initial_url=product_url)
            result = await job.wait_for_completion(timeout=60)
            if result.is_successful:
                return {
                    "url": product_url,
                    "in_stock": self._parse_availability(result.output),
                    "details": result.output,
                    "checked_at": datetime.now().isoformat()
                }
            return None

    def _parse_availability(self, output: Dict) -> bool:
        """Parse availability from output.

        Resolution order:
        1. A boolean stored under an ``in_stock`` / ``available`` /
           ``availability`` key is returned as-is.
        2. Any negative phrase ("out of stock", "unavailable", ...) means
           ``False``. These are checked first because "unavailable" and
           "not in stock" contain the positive phrases as substrings.
        3. Otherwise any positive phrase means ``True``.

        Anything that is not a dict is reported as not available.
        """
        if not isinstance(output, dict):
            return False

        flag_names = ("in stock", "available", "availability")
        for key, value in output.items():
            normalized_key = str(key).lower().replace("_", " ").replace("-", " ")
            if isinstance(value, bool) and normalized_key in flag_names:
                return value

        output_str = str(output).lower()
        out_of_stock_indicators = (
            "out of stock",
            "not in stock",
            "unavailable",
            "not available",
            "sold out",
        )
        if any(indicator in output_str for indicator in out_of_stock_indicators):
            return False

        in_stock_indicators = ("in stock", "available", "add to cart")
        return any(indicator in output_str for indicator in in_stock_indicators)

    async def track_multiple(self, products: List[str]) -> List[Dict]:
        """Track multiple products concurrently.

        Starts one agent job per URL, waits for all of them, and returns one
        ``{"url", "available", "details"}`` dict per successful check.
        Unsuccessful checks are omitted from the result.
        """
        async with AsyncAltrinaClient(self.api_key) as client:
            tasks = [
                client.run_browser_agent(
                    f"Check if product at {url} is in stock and get availability details",
                    initial_url=url
                )
                for url in products
            ]
            jobs = await asyncio.gather(*tasks)
            results = await asyncio.gather(*[
                job.wait_for_completion(timeout=60) for job in jobs
            ])

            availability = []
            for url, result in zip(products, results):
                if not result.is_successful:
                    continue
                is_available = self._parse_availability(result.output)

                # Check for restock
                if url in self.out_of_stock_items and is_available:
                    print(f"🎉 BACK IN STOCK: {url}")
                    self.out_of_stock_items.remove(url)
                elif not is_available:
                    self.out_of_stock_items.add(url)

                availability.append({
                    "url": url,
                    "available": is_available,
                    "details": result.output
                })
            return availability
# Usage
async def main():
    """Check two example product pages and print one status line per result."""
    tracker = InventoryTracker("YOUR_API_KEY")
    watched_urls = [
        "https://store.example.com/product1",
        "https://store.example.com/product2",
    ]

    for entry in await tracker.track_multiple(watched_urls):
        if entry["available"]:
            label = "✅ In Stock"
        else:
            label = "❌ Out of Stock"
        print(f"{entry['url']}: {label}")
# asyncio.run(main())
Data Extraction & Web Scraping
Multi-Page Data Extraction
Extract data from paginated results.
from altrina_sdk import AltrinaClient, BrowserConfig
from typing import List, Dict
class PaginatedScraper:
    """Scrape data from paginated websites."""

    def __init__(self, api_key: str):
        self.client = AltrinaClient(api_key)

    @staticmethod
    def _page_url(base_url: str, page: int) -> str:
        """Return the URL to open for ``page``.

        Page 1 is ``base_url`` unchanged. Later pages get a ``page=N`` query
        parameter, joined with ``&`` when the URL already carries a query
        string and inserted before any ``#fragment``.
        """
        if page <= 1:
            return base_url
        address, hash_mark, fragment = base_url.partition("#")
        joiner = "&" if "?" in address else "?"
        return f"{address}{joiner}page={page}{hash_mark}{fragment}"

    def scrape_all_pages(
        self,
        base_url: str,
        data_description: str,
        max_pages: int = None
    ) -> List[Dict]:
        """Scrape data from all pages.

        Args:
            base_url: URL of the first page.
            data_description: What the agent should extract from each page.
            max_pages: Upper bound on pages to visit; a falsy value
                (``None`` or ``0``) means no limit.

        Returns:
            Everything collected before the last page, the page limit, or the
            first failed page was reached.
        """
        all_data = []
        page = 1
        with self.client as client:
            while not max_pages or page <= max_pages:
                print(f"Scraping page {page}...")
                directive = (
                    "\n"
                    f"1. Go to {base_url} (page {page} if applicable)\n"
                    f"2. Extract: {data_description}\n"
                    f"3. Check if there's a 'Next' button or page {page + 1}\n"
                    "4. Return the data and whether more pages exist\n"
                )
                job = client.run_browser_agent(
                    directive,
                    initial_url=self._page_url(base_url, page),
                    browser_config=BrowserConfig(
                        width=1920,
                        height=1080,
                        max_duration_minutes=5
                    )
                )
                result = job.wait_for_completion(verbose=True)

                if not result.is_successful:
                    print(f"Failed to scrape page {page}: {result.error}")
                    break

                output = result.output
                if not isinstance(output, dict):
                    # No dict means no "has_next_page" flag to read: keep
                    # what came back and stop instead of failing on .get().
                    if output:
                        all_data.extend(output if isinstance(output, list) else [output])
                    break

                page_data = output.get("data", [])
                has_next = output.get("has_next_page", False)
                all_data.extend(page_data if isinstance(page_data, list) else [page_data])

                if not has_next:
                    print(f"Reached last page (page {page})")
                    break
                page += 1
        return all_data

    def scrape_with_infinite_scroll(
        self,
        url: str,
        data_description: str,
        scroll_count: int = 5
    ) -> List[Dict]:
        """Scrape from infinite scroll pages.

        Returns the agent output on success and an empty list on failure.
        """
        directive = (
            "\n"
            f"1. Go to {url}\n"
            f"2. Scroll down {scroll_count} times, waiting for content to load each time\n"
            f"3. After each scroll, extract new items matching: {data_description}\n"
            "4. Return all unique items found\n"
        )
        with self.client as client:
            job = client.run_browser_agent(
                directive,
                initial_url=url,
                browser_config=BrowserConfig(
                    max_duration_minutes=10
                )
            )
            result = job.wait_for_completion()
            if result.is_successful:
                return result.output
            print(f"Scraping failed: {result.error}")
            return []
# Usage: one scraper instance drives the example below.
scraper = PaginatedScraper("YOUR_API_KEY")

# Collect article metadata from at most five result pages.
data = scraper.scrape_all_pages(
    max_pages=5,
    data_description="article titles, authors, dates, and URLs",
    base_url="https://news.site.com/articles",
)
print(f"Scraped {len(data)} items")
Dynamic Content Extraction
Extract data from JavaScript-heavy sites with dynamic content.
from altrina_sdk import BrowserAgent
class DynamicContentExtractor:
    """Pull data out of pages whose content is rendered by JavaScript."""

    def __init__(self, api_key: str):
        self.agent = BrowserAgent(api_key, verbose=True)

    def extract_spa_content(self, url: str, wait_conditions: List[str]) -> Dict:
        """Extract visible content from a single-page application.

        The wait conditions are joined with " AND " and handed to the agent
        as one instruction. Returns the agent output, or ``None`` when the
        run was not successful.
        """
        wait_instruction = " AND ".join(wait_conditions)
        directive = (
            "\n"
            f"1. Navigate to {url}\n"
            f"2. Wait until {wait_instruction}\n"
            "3. Extract all visible content including:\n"
            "- Text content\n"
            "- Image URLs and alt texts\n"
            "- Link URLs and anchor texts\n"
            "- Form fields and their values\n"
            "4. Capture any dynamically loaded data\n"
        )
        outcome = self.agent.run(directive, initial_url=url, timeout=120)
        if outcome.is_successful:
            return outcome.output
        return None

    def extract_after_interaction(
        self,
        url: str,
        interactions: List[str],
        target_data: str
    ) -> Dict:
        """Run the given interactions, then extract ``target_data``.

        Interactions are numbered from 1 in the order supplied. Returns the
        agent output, or ``None`` when the run was not successful.
        """
        numbered = [
            f"{position}. {action}"
            for position, action in enumerate(interactions, start=1)
        ]
        interaction_steps = "\n".join(numbered)
        directive = (
            "\n"
            f"Navigate to {url}\n"
            f"{interaction_steps}\n"
            f"After completing all interactions, extract: {target_data}\n"
        )
        outcome = self.agent.run(directive, initial_url=url)
        if outcome.is_successful:
            return outcome.output
        return None
# Usage: a single extractor serves both examples.
extractor = DynamicContentExtractor("YOUR_API_KEY")

# Extract from SPA once every listed condition holds.
spa_data = extractor.extract_spa_content(
    wait_conditions=[
        "the loading spinner disappears",
        "product cards are visible",
        "the price information loads",
    ],
    url="https://spa-app.example.com",
)

# Extract after interactions: the steps are performed in the order listed.
interactive_data = extractor.extract_after_interaction(
    target_data="filtered product list with names, prices, and ratings",
    interactions=[
        "Click on 'Show More' button",
        "Select 'All Categories' from dropdown",
        "Click 'Apply Filters' button",
        "Wait for results to update",
    ],
    url="https://interactive-site.com",
)
Social Media Analytics
Social Media Monitoring
Monitor social media profiles and extract analytics.
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict
from altrina_sdk import AsyncAltrinaClient
class SocialMediaMonitor:
    """Monitor social media profiles and content."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    @staticmethod
    def _profile_url(platform: str, username: str) -> str:
        """Return the profile URL for ``username`` on ``platform``.

        The platform name is matched case-insensitively. A leading ``@`` on
        the username is dropped so that "@name" and "name" give the same URL.

        Raises:
            ValueError: If the platform is not one of twitter, instagram,
                linkedin or youtube.
        """
        handle = username.lstrip("@")
        platform_urls = {
            "twitter": f"https://x.com/{handle}",
            "instagram": f"https://instagram.com/{handle}",
            "linkedin": f"https://linkedin.com/in/{handle}",
            "youtube": f"https://youtube.com/@{handle}"
        }
        url = platform_urls.get(platform.lower())
        if not url:
            raise ValueError(f"Unsupported platform: {platform}")
        return url

    async def monitor_profile(self, platform: str, username: str) -> Dict:
        """Monitor a social media profile.

        Returns:
            A dict with ``platform``, ``username``, ``data`` and
            ``monitored_at``, or ``None`` when the run was not successful.

        Raises:
            ValueError: If ``platform`` is not supported.
        """
        url = self._profile_url(platform, username)
        directive = (
            "\n"
            f"Go to {url} and extract:\n"
            "1. Profile statistics (followers, following, posts count)\n"
            "2. Recent posts (last 5) with:\n"
            "- Content/caption\n"
            "- Engagement metrics (likes, comments, shares)\n"
            "- Post timestamp\n"
            "3. Profile bio/description\n"
            "4. Verification status\n"
        )
        async with AsyncAltrinaClient(self.api_key) as client:
            job = await client.run_browser_agent(
                directive,
                initial_url=url,
                browser_config={"residential_ip": True}
            )
            result = await job.wait_for_completion(timeout=90)
            if result.is_successful:
                return {
                    "platform": platform,
                    "username": username,
                    "data": result.output,
                    "monitored_at": datetime.now().isoformat()
                }
            return None

    async def track_hashtag(self, platform: str, hashtag: str, limit: int = 20) -> List[Dict]:
        """Track posts with specific hashtag.

        A leading ``#`` on ``hashtag`` is optional. Returns the agent output
        on success and an empty list otherwise.
        """
        hashtag = hashtag.lstrip('#')
        directive = (
            "\n"
            f"Search for #{hashtag} on {platform} and extract the first {limit} posts:\n"
            "- Post author and username\n"
            "- Post content\n"
            "- Engagement metrics\n"
            "- Post timestamp\n"
            "- Media type (photo/video/text)\n"
        )
        async with AsyncAltrinaClient(self.api_key) as client:
            job = await client.run_browser_agent(
                directive,
                browser_config={"residential_ip": True}
            )
            result = await job.wait_for_completion(timeout=120)
            if result.is_successful:
                return result.output
            return []

    async def analyze_engagement_trends(
        self,
        profiles: List[Dict[str, str]]
    ) -> Dict:
        """Analyze engagement trends across multiple profiles.

        Every profile (a dict with ``platform`` and ``username``) is
        monitored concurrently. Profiles whose check raised or was not
        successful are left out of ``profiles`` in the result; raised errors
        are printed instead of being dropped silently.
        """
        # monitor_profile() opens its own client, so none is created here.
        tasks = [
            self.monitor_profile(profile["platform"], profile["username"])
            for profile in profiles
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        all_data = []
        for profile, result in zip(profiles, results):
            if isinstance(result, dict):
                all_data.append(result)
            elif isinstance(result, Exception):
                print(f"Failed to monitor {profile['platform']}/{profile['username']}: {result}")

        # Analyze trends
        trends = {
            "total_profiles": len(profiles),
            "successful_extractions": len(all_data),
            "profiles": all_data,
            "analysis_timestamp": datetime.now().isoformat()
        }
        return trends
# Usage
async def main():
    """Run the three monitoring examples in sequence and print their results."""
    monitor = SocialMediaMonitor("YOUR_API_KEY")

    # Monitor single profile
    print(await monitor.monitor_profile("twitter", "elonmusk"))

    # Track hashtag
    hashtag_posts = await monitor.track_hashtag("twitter", "#AI", limit=10)
    print(f"Found {len(hashtag_posts)} posts")

    # Analyze multiple profiles
    accounts = [
        {"platform": "twitter", "username": handle}
        for handle in ("openai", "anthropic")
    ]
    print(await monitor.analyze_engagement_trends(accounts))
# asyncio.run(main())
Workflow Automation
Multi-Step Form Submission
Automate complex multi-step forms with validation.
from altrina_sdk import AltrinaClient
from typing import Dict, List
class FormAutomation:
    """Automate complex form submissions."""

    def __init__(self, api_key: str):
        self.client = AltrinaClient(api_key)

    @staticmethod
    def _build_form_instruction(
        url: str,
        form_data: Dict[str, Dict],
        validate_each_step: bool
    ) -> str:
        """Render the agent directive for a multi-step form (no I/O)."""
        steps_instruction = []
        for step_name, step_data in form_data.items():
            step_fields = "\n".join(
                f" - {field}: {value}" for field, value in step_data.items()
            )
            step_instruction = f"\nStep '{step_name}':\n{step_fields}\n"
            if validate_each_step:
                step_instruction += "\n - Verify all fields are filled correctly"
                step_instruction += "\n - Check for any validation errors"
            steps_instruction.append(step_instruction)

        return (
            "\n"
            f"Go to {url} and complete the multi-step form:\n"
            f"{''.join(steps_instruction)}\n"
            "After completing all steps:\n"
            "1. Submit the form\n"
            "2. Capture the confirmation message or number\n"
            "3. Take a screenshot of the confirmation page\n"
        )

    def submit_multi_step_form(
        self,
        url: str,
        form_data: Dict[str, Dict],
        validate_each_step: bool = True
    ) -> Dict:
        """Submit multi-step form with validation.

        Args:
            url: Page hosting the form.
            form_data: Step name -> ``{field: value}`` mapping, processed in
                insertion order.
            validate_each_step: Append verification instructions to every
                step when true.

        Returns:
            ``{"success": True, "confirmation", "credits_used"}`` on success,
            otherwise ``{"success": False, "error"}``. An empty ``form_data``
            is reported as a failure without starting an agent run.
        """
        if not form_data:
            # Nothing to fill in: do not start a run that would submit a blank form.
            return {
                "success": False,
                "error": "form_data is empty; nothing to submit"
            }

        full_instruction = self._build_form_instruction(
            url, form_data, validate_each_step
        )
        with self.client as client:
            job = client.run_browser_agent(
                directive=full_instruction,
                initial_url=url,
                browser_config={"max_duration_minutes": 15}
            )
            result = job.wait_for_completion(verbose=True)
            if result.is_successful:
                return {
                    "success": True,
                    "confirmation": result.output,
                    "credits_used": result.credits_used
                }
            return {
                "success": False,
                "error": result.error
            }

    def fill_dynamic_form(
        self,
        url: str,
        form_rules: List[Dict]
    ) -> Dict:
        """Fill form with conditional logic.

        Each rule is a dict with ``condition`` and ``action`` keys and is
        rendered as "- If <condition>, then <action>". Returns the agent
        output, or ``None`` when the run was not successful.
        """
        rules_instruction = "\n".join(
            f"- If {rule['condition']}, then {rule['action']}"
            for rule in form_rules
        )
        directive = (
            "\n"
            f"Go to {url} and fill the form following these rules:\n"
            f"{rules_instruction}\n"
            "Submit the form and capture the result.\n"
        )
        result = self.client.run_and_wait(
            directive=directive,
            initial_url=url,
            timeout=120
        )
        return result.output if result.is_successful else None
# Usage: one automation object is shared by both examples.
automation = FormAutomation("YOUR_API_KEY")

# Multi-step form: every top-level key is one step, filled in this order.
form_data = {
    "Personal Information": {
        "first_name": "John",
        "last_name": "Doe",
        "email": "john.doe@example.com",
        "phone": "555-0100",
    },
    "Address": {
        "street": "123 Main St",
        "city": "San Francisco",
        "state": "CA",
        "zip": "94105",
    },
    "Preferences": {
        "newsletter": "Yes",
        "contact_method": "Email",
    },
}
result = automation.submit_multi_step_form(
    validate_each_step=True,
    form_data=form_data,
    url="https://example.com/registration",
)

# Dynamic form with conditions: each rule pairs a condition with an action.
rules = [
    {"condition": "country field is 'USA'", "action": "show and fill state field"},
    {"condition": "age is over 18", "action": "check 'I agree to terms'"},
    {"condition": "subscription type is 'Premium'", "action": "fill payment details"},
]
dynamic_result = automation.fill_dynamic_form(
    form_rules=rules,
    url="https://example.com/signup",
)
Automated Testing
Use Altrina for automated web application testing.
from altrina_sdk import BrowserAgent
from typing import List, Dict
import json
class WebAppTester:
    """Automated testing for web applications.

    ``test_results`` keeps the entries of every suite run on this instance;
    each report returned by ``run_test_suite`` lists only its own run.
    """

    def __init__(self, api_key: str):
        self.agent = BrowserAgent(api_key)
        self.test_results = []

    @staticmethod
    def _format_pass_rate(passed: int, total: int) -> str:
        """Return ``passed / total`` as a percentage string ("75.0%").

        An empty suite yields "0.0%" rather than dividing by zero.
        """
        if total == 0:
            return "0.0%"
        return f"{(passed/total*100):.1f}%"

    def run_test_suite(self, base_url: str, test_cases: List[Dict]) -> Dict:
        """Run a suite of test cases.

        Args:
            base_url: Prefix joined with each test's optional ``path``.
            test_cases: Dicts with ``name``, ``action``, ``expected`` and an
                optional ``path``.

        Returns:
            A report with ``total_tests``, ``passed``, ``failed``,
            ``pass_rate`` and the per-test ``results`` of this run. The same
            report is written to ``test_report.json``.
        """
        passed = 0
        failed = 0
        run_results = []  # entries of this run only, so they match the counts

        for test in test_cases:
            print(f"Running test: {test['name']}")
            target_url = f"{base_url}{test.get('path', '')}"
            directive = (
                "\n"
                f"Test: {test['name']}\n"
                f"1. Go to {target_url}\n"
                f"2. {test['action']}\n"
                f"3. Verify: {test['expected']}\n"
                "4. Return whether the test passed or failed with details\n"
            )
            result = self.agent.run(directive, initial_url=target_url, timeout=60)

            if result.is_successful:
                if self._evaluate_test_result(result.output, test['expected']):
                    passed += 1
                    status = "✅ PASSED"
                else:
                    failed += 1
                    status = "❌ FAILED"
                run_results.append({
                    "test": test['name'],
                    "status": status,
                    "details": result.output
                })
            else:
                failed += 1
                status = "❌ ERROR"
                run_results.append({
                    "test": test['name'],
                    "status": status,
                    "error": result.error
                })
            print(f" {status}")

        self.test_results.extend(run_results)

        # Generate report
        report = {
            "total_tests": len(test_cases),
            "passed": passed,
            "failed": failed,
            "pass_rate": self._format_pass_rate(passed, len(test_cases)),
            "results": run_results
        }

        # Save report
        with open("test_report.json", "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        return report

    def _evaluate_test_result(self, output: Dict, expected: str) -> bool:
        """Evaluate if test passed based on output.

        The comparison is case-insensitive on ``str(output)``. An explicit
        failure phrase outside the expected text ("failed", "not passed",
        "did not pass") means failure. Otherwise the test passes when the
        output contains the expected text or the word "passed".
        """
        output_str = str(output).lower()
        expected_lower = expected.lower()

        # Remove the expected text before scanning for failure phrases so an
        # expectation such as "login failed message is shown" still passes.
        remainder = output_str.replace(expected_lower, "") if expected_lower else output_str
        failure_markers = ("failed", "not passed", "did not pass")
        if any(marker in remainder for marker in failure_markers):
            return False

        # Simple evaluation - check if expected content is in output
        return expected_lower in output_str or "passed" in output_str

    def test_user_journey(self, journey: Dict) -> Dict:
        """Test a complete user journey.

        ``journey`` provides ``name``, ``start_url`` and an ordered ``steps``
        list. Returns ``journey``, ``success`` and ``result`` (the output on
        success, the error otherwise).
        """
        steps = "\n".join(
            f"{number}. {step}"
            for number, step in enumerate(journey['steps'], start=1)
        )
        directive = (
            "\n"
            f"Test User Journey: {journey['name']}\n"
            "Perform these steps in order:\n"
            f"{steps}\n"
            "After each step, verify it completed successfully.\n"
            "Report any errors or unexpected behavior.\n"
        )
        result = self.agent.run(
            directive,
            initial_url=journey['start_url'],
            timeout=180
        )
        return {
            "journey": journey['name'],
            "success": result.is_successful,
            "result": result.output if result.is_successful else result.error
        }
# Usage: the same tester runs the suite and the journey below.
tester = WebAppTester("YOUR_API_KEY")

# Define test cases: every case names an action and the outcome to verify.
test_cases = [
    {
        "name": "Homepage Load Test",
        "path": "/",
        "action": "Check if page loads within 3 seconds",
        "expected": "Page loaded successfully",
    },
    {
        "name": "Navigation Test",
        "path": "/",
        "action": "Click on 'About' link in navigation",
        "expected": "About page is displayed",
    },
    {
        "name": "Search Functionality",
        "path": "/",
        "action": "Search for 'test product' in search bar",
        "expected": "Search results are displayed",
    },
    {
        "name": "Mobile Responsiveness",
        "path": "/",
        "action": "Resize browser to mobile size (375x667)",
        "expected": "Mobile menu appears and layout adjusts",
    },
]

# Run tests
report = tester.run_test_suite(
    test_cases=test_cases,
    base_url="https://example.com",
)
print(f"\nTest Results: {report['passed']}/{report['total_tests']} passed ({report['pass_rate']})")

# Test user journey: the steps are performed in the order listed.
journey = {
    "name": "Purchase Flow",
    "start_url": "https://shop.example.com",
    "steps": [
        "Search for 'laptop'",
        "Click on the first product",
        "Add to cart",
        "Go to checkout",
        "Fill in test payment details",
        "Complete purchase",
    ],
}
journey_result = tester.test_user_journey(journey)
Performance Optimization
Batch Processing with Rate Limiting
Process large batches efficiently while respecting rate limits.
import asyncio
from typing import List, Any, Callable
from altrina_sdk import AsyncAltrinaClient
from altrina_sdk.exceptions import RateLimitError
import time
class BatchProcessor:
    """Process large batches with rate limiting and retries.

    At most ``max_concurrent`` items are in flight at once, and at most
    ``rate_limit`` attempts are started in any 60-second window.
    """

    def __init__(
        self,
        api_key: str,
        max_concurrent: int = 5,
        rate_limit: int = 10,  # requests per minute
        max_retries: int = 3
    ):
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.rate_limit = rate_limit
        self.max_retries = max_retries
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.request_times = []  # monotonic start times of recent attempts

    async def _rate_limit_check(self):
        """Check and enforce rate limiting.

        Waits until fewer than ``rate_limit`` attempts were started during
        the last minute, then records the current time as a new attempt.
        """
        while True:
            now = time.monotonic()
            # Remove requests older than 1 minute
            self.request_times = [t for t in self.request_times if now - t < 60]
            if len(self.request_times) < self.rate_limit:
                # No await between this check and the append, so coroutines
                # waking together cannot all claim the same free slot.
                self.request_times.append(now)
                return
            # Wait until the oldest request leaves the window, then re-check
            # with a fresh timestamp instead of recording the stale one.
            sleep_time = 60 - (now - self.request_times[0]) + 1
            await asyncio.sleep(sleep_time)

    async def process_item(
        self,
        client: "AsyncAltrinaClient",
        item: Any,
        processor_func: Callable
    ) -> Dict:
        """Process a single item with retries.

        ``processor_func(client, item)`` is awaited up to ``max_retries``
        times (at least once). Every attempt passes the rate limiter first.

        Returns:
            ``{"item", "success": True, "result"}`` on success, or
            ``{"item", "success": False, "error"}`` after the last attempt
            fails.
        """
        attempts = max(1, self.max_retries)
        last_error = None
        async with self.semaphore:
            for attempt in range(attempts):
                await self._rate_limit_check()
                try:
                    result = await processor_func(client, item)
                    return {"item": item, "success": True, "result": result}
                except RateLimitError as e:
                    last_error = e
                    # Use the error's own retry hint when it carries one.
                    delay = getattr(e, "retry_after", None) or 10
                except Exception as e:
                    last_error = e
                    delay = 2 ** attempt  # Exponential backoff
                if attempt < attempts - 1:
                    await asyncio.sleep(delay)
        return {"item": item, "success": False, "error": str(last_error)}

    async def process_batch(
        self,
        items: List[Any],
        processor_func: Callable,
        progress_callback: Callable = None
    ) -> List[Dict]:
        """Process a batch of items.

        Results are returned in completion order, not input order. When
        given, ``progress_callback(completed, total)`` is called after every
        finished item.
        """
        results = []
        async with AsyncAltrinaClient(self.api_key) as client:
            tasks = [
                self.process_item(client, item, processor_func)
                for item in items
            ]
            # Process with progress updates
            for completed, task in enumerate(asyncio.as_completed(tasks), start=1):
                results.append(await task)
                if progress_callback:
                    progress_callback(completed, len(items))
        return results
# Usage
async def extract_page_data(client: AsyncAltrinaClient, url: str) -> Dict:
    """Example processor: ask the agent for the text content of ``url``.

    Returns the agent output, or ``None`` when the run was not successful.
    """
    directive = f"Extract all text content from {url}"
    job = await client.run_browser_agent(directive, initial_url=url)
    outcome = await job.wait_for_completion(timeout=60)
    if not outcome.is_successful:
        return None
    return outcome.output
async def main():
    """Process 50 example URLs in a batch and print progress plus a summary."""

    def report_progress(done, total):
        print(f"Progress: {done}/{total} ({done/total*100:.1f}%)")

    processor = BatchProcessor(
        api_key="YOUR_API_KEY",
        max_concurrent=3,
        rate_limit=20,  # 20 requests per minute
    )
    urls = [f"https://example.com/page{i}" for i in range(50)]

    results = await processor.process_batch(
        items=urls,
        processor_func=extract_page_data,
        progress_callback=report_progress,
    )

    successful = len([entry for entry in results if entry["success"]])
    print(f"\nCompleted: {successful}/{len(results)} successful")
# asyncio.run(main())
See Also
- Quickstart Guide - Get started quickly
- Error Handling - Handle errors gracefully
- AsyncAltrinaClient - Async operations
- API Reference - REST API documentation