import aiohttp
import asyncio
import os
import logging
import json
from typing import Dict, List, Optional, Any
from tqdm.asyncio import tqdm
# Constants
API_KEY: Optional[str] = os.getenv("NEWSCATCHER_API_KEY")
if not API_KEY:
    raise EnvironmentError("NEWSCATCHER_API_KEY environment variable is not set")
URL: str = "https://v3-api.newscatcherapi.com/api/search"
HEADERS: Dict[str, str] = {"x-api-token": API_KEY}
PAYLOAD: Dict[str, Any] = {
"q": "artificial intelligence",
"lang": "en",
"page_size": 100,
}
MAX_CONCURRENT_REQUESTS: int = 5 # Set the desired number of concurrent requests
MAX_RETRIES: int = 3 # Number of retries for failed requests
TIMEOUT: int = 30 # Timeout for each request in seconds
OUTPUT_FILE: str = "fetched_articles.json"
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("fetch_articles.log"), # Log to a file
logging.StreamHandler(), # Also log to console
],
)
async def fetch_page(
session: aiohttp.ClientSession,
url: str,
headers: Dict[str, str],
payload: Dict[str, Any],
timeout: int = TIMEOUT,
max_retries: int = MAX_RETRIES,
) -> Optional[Dict]:
"""Fetch a single page with retry logic."""
retries: int = 0
while retries < max_retries:
try:
async with session.post(
url, headers=headers, json=payload, timeout=timeout
) as response:
response.raise_for_status()
return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # aiohttp.ClientError covers both response and connection errors;
            # asyncio.TimeoutError covers the per-request timeout.
            retries += 1
            logging.warning(
                f"Request for page {payload.get('page')} failed "
                f"(attempt {retries}/{max_retries}): {e}"
            )
            if retries < max_retries:
                await asyncio.sleep(2**retries)  # Exponential backoff before retrying
    return None  # Explicitly return None if all retries fail
async def fetch_page_with_semaphore(
semaphore: asyncio.Semaphore,
session: aiohttp.ClientSession,
url: str,
headers: Dict[str, str],
payload: Dict[str, Any],
) -> Optional[Dict]:
"""Fetch a page using a semaphore to limit concurrent requests."""
async with semaphore:
return await fetch_page(session, url, headers, payload)
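
# The pagination logic below assumes the search response exposes "total_pages"
# and "articles" at the top level. A rough sketch of the shape this script
# relies on (only the keys it actually reads, not the full v3 schema):
#
#     {
#         "total_pages": 42,
#         "articles": [{"title": "...", "link": "...", ...}, ...]
#     }
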
async def fetch_all_pages_concurrently(
url: str,
headers: Dict[str, str],
initial_payload: Dict[str, Any],
max_concurrent_requests: int = MAX_CONCURRENT_REQUESTS,
) -> List[Dict[str, Any]]:
"""Fetch all pages concurrently using a semaphore to limit the number of requests."""
semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrent_requests)
all_articles: List[Dict[str, Any]] = []
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=10)
) as session:
# Fetch the first page to determine the total number of pages
async with semaphore:
initial_payload["page"] = 1
data: Optional[Dict[str, Any]] = await fetch_page(
session, url, headers, initial_payload
)
        if not data or "total_pages" not in data or "articles" not in data:
            logging.error("Failed to fetch the first page; aborting pagination.")
            return []
total_pages: int = data["total_pages"]
all_articles.extend(data["articles"])
# Prepare tasks for fetching all remaining pages
        tasks: List[asyncio.Task] = []
        for page in range(2, total_pages + 1):
            payload: Dict[str, Any] = initial_payload.copy()
            payload["page"] = page
            # Retries and per-request timeouts are already handled inside
            # fetch_page, so the tasks are gathered without an extra
            # asyncio.wait_for wrapper (which could cancel a still-retrying
            # request and abort the whole gather).
            tasks.append(
                asyncio.create_task(
                    fetch_page_with_semaphore(semaphore, session, url, headers, payload)
                )
            )
# Execute all tasks concurrently and gather results with tqdm progress bar
results: List[Optional[Dict[str, Any]]] = await tqdm.gather(
*tasks, desc="Fetching pages", unit="page", initial=1, total=total_pages
)
# Process results
for result in results:
if result and "articles" in result:
all_articles.extend(result["articles"])
return all_articles
def main() -> None:
"""Main function to execute pagination and fetch all articles."""
# Fetch all articles
articles: List[Dict[str, Any]] = asyncio.run(
fetch_all_pages_concurrently(URL, HEADERS, PAYLOAD, MAX_CONCURRENT_REQUESTS)
)
# Save fetched data to a JSON file
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
json.dump(articles, f, ensure_ascii=False, indent=2)
logging.info(f"Total articles fetched: {len(articles)}")
logging.info(f"Data saved to {OUTPUT_FILE}")
if __name__ == "__main__":
main()
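
# Example usage (the script filename below is illustrative):
#
#   export NEWSCATCHER_API_KEY="your-api-key"
#   python fetch_articles.py
#
# Progress and errors are logged to fetch_articles.log and to the console;
# the collected articles are written to fetched_articles.json.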