Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.altrina.com/llms.txt

Use this file to discover all available pages before exploring further.

Overview

This guide provides comprehensive examples of advanced browser automation scenarios using the Altrina SDK. Each example includes complete code with error handling and best practices.

E-Commerce Automation

Price Monitoring System

Monitor product prices across multiple e-commerce sites and raise an alert when a price drops by more than a configurable threshold.
import json
import time
from datetime import datetime
from typing import List, Dict
from altrina_sdk import BrowserAgent, AltrinaClient
from altrina_sdk.exceptions import RateLimitError, TimeoutError

class PriceMonitor:
    """Monitor product prices across multiple sites.

    The last successfully parsed price for each product URL is kept in
    ``price_history`` (in memory only) and compared against on the next check.
    """

    def __init__(self, api_key: str, alert_threshold: float = 0.1):
        """Create a monitor.

        Args:
            api_key: API key handed to ``BrowserAgent``.
            alert_threshold: Fractional price drop that triggers an alert;
                the default ``0.1`` alerts on drops of more than 10%.
        """
        self.agent = BrowserAgent(api_key, residential_ip=True)
        self.alert_threshold = alert_threshold
        self.price_history = {}

    def monitor_product(self, product: Dict) -> Dict:
        """Check one product and alert if its price dropped past the threshold.

        Args:
            product: Mapping with at least a ``"url"`` key.

        Returns:
            A dict with ``url``, ``name``, ``price``, ``available``,
            ``timestamp`` and ``credits_used``; ``None`` when the extraction
            was rate limited, timed out, or was not successful. ``price`` is
            ``0.0`` when no price could be parsed from the page.
        """
        url = product["url"]

        try:
            result = self.agent.extract(
                url=url,
                data_description="product name, current price, availability, and seller name"
            )
        except (RateLimitError, TimeoutError) as e:
            print(f"Error monitoring {url}: {e}")
            return None

        if not result.is_successful:
            return None

        output = result.output
        current_price = self._parse_price(output.get("price", ""))
        product_name = output.get("name", "Unknown")

        if current_price > 0:
            # Only a real price may be compared or stored; otherwise a failed
            # parse (0.0) would look like a 100% drop and corrupt the history.
            last_price = self.price_history.get(url)
            if (
                last_price is not None
                and current_price < last_price * (1 - self.alert_threshold)
            ):
                self._send_alert(product_name, last_price, current_price, url)
            self.price_history[url] = current_price
        else:
            print(f"Could not parse a price for {url}; keeping previous history")

        return {
            "url": url,
            "name": product_name,
            "price": current_price,
            "available": output.get("availability", "Unknown"),
            "timestamp": datetime.now().isoformat(),
            "credits_used": result.credits_used
        }

    def _parse_price(self, price_str: str) -> float:
        """Return the first number found in ``price_str``, or ``0.0`` if none.

        Thousands separators (``,``) are stripped first, so ``"$1,299.99"``
        parses as ``1299.99``. ``None`` and non-string values are tolerated.
        """
        import re

        if price_str is None:
            return 0.0
        text = str(price_str).replace(',', '')
        match = re.search(r'\d+\.?\d*', text)
        return float(match.group()) if match else 0.0

    def _send_alert(self, product_name: str, old_price: float, new_price: float, url: str):
        """Print a price-drop alert for one product."""
        # Guard against a zero baseline so the percentage never divides by zero.
        discount = (old_price - new_price) / old_price * 100 if old_price else 0.0
        print(f"🔔 PRICE ALERT: {product_name}")
        print(f"   Price dropped {discount:.1f}% from ${old_price:.2f} to ${new_price:.2f}")
        print(f"   URL: {url}")

        # Here you could send email, SMS, or push notification

    def monitor_all(self, products: List[Dict], interval_minutes: int = 60):
        """Check every product in a loop that never returns.

        After each pass the results are appended as one JSON line to
        ``price_monitor_<YYYYMMDD>.json`` in the current directory, then the
        loop sleeps for ``interval_minutes``.
        """
        while True:
            results = []
            for product in products:
                result = self.monitor_product(product)
                if result:
                    results.append(result)
                time.sleep(2)  # Avoid rate limiting

            # Append mode: one JSON document per line, one line per pass.
            with open(f"price_monitor_{datetime.now().strftime('%Y%m%d')}.json", "a") as f:
                json.dump({"timestamp": datetime.now().isoformat(), "results": results}, f)
                f.write("\n")

            print(f"Monitored {len(results)} products. Next check in {interval_minutes} minutes.")
            time.sleep(interval_minutes * 60)

# Usage
# alert_threshold=0.05: alert when a price falls more than 5% below the
# last price recorded for the same URL.
monitor = PriceMonitor("YOUR_API_KEY", alert_threshold=0.05)

# monitor_product() reads only the "url" key; the reported name comes from the
# extracted page data, not from the "name" given here.
products = [
    {"url": "https://amazon.com/dp/B08N5WRWNW", "name": "Echo Dot"},
    {"url": "https://amazon.com/dp/B07FZ8S74R", "name": "Kindle"},
]

# Run monitoring (continuous) - monitor_all() loops forever, so it is left
# commented out here.
# monitor.monitor_all(products, interval_minutes=30)

# Or single check; prints None if the extraction failed or was not successful.
result = monitor.monitor_product(products[0])
print(result)

Inventory Tracker

Track product availability and get notified when items are back in stock.
import asyncio
from typing import List, Dict
from altrina_sdk import AsyncAltrinaClient
from datetime import datetime

class InventoryTracker:
    """Track product inventory across multiple sites.

    URLs last seen out of stock are remembered in ``out_of_stock_items`` so a
    later in-stock result can be reported as a restock.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.out_of_stock_items = set()

    async def check_availability(self, product_url: str) -> Dict:
        """Check a single product page.

        Returns:
            A dict with ``url``, ``in_stock``, ``details`` and ``checked_at``,
            or ``None`` when the agent run was not successful.
        """
        async with AsyncAltrinaClient(self.api_key) as client:
            job = await client.run_browser_agent(
                f"""Go to {product_url} and check:
                1. Is the product in stock?
                2. How many units are available?
                3. Are there any size/color variants available?
                4. What's the estimated delivery date?
                """,
                initial_url=product_url
            )

            result = await job.wait_for_completion(timeout=60)

            if result.is_successful:
                return {
                    "url": product_url,
                    "in_stock": self._parse_availability(result.output),
                    "details": result.output,
                    "checked_at": datetime.now().isoformat()
                }
            return None

    def _parse_availability(self, output: Dict) -> bool:
        """Heuristically decide whether ``output`` describes an in-stock item.

        The dict is flattened to lower-case text and searched for positive
        phrases. Negated phrases that *contain* a positive phrase
        ("unavailable", "not available", "not in stock") are removed first so
        they cannot be mistaken for a positive match. Non-dict output is
        treated as not available.
        """
        if not isinstance(output, dict):
            return False

        text = str(output).lower()
        for negated in ("unavailable", "not available", "not in stock"):
            text = text.replace(negated, " ")

        in_stock_indicators = ("in stock", "available", "add to cart")
        return any(indicator in text for indicator in in_stock_indicators)

    async def track_multiple(self, products: List[str]) -> List[Dict]:
        """Check several product URLs concurrently.

        Prints a message when a URL previously recorded as out of stock is
        now available. Unsuccessful runs are omitted from the returned list.

        Returns:
            One dict per successful check with ``url``, ``available`` and
            ``details``.
        """
        async with AsyncAltrinaClient(self.api_key) as client:
            # Start every job first, then wait for all of them together.
            jobs = await asyncio.gather(*[
                client.run_browser_agent(
                    f"Check if product at {url} is in stock and get availability details",
                    initial_url=url
                )
                for url in products
            ])
            results = await asyncio.gather(*[
                job.wait_for_completion(timeout=60) for job in jobs
            ])

            availability = []
            for url, result in zip(products, results):
                if not result.is_successful:
                    continue

                is_available = self._parse_availability(result.output)

                # Check for restock
                if url in self.out_of_stock_items and is_available:
                    print(f"🎉 BACK IN STOCK: {url}")
                    self.out_of_stock_items.remove(url)
                elif not is_available:
                    self.out_of_stock_items.add(url)

                availability.append({
                    "url": url,
                    "available": is_available,
                    "details": result.output
                })

            return availability

# Usage
async def main():
    """Check two example product pages and print each one's stock status."""
    product_urls = [
        "https://store.example.com/product1",
        "https://store.example.com/product2",
    ]
    inventory = InventoryTracker("YOUR_API_KEY")

    for entry in await inventory.track_multiple(product_urls):
        if entry["available"]:
            label = "✅ In Stock"
        else:
            label = "❌ Out of Stock"
        print(f"{entry['url']}: {label}")

# asyncio.run(main())

Data Extraction & Web Scraping

Multi-Page Data Extraction

Extract data from paginated result pages, or from pages that load more content as you scroll.
from altrina_sdk import AltrinaClient, BrowserConfig
from typing import List, Dict

class PaginatedScraper:
    """Scrape data from paginated websites."""

    def __init__(self, api_key: str):
        self.client = AltrinaClient(api_key)

    def scrape_all_pages(
        self,
        base_url: str,
        data_description: str,
        max_pages: int = None
    ) -> List[Dict]:
        """Scrape page after page until there is no next page.

        Args:
            base_url: URL of the first page; later pages are requested as
                ``{base_url}?page=N``.
            data_description: What to extract from each page.
            max_pages: Upper bound on pages to visit. ``None`` means no
                limit; ``0`` visits no pages and returns an empty list.

        Returns:
            All items collected before the last page, the page limit, or the
            first failed page was reached.
        """
        all_data = []
        page = 1

        with self.client as client:
            # ``is None`` (not truthiness) so that max_pages=0 really means
            # "no pages" instead of silently becoming "unlimited".
            while max_pages is None or page <= max_pages:
                print(f"Scraping page {page}...")

                job = client.run_browser_agent(
                    f"""
                    1. Go to {base_url} (page {page} if applicable)
                    2. Extract: {data_description}
                    3. Check if there's a 'Next' button or page {page + 1}
                    4. Return the data and whether more pages exist
                    """,
                    initial_url=f"{base_url}?page={page}" if page > 1 else base_url,
                    browser_config=BrowserConfig(
                        width=1920,
                        height=1080,
                        max_duration_minutes=5
                    )
                )

                result = job.wait_for_completion(verbose=True)

                if not result.is_successful:
                    print(f"Failed to scrape page {page}: {result.error}")
                    break

                output = result.output
                if isinstance(output, dict):
                    page_data = output.get("data", [])
                    has_next = output.get("has_next_page", False)
                else:
                    # Unstructured output: keep what was returned, but there
                    # is no next-page flag to follow, so stop after this page.
                    page_data = [] if output is None else output
                    has_next = False

                all_data.extend(page_data if isinstance(page_data, list) else [page_data])

                if not has_next:
                    print(f"Reached last page (page {page})")
                    break

                page += 1

        return all_data

    def scrape_with_infinite_scroll(
        self,
        url: str,
        data_description: str,
        scroll_count: int = 5
    ) -> List[Dict]:
        """Scrape a page that loads more content as it is scrolled.

        Args:
            url: Page to open.
            data_description: What to extract after each scroll.
            scroll_count: Number of times the agent is told to scroll.

        Returns:
            The agent's output on success, otherwise an empty list.
        """
        with self.client as client:
            job = client.run_browser_agent(
                f"""
                1. Go to {url}
                2. Scroll down {scroll_count} times, waiting for content to load each time
                3. After each scroll, extract new items matching: {data_description}
                4. Return all unique items found
                """,
                initial_url=url,
                browser_config=BrowserConfig(
                    max_duration_minutes=10
                )
            )

            result = job.wait_for_completion()

            if result.is_successful:
                return result.output

            print(f"Scraping failed: {result.error}")
            return []

# Usage
scraper = PaginatedScraper("YOUR_API_KEY")

# Scrape paginated results; max_pages=5 stops after five pages even if the
# site reports that more exist.
data = scraper.scrape_all_pages(
    base_url="https://news.site.com/articles",
    data_description="article titles, authors, dates, and URLs",
    max_pages=5
)

# data is the flat list of items collected across all visited pages.
print(f"Scraped {len(data)} items")

Dynamic Content Extraction

Extract data from JavaScript-heavy sites with dynamic content.
from typing import Dict, List, Optional

from altrina_sdk import BrowserAgent

class DynamicContentExtractor:
    """Extract data from dynamic JavaScript-rendered content."""

    def __init__(self, api_key: str):
        self.agent = BrowserAgent(api_key, verbose=True)

    def extract_spa_content(self, url: str, wait_conditions: List[str]) -> Optional[Dict]:
        """Extract visible content from a Single Page Application.

        Args:
            url: Page to open.
            wait_conditions: Plain-language conditions the agent must wait
                for; they are combined with "AND". When empty, the agent is
                told to wait until the page has finished loading.

        Returns:
            The agent's output, or ``None`` when the run was not successful.
        """
        # An empty list would otherwise leave a dangling "Wait until" step.
        wait_instruction = (
            " AND ".join(wait_conditions)
            if wait_conditions
            else "the page has finished loading"
        )

        result = self.agent.run(f"""
            1. Navigate to {url}
            2. Wait until {wait_instruction}
            3. Extract all visible content including:
               - Text content
               - Image URLs and alt texts
               - Link URLs and anchor texts
               - Form fields and their values
            4. Capture any dynamically loaded data
        """, initial_url=url, timeout=120)

        return result.output if result.is_successful else None

    def extract_after_interaction(
        self,
        url: str,
        interactions: List[str],
        target_data: str
    ) -> Optional[Dict]:
        """Perform a list of interactions, then extract data.

        Args:
            url: Page to open.
            interactions: Plain-language actions, performed in order; each is
                numbered in the prompt sent to the agent.
            target_data: Description of what to extract afterwards.

        Returns:
            The agent's output, or ``None`` when the run was not successful.
        """
        interaction_steps = "\n".join(
            f"{number}. {action}"
            for number, action in enumerate(interactions, start=1)
        )

        result = self.agent.run(f"""
            Navigate to {url}
            {interaction_steps}
            After completing all interactions, extract: {target_data}
        """, initial_url=url)

        return result.output if result.is_successful else None

# Usage
extractor = DynamicContentExtractor("YOUR_API_KEY")

# Extract from SPA; the wait conditions are joined with " AND " into a single
# "Wait until ..." step of the agent prompt.
spa_data = extractor.extract_spa_content(
    url="https://spa-app.example.com",
    wait_conditions=[
        "the loading spinner disappears",
        "product cards are visible",
        "the price information loads"
    ]
)

# Extract after interactions; the actions are numbered and listed in the
# prompt in the order given here.
interactive_data = extractor.extract_after_interaction(
    url="https://interactive-site.com",
    interactions=[
        "Click on 'Show More' button",
        "Select 'All Categories' from dropdown",
        "Click 'Apply Filters' button",
        "Wait for results to update"
    ],
    target_data="filtered product list with names, prices, and ratings"
)

Social Media Analytics

Social Media Monitoring

Monitor social media profiles and extract analytics.
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict
from altrina_sdk import AsyncAltrinaClient

class SocialMediaMonitor:
    """Monitor social media profiles and content."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    async def monitor_profile(self, platform: str, username: str) -> Dict:
        """Extract statistics and recent posts for one profile.

        Args:
            platform: One of "twitter", "instagram", "linkedin", "youtube"
                (case-insensitive).
            username: Account handle, without a leading "@".

        Returns:
            A dict with ``platform``, ``username``, ``data`` and
            ``monitored_at``, or ``None`` when the run was not successful.

        Raises:
            ValueError: If ``platform`` is not supported.
        """
        platform_urls = {
            "twitter": f"https://x.com/{username}",
            "instagram": f"https://instagram.com/{username}",
            "linkedin": f"https://linkedin.com/in/{username}",
            "youtube": f"https://youtube.com/@{username}"
        }

        url = platform_urls.get(platform.lower())
        if not url:
            raise ValueError(f"Unsupported platform: {platform}")

        async with AsyncAltrinaClient(self.api_key) as client:
            job = await client.run_browser_agent(
                f"""
                Go to {url} and extract:
                1. Profile statistics (followers, following, posts count)
                2. Recent posts (last 5) with:
                   - Content/caption
                   - Engagement metrics (likes, comments, shares)
                   - Post timestamp
                3. Profile bio/description
                4. Verification status
                """,
                initial_url=url,
                browser_config={"residential_ip": True}
            )

            result = await job.wait_for_completion(timeout=90)

            if result.is_successful:
                return {
                    "platform": platform,
                    "username": username,
                    "data": result.output,
                    "monitored_at": datetime.now().isoformat()
                }
            return None

    async def track_hashtag(self, platform: str, hashtag: str, limit: int = 20) -> List[Dict]:
        """Collect the first ``limit`` posts for a hashtag.

        A leading "#" on ``hashtag`` is optional. Returns the agent's output
        on success, otherwise an empty list.
        """
        hashtag = hashtag.lstrip('#')

        async with AsyncAltrinaClient(self.api_key) as client:
            job = await client.run_browser_agent(
                f"""
                Search for #{hashtag} on {platform} and extract the first {limit} posts:
                - Post author and username
                - Post content
                - Engagement metrics
                - Post timestamp
                - Media type (photo/video/text)
                """,
                browser_config={"residential_ip": True}
            )

            result = await job.wait_for_completion(timeout=120)

            if result.is_successful:
                return result.output
            return []

    async def analyze_engagement_trends(
        self,
        profiles: List[Dict[str, str]]
    ) -> Dict:
        """Monitor several profiles concurrently and summarize the outcome.

        Args:
            profiles: Dicts with ``"platform"`` and ``"username"`` keys.

        Returns:
            A dict with ``total_profiles``, ``successful_extractions``,
            ``profiles`` (the successful results) and ``analysis_timestamp``.
            Profiles that raised or returned ``None`` are reported on stdout
            and left out of ``profiles``.
        """
        # monitor_profile() opens its own client, so none is needed here.
        tasks = [
            self.monitor_profile(profile["platform"], profile["username"])
            for profile in profiles
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        all_data = []
        for profile, result in zip(profiles, results):
            if isinstance(result, dict):
                all_data.append(result)
            elif isinstance(result, BaseException):
                # Surface the failure instead of dropping it silently.
                print(
                    f"Failed to monitor {profile['platform']}/"
                    f"{profile['username']}: {result}"
                )

        # Analyze trends
        trends = {
            "total_profiles": len(profiles),
            "successful_extractions": len(all_data),
            "profiles": all_data,
            "analysis_timestamp": datetime.now().isoformat()
        }

        return trends

# Usage
async def main():
    """Run each SocialMediaMonitor feature once and print what it returns."""
    sm = SocialMediaMonitor("YOUR_API_KEY")

    # Single profile
    print(await sm.monitor_profile("twitter", "elonmusk"))

    # Hashtag search
    posts = await sm.track_hashtag("twitter", "#AI", limit=10)
    print(f"Found {len(posts)} posts")

    # Several profiles at once
    accounts = [
        {"platform": "twitter", "username": handle}
        for handle in ("openai", "anthropic")
    ]
    print(await sm.analyze_engagement_trends(accounts))

# asyncio.run(main())

Workflow Automation

Multi-Step Form Submission

Automate complex multi-step forms with validation.
from altrina_sdk import AltrinaClient
from typing import Dict, List

class FormAutomation:
    """Drive form filling and submission through the browser agent."""

    def __init__(self, api_key: str):
        self.client = AltrinaClient(api_key)

    def submit_multi_step_form(
        self,
        url: str,
        form_data: Dict[str, Dict],
        validate_each_step: bool = True
    ) -> Dict:
        """Fill and submit a form that spans several steps.

        Args:
            url: Address of the form.
            form_data: Maps each step name to a ``{field: value}`` dict.
            validate_each_step: When true, every step's instructions end with
                two extra verification bullets.

        Returns:
            ``{"success": True, "confirmation": ..., "credits_used": ...}`` on
            success, otherwise ``{"success": False, "error": ...}``.
        """

        with self.client as client:
            rendered_steps = []

            for step_name, step_data in form_data.items():
                field_lines = [
                    f"   - {field}: {value}"
                    for field, value in step_data.items()
                ]
                step_fields = "\n".join(field_lines)

                parts = [f"""
                Step '{step_name}':
                {step_fields}
                """]

                if validate_each_step:
                    parts.append("\n   - Verify all fields are filled correctly")
                    parts.append("\n   - Check for any validation errors")

                rendered_steps.append("".join(parts))

            all_steps = "".join(rendered_steps)
            full_instruction = f"""
            Go to {url} and complete the multi-step form:
            
            {all_steps}
            
            After completing all steps:
            1. Submit the form
            2. Capture the confirmation message or number
            3. Take a screenshot of the confirmation page
            """

            job = client.run_browser_agent(
                directive=full_instruction,
                initial_url=url,
                browser_config={"max_duration_minutes": 15}
            )
            outcome = job.wait_for_completion(verbose=True)

            if not outcome.is_successful:
                return {
                    "success": False,
                    "error": outcome.error
                }

            return {
                "success": True,
                "confirmation": outcome.output,
                "credits_used": outcome.credits_used
            }

    def fill_dynamic_form(
        self,
        url: str,
        form_rules: List[Dict]
    ) -> Dict:
        """Fill a form whose fields depend on conditions, then submit it.

        Args:
            url: Address of the form.
            form_rules: Dicts with ``"condition"`` and ``"action"`` keys, each
                rendered as one "If ..., then ..." bullet.

        Returns:
            The run's output, or ``None`` when it was not successful.
        """
        rule_lines = []
        for rule in form_rules:
            rule_lines.append(f"- If {rule['condition']}, then {rule['action']}")
        rules_instruction = "\n".join(rule_lines)

        outcome = self.client.run_and_wait(
            directive=f"""
            Go to {url} and fill the form following these rules:
            {rules_instruction}
            
            Submit the form and capture the result.
            """,
            initial_url=url,
            timeout=120
        )

        if outcome.is_successful:
            return outcome.output
        return None

# Usage
automation = FormAutomation("YOUR_API_KEY")

# Multi-step form: each top-level key is a step name, and its dict lists the
# field/value pairs to enter on that step. Steps are rendered in dict order.
form_data = {
    "Personal Information": {
        "first_name": "John",
        "last_name": "Doe",
        "email": "john.doe@example.com",
        "phone": "555-0100"
    },
    "Address": {
        "street": "123 Main St",
        "city": "San Francisco",
        "state": "CA",
        "zip": "94105"
    },
    "Preferences": {
        "newsletter": "Yes",
        "contact_method": "Email"
    }
}

# result is a dict with a "success" flag plus either "confirmation" and
# "credits_used", or "error".
result = automation.submit_multi_step_form(
    url="https://example.com/registration",
    form_data=form_data,
    validate_each_step=True
)

# Dynamic form with conditions: every rule needs a "condition" and an
# "action"; each becomes one "If <condition>, then <action>" line in the prompt.
rules = [
    {"condition": "country field is 'USA'", "action": "show and fill state field"},
    {"condition": "age is over 18", "action": "check 'I agree to terms'"},
    {"condition": "subscription type is 'Premium'", "action": "fill payment details"}
]

# dynamic_result is the run's output, or None if the run was not successful.
dynamic_result = automation.fill_dynamic_form(
    url="https://example.com/signup",
    form_rules=rules
)

Automated Testing

Use Altrina for automated web application testing.
from altrina_sdk import BrowserAgent
from typing import List, Dict
import json

class WebAppTester:
    """Automated testing for web applications.

    ``test_results`` accumulates the per-test records of every suite run on
    this instance; each report only contains the records of its own run.
    """

    def __init__(self, api_key: str):
        self.agent = BrowserAgent(api_key)
        self.test_results = []

    def run_test_suite(self, base_url: str, test_cases: List[Dict]) -> Dict:
        """Run a suite of test cases and write ``test_report.json``.

        Args:
            base_url: Prefix for every test's optional ``"path"``.
            test_cases: Dicts with ``"name"``, ``"action"`` and ``"expected"``
                keys, plus an optional ``"path"``.

        Returns:
            A report dict with ``total_tests``, ``passed``, ``failed``,
            ``pass_rate`` (formatted percentage; ``"0.0%"`` for an empty
            suite) and ``results`` for this run.
        """
        passed = 0
        failed = 0
        run_results = []  # records for this run only, so counts and results agree

        for test in test_cases:
            print(f"Running test: {test['name']}")
            target_url = f"{base_url}{test.get('path', '')}"

            result = self.agent.run(
                f"""
                Test: {test['name']}
                1. Go to {target_url}
                2. {test['action']}
                3. Verify: {test['expected']}
                4. Return whether the test passed or failed with details
                """,
                initial_url=target_url,
                timeout=60
            )

            if result.is_successful:
                if self._evaluate_test_result(result.output, test['expected']):
                    passed += 1
                    status = "✅ PASSED"
                else:
                    failed += 1
                    status = "❌ FAILED"

                run_results.append({
                    "test": test['name'],
                    "status": status,
                    "details": result.output
                })

                print(f"  {status}")
            else:
                # The agent run itself failed; count it as a failed test.
                failed += 1
                run_results.append({
                    "test": test['name'],
                    "status": "❌ ERROR",
                    "error": result.error
                })

        self.test_results.extend(run_results)

        # Generate report; guard the percentage against an empty suite.
        total = len(test_cases)
        pass_rate = (passed / total * 100) if total else 0.0
        report = {
            "total_tests": total,
            "passed": passed,
            "failed": failed,
            "pass_rate": f"{pass_rate:.1f}%",
            "results": run_results
        }

        # Save report (overwrites any previous report in the working directory)
        with open("test_report.json", "w") as f:
            json.dump(report, f, indent=2)

        return report

    def _evaluate_test_result(self, output: Dict, expected: str) -> bool:
        """Return True if ``output`` mentions ``expected`` or the word "passed".

        This is a case-insensitive substring heuristic on the stringified
        output, not a structured check.
        """
        output_str = str(output).lower()
        expected_lower = expected.lower()

        # Simple evaluation - check if expected content is in output
        return expected_lower in output_str or "passed" in output_str

    def test_user_journey(self, journey: Dict) -> Dict:
        """Run one multi-step user journey.

        Args:
            journey: Dict with ``"name"``, ``"start_url"`` and ``"steps"``
                (a list of plain-language steps, numbered in the prompt).

        Returns:
            A dict with ``journey``, ``success`` and ``result`` (the output on
            success, the error otherwise).
        """
        steps = "\n".join([
            f"{i+1}. {step}" for i, step in enumerate(journey['steps'])
        ])

        result = self.agent.run(
            f"""
            Test User Journey: {journey['name']}
            
            Perform these steps in order:
            {steps}
            
            After each step, verify it completed successfully.
            Report any errors or unexpected behavior.
            """,
            initial_url=journey['start_url'],
            timeout=180
        )

        return {
            "journey": journey['name'],
            "success": result.is_successful,
            "result": result.output if result.is_successful else result.error
        }

# Usage
tester = WebAppTester("YOUR_API_KEY")

# Define test cases. Each needs "name", "action" and "expected"; "path" is
# optional and is appended to base_url to form the start URL.
test_cases = [
    {
        "name": "Homepage Load Test",
        "path": "/",
        "action": "Check if page loads within 3 seconds",
        "expected": "Page loaded successfully"
    },
    {
        "name": "Navigation Test",
        "path": "/",
        "action": "Click on 'About' link in navigation",
        "expected": "About page is displayed"
    },
    {
        "name": "Search Functionality",
        "path": "/",
        "action": "Search for 'test product' in search bar",
        "expected": "Search results are displayed"
    },
    {
        "name": "Mobile Responsiveness",
        "path": "/",
        "action": "Resize browser to mobile size (375x667)",
        "expected": "Mobile menu appears and layout adjusts"
    }
]

# Run tests; this also writes the report to test_report.json in the current
# working directory.
report = tester.run_test_suite(
    base_url="https://example.com",
    test_cases=test_cases
)

print(f"\nTest Results: {report['passed']}/{report['total_tests']} passed ({report['pass_rate']})")

# Test user journey: "steps" are numbered and sent to the agent as one prompt,
# starting from "start_url".
journey = {
    "name": "Purchase Flow",
    "start_url": "https://shop.example.com",
    "steps": [
        "Search for 'laptop'",
        "Click on the first product",
        "Add to cart",
        "Go to checkout",
        "Fill in test payment details",
        "Complete purchase"
    ]
}

# journey_result holds "journey", "success" and "result" keys.
journey_result = tester.test_user_journey(journey)

Performance Optimization

Batch Processing with Rate Limiting

Process large batches efficiently while respecting rate limits.
import asyncio
from typing import List, Any, Callable
from altrina_sdk import AsyncAltrinaClient
from altrina_sdk.exceptions import RateLimitError
import time

class BatchProcessor:
    """Process large batches with rate limiting and retries."""

    def __init__(
        self,
        api_key: str,
        max_concurrent: int = 5,
        rate_limit: int = 10,  # requests per minute
        max_retries: int = 3
    ):
        """Create a processor.

        Args:
            api_key: API key used to open the client in ``process_batch``.
            max_concurrent: Maximum number of items processed at once.
            rate_limit: Maximum number of items started per 60-second window.
            max_retries: Number of attempts per item; values below 1 are
                treated as a single attempt.
        """
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.rate_limit = rate_limit
        self.max_retries = max_retries
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.request_times = []
        # Serializes the rate-limit bookkeeping across concurrent tasks.
        self._rate_lock = asyncio.Lock()

    async def _rate_limit_check(self):
        """Wait until a request slot is free, then record the request time."""
        async with self._rate_lock:
            while True:
                now = time.time()
                # Remove requests older than 1 minute
                self.request_times = [t for t in self.request_times if now - t < 60]

                if len(self.request_times) < self.rate_limit:
                    break

                # Sleep until the oldest request leaves the window, then
                # re-check instead of assuming a slot is free.
                await asyncio.sleep(60 - (now - self.request_times[0]) + 1)

            # Record the time the slot was actually taken, not the time the
            # wait started; a stale timestamp would expire too early.
            self.request_times.append(time.time())

    async def process_item(
        self,
        client: AsyncAltrinaClient,
        item: Any,
        processor_func: Callable
    ) -> dict:
        """Process a single item with retries.

        Args:
            client: Client passed through to ``processor_func``.
            item: The item to process.
            processor_func: Coroutine function called as
                ``processor_func(client, item)``.

        Returns:
            ``{"item", "success": True, "result"}`` on success, otherwise
            ``{"item", "success": False, "error"}`` after the last attempt.
        """
        # Always try at least once so a result dict is returned even when
        # max_retries is 0 (the loop would otherwise fall through to None).
        attempts = max(1, self.max_retries)

        async with self.semaphore:
            await self._rate_limit_check()

            for attempt in range(attempts):
                is_last = attempt >= attempts - 1
                try:
                    result = await processor_func(client, item)
                    return {"item": item, "success": True, "result": result}

                except RateLimitError as e:
                    if is_last:
                        return {"item": item, "success": False, "error": str(e)}
                    # Honor the server's hint when present, else wait 10s.
                    await asyncio.sleep(e.retry_after or 10)

                except Exception as e:
                    if is_last:
                        return {"item": item, "success": False, "error": str(e)}
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff

    async def process_batch(
        self,
        items: List[Any],
        processor_func: Callable,
        progress_callback: Callable = None
    ) -> List[dict]:
        """Process a batch of items concurrently.

        Args:
            items: Items to process.
            processor_func: Coroutine function called as
                ``processor_func(client, item)``.
            progress_callback: Optional ``callback(completed, total)`` invoked
                after each item finishes.

        Returns:
            One result dict per item, in completion order (not input order).
        """
        results = []
        completed = 0

        async with AsyncAltrinaClient(self.api_key) as client:
            tasks = [
                self.process_item(client, item, processor_func)
                for item in items
            ]

            # Process with progress updates
            for task in asyncio.as_completed(tasks):
                result = await task
                results.append(result)
                completed += 1

                if progress_callback:
                    progress_callback(completed, len(items))

        return results

# Usage
async def extract_page_data(client: AsyncAltrinaClient, url: str) -> dict:
    """Example processor function: extract the text content of one page.

    Returns the run's output, or ``None`` when the run was not successful.
    The return annotation uses the builtin ``dict`` because this example does
    not import ``typing.Dict``.
    """
    job = await client.run_browser_agent(
        f"Extract all text content from {url}",
        initial_url=url
    )
    result = await job.wait_for_completion(timeout=60)
    return result.output if result.is_successful else None

async def main():
    """Process 50 example URLs in a batch and print progress and a summary."""

    def report_progress(done, total):
        print(f"Progress: {done}/{total} ({done/total*100:.1f}%)")

    page_urls = [f"https://example.com/page{i}" for i in range(50)]

    batch = BatchProcessor(
        api_key="YOUR_API_KEY",
        max_concurrent=3,
        rate_limit=20  # 20 requests per minute
    )

    outcomes = await batch.process_batch(
        items=page_urls,
        processor_func=extract_page_data,
        progress_callback=report_progress
    )

    ok_count = len([outcome for outcome in outcomes if outcome["success"]])
    print(f"\nCompleted: {ok_count}/{len(outcomes)} successful")

# asyncio.run(main())

See Also