News API v3 offers extensive historical news data going back to 2019, but retrieving large volumes efficiently requires an understanding of how that data is stored and queried. This guide explains how our data is structured and provides best practices for working with historical data efficiently.

Understanding data indexing structure

Our system stores data in monthly indexes. This architecture optimizes the search and retrieval of large volumes of news content. Key implications of this structure:
  • Data is organized into separate monthly indexes.
  • Queries spanning multiple months need to access multiple indexes.
  • Performance is optimal when querying within a single monthly index.
  • Queries across very long time periods (e.g., 5+ years) can cause performance issues; the sketch after this list shows how to split such a range into monthly chunks.
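
Because each month lives in its own index, the first practical step for any long-range retrieval is working out which monthly windows your date range touches. Here is a minimal sketch in plain Python (no API calls involved) that splits an arbitrary range into per-month chunks you can later query one at a time:

import datetime
from typing import Iterator, Tuple


def monthly_chunks(
    start: datetime.date, end: datetime.date
) -> Iterator[Tuple[datetime.date, datetime.date]]:
    """Yield (chunk_start, chunk_end) pairs, one per monthly index the range touches."""
    current = start
    while current <= end:
        # First day of the following month (+32 days always lands in the next month).
        next_month = (current.replace(day=1) + datetime.timedelta(days=32)).replace(day=1)
        chunk_end = min(next_month - datetime.timedelta(days=1), end)
        yield current, chunk_end
        current = next_month


# A 2019-2025 range touches roughly 72 monthly indexes -- query them one at a time.
chunks = list(monthly_chunks(datetime.date(2019, 1, 1), datetime.date(2025, 1, 1)))
print(f"{len(chunks)} monthly chunks; first: {chunks[0]}, last: {chunks[-1]}")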

Technical limitations

While you technically can query data across our entire historical range (2019 to present), doing so in a single request is not recommended for several reasons:

Performance degradation

Queries spanning multiple years require searching across numerous indexes, significantly increasing response time.

Request timeouts

Complex queries combined with long time ranges may time out before completion (default: 30 seconds).

Multi-index complexity

Long time ranges require coordinating searches across multiple monthly indexes.

Limited result access

Queries over long time ranges can miss relevant historical data because the API limits responses to 10,000 articles per request.

❌ Incorrect approach

q=financial crisis&from_=2019-01-01&to_=2025-01-01
This query attempts to search approximately 72 monthly indexes at once, which can lead to poor performance or timeout errors (408 Request Timeout).

To retrieve historical data efficiently while maintaining good performance, follow this systematic approach:

Step 1: Estimate data volume using aggregation

Before retrieving actual articles, use the /aggregation_count endpoint to understand the volume of data matching your query across time periods.

Example request:
{
    "q": "your search query",
    "aggregation_by": "month",
    "from_": "2020-01-01",
    "to_": "2020-12-31",
    "lang": "en"
}
Example response:
{
    "aggregations": [
        {
            "aggregation_count": [
                {
                    "time_frame": "2020-01-01 00:00:00",
                    "article_count": 2450
                },
                {
                    "time_frame": "2020-02-01 00:00:00",
                    "article_count": 3120
                }
                // Additional months...
            ]
        }
    ]
}
This step helps you:
  • Identify which time periods have the most relevant content.
  • Determine if your query is too broad or too narrow.
  • Plan your time-chunking retrieval strategy.
  • Calculate whether time chunks need further subdivision (if a chunk exceeds 10,000 articles), as shown in the sketch after this list.
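
As a rough sketch of how the aggregation response can drive your chunking plan, the snippet below walks the aggregation_count entries of a response shaped like the example above and flags any month whose article_count exceeds the 10,000-article response limit. The February figure is invented purely to show a month that would need subdivision:

# Flag months that exceed the 10,000-article-per-request limit.
MAX_ARTICLES_PER_REQUEST = 10_000

# Parsed /aggregation_count response in the shape shown above
# (the February count is illustrative only).
aggregation_response = {
    "aggregations": [
        {
            "aggregation_count": [
                {"time_frame": "2020-01-01 00:00:00", "article_count": 2450},
                {"time_frame": "2020-02-01 00:00:00", "article_count": 12300},
            ]
        }
    ]
}

for bucket in aggregation_response["aggregations"][0]["aggregation_count"]:
    needs_split = bucket["article_count"] > MAX_ARTICLES_PER_REQUEST
    plan = "subdivide into weekly or daily chunks" if needs_split else "single monthly chunk"
    print(f'{bucket["time_frame"]}: {bucket["article_count"]} articles -> {plan}')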

Step 2: Process data in time chunks

Once you understand the data volume, retrieve articles in time chunks sized to avoid timeouts. Longer ranges can work, but complex queries spanning more than 30 days are prone to 408 timeout errors.

Example request:
{
    "q": "your search query",
    "from_": "2020-01-01",
    "to_": "2020-01-31",
    "page_size": 100,
    "page": 1
    // Additional parameters as needed
}
For each time chunk:
  1. Implement pagination to retrieve all articles for the period.
  2. Process and store the data for that period.
  3. Move to the next time chunk only after completely retrieving the current period’s data.
For detailed guidance on implementing pagination, refer to our guide on How to paginate large datasets.

Example: Retrieving historical data

Here’s a practical example showing how to retrieve a week of data using the recommended approach. The same logic scales to retrieve months or years by adjusting the date ranges and aggregation period (day/month):
import datetime
import json
import os
import time
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

from newscatcher import NewscatcherApi
from newscatcher.core.api_error import ApiError

load_dotenv()
API_KEY = os.getenv("NEWSCATCHER_API_KEY")


def retrieve_week_of_data(
    client: NewscatcherApi,
    query: str,
    start_date: datetime.datetime,
    end_date: datetime.datetime,
    output_file: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """
    Retrieve a week of historical data using daily aggregation.

    Args:
        client: Configured NewscatcherApi client instance
        query: Search query string
        start_date: Start date for the week
        end_date: End date for the week
        output_file: Filename for JSON output (without .json extension)

    Returns:
        List of all articles retrieved for the entire week
    """
    results = []

    # Step 1: Get daily data volumes
    try:
        aggregation_response = client.aggregation.post(
            q=query,
            from_=start_date,
            to=end_date,
            aggregation_by="day",
            lang=["en"],
        )

        # Log daily volumes for planning
        if aggregation_response.aggregations:
            print("Daily data volumes:")
            aggregation_data = aggregation_response.aggregations[0]["aggregation_count"]
            for day_data in aggregation_data:
                print(
                    f"  {day_data['time_frame']}: {day_data['article_count']} articles"
                )
            print(f"Total articles expected: {aggregation_response.total_hits}")

    except ApiError as e:
        print(f"Error getting aggregation data: {e.status_code} - {e.body}")
        return results

    # Step 2: Process each day in the week
    current_date = start_date.date()
    end_date_only = end_date.date()

    while current_date <= end_date_only:
        # Set time bounds for the current day
        day_start = datetime.datetime.combine(current_date, datetime.time.min)
        day_end = datetime.datetime.combine(current_date, datetime.time.max)

        print(f"Processing {current_date}")

        current_page = 1
        total_pages = 1
        daily_articles = 0

        # Step 3: Paginate through the day's data
        while current_page <= total_pages:
            try:
                response = client.search.post(
                    q=query,
                    from_=day_start,
                    to=day_end,
                    lang=["en"],
                    page=current_page,
                    page_size=100,
                )

                # Add articles to results (store as original JSON/dict)
                if response.articles:
                    # Convert to dict to ensure JSON serialization works
                    for article in response.articles:
                        results.append(article.__dict__)
                    daily_articles += len(response.articles)

                # Get pagination info from response
                total_pages = response.total_pages or 1
                current_page += 1

                print(f"  Retrieved page {current_page - 1} of {total_pages}")

                # Add delay between requests to respect rate limits
                time.sleep(1)

            except ApiError as e:
                if e.status_code == 408:
                    print(
                        "  Request timeout. The time window might contain too many articles."
                    )
                    # For daily windows, this is less likely, but could divide into hours if needed
                elif e.status_code == 429:
                    print("  Rate limit hit. Waiting longer...")
                    time.sleep(5)
                    continue  # Retry the same page
                else:
                    print(f"  API Error ({e.status_code}): {e.body}")
                # Break the pagination loop on non-recoverable errors
                break
            except Exception as e:
                print(f"  Unexpected error: {e}")
                break

        print(
            f"  Completed {current_date}, retrieved {daily_articles} articles for this day"
        )
        print(f"  Total articles so far: {len(results)}")

        # Move to next day
        current_date += datetime.timedelta(days=1)

    # Save results if output file specified
    if output_file and results:
        save_articles_to_json(results, output_file)

    return results


def save_articles_to_json(articles: List[Dict[str, Any]], filename: str):
    """Save articles array to JSON file."""
    json_filename = f"{filename}.json"
    with open(json_filename, "w", encoding="utf-8") as f:
        json.dump(articles, f, indent=2, ensure_ascii=False, default=str)
    print(f"Saved {len(articles)} articles to {json_filename}")


def main():
    """Test the weekly data retrieval function with transport strike query."""
    client = NewscatcherApi(api_key=API_KEY)

    # Define the test week (adjust dates as needed)
    start_date = datetime.datetime(2025, 5, 15)
    end_date = datetime.datetime(2025, 5, 22)

    # Your complex transport strike query
    query = '(airport OR "freight port" OR train) AND (strike OR "union protest" OR "planned closure" OR "worker dispute") AND NOT (past OR historical OR ended)'

    try:
        print(
            f"Testing weekly data retrieval from {start_date.date()} to {end_date.date()}"
        )
        print(f"Query: {query}")
        print("=" * 80)

        # Generate output filename based on date range
        output_file = f"transport_strikes_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}"

        articles = retrieve_week_of_data(
            client, query, start_date, end_date, output_file=output_file
        )

        print("=" * 80)
        print(f"SUCCESS: Retrieved {len(articles)} articles total")

        # Optional: Show some sample data
        if articles:
            print("\nSample articles:")
            for i, article in enumerate(articles[:3]):  # Show first 3 articles
                print(f"{i+1}. {article.get('title', 'No title')}")
                print(f"   Published: {article.get('published_date', 'Unknown date')}")
                print(f"   Source: {article.get('name_source', 'Unknown source')}")
                print(f"   URL: {article.get('link', 'No URL')}")
                print()

    except Exception as error:
        print(f"FAILED: {error}")


if __name__ == "__main__":
    main()

Best practices

  • Use specific queries: Narrow your search terms to reduce result volume.
  • Prioritize recent data first: Start with recent periods and work backward if needed.
  • Implement rate limiting: Space out requests to avoid hitting concurrency limits.
  • Handle timeouts gracefully: Implement retry logic with exponential backoff (see the sketch after this list).
  • Monitor performance: Track query response times and adjust your approach as needed.
  • Consider data storage: For large historical analyses, store retrieved data in a database or file system.
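
For the retry and backoff point above, here is a minimal sketch of a helper that wraps any single request and retries on 408 and 429 responses with exponential backoff. It reuses the ApiError class from the example earlier; the retry count and delays are illustrative defaults, not API requirements:

import time

from newscatcher.core.api_error import ApiError


def call_with_backoff(request_fn, max_retries: int = 5, base_delay: float = 1.0):
    """Run request_fn() and retry on 408/429 with exponential backoff.

    request_fn is any zero-argument callable that performs one API request,
    for example: lambda: client.search.post(q=query, from_=start, to=end, page=1, page_size=100).
    """
    for attempt in range(max_retries):
        try:
            return request_fn()
        except ApiError as e:
            if e.status_code in (408, 429) and attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)  # 1s, 2s, 4s, 8s, ...
                print(f"Got {e.status_code}, retrying in {delay:.0f}s...")
                time.sleep(delay)
            else:
                raise  # Non-retryable error or retries exhausted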

Common pitfalls to avoid

Pitfall | Impact | Solution
Querying multiple years at once | Slow performance, timeouts (408 errors) | Break queries into monthly chunks
Using overly broad search terms | Excessive result volume | Refine query terms to be more specific
Insufficient error handling | Failed data retrieval | Implement robust retry and error handling
Underestimating data volume | Resource constraints | Use the aggregation endpoint to estimate volume first
Requesting too many results per page | Slow response times | Use reasonable page sizes (100-1000)
Improper pagination implementation | Incomplete data retrieval | Follow our pagination guide