Overview
Understanding the sources covered by News API v3 is crucial for ensuring the
comprehensiveness and reliability of your news data analysis. This guide walks
you through checking source coverage using the /sources endpoint, including how
to handle large numbers of sources efficiently.
Before you start
Before you begin, ensure you have:
- An active API key for NewsCatcher News API v3
- Basic knowledge of making API requests
- Python or another tool for making HTTP requests (e.g., cURL, Postman, or a
programming language with HTTP capabilities)
Steps
Understand key parameters
To make a valid request to the /sources endpoint, use at least one of the
following key parameters:
- lang: The language(s) of the sources.
- countries: The countries where the news publishers are located.
- predefined_sources: Predefined top sources per country.
- source_name: Text to search within source names. You can specify any word,
term, phrase, or outlet name.
- source_url: The domain(s) of the news publication to search for. When
specifying this parameter, you can only use include_additional_info as an
extra parameter.
To refine your search and obtain additional source information, you can use:
- include_additional_info: Set to true to get extra details about each source.
- from_rank and to_rank: Filter sources by their SEO rank.
For detailed descriptions and usage of all parameters, refer to the
Sources reference documentation.
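For example, a minimal request body that filters sources by language and country
could look like the following sketch (the values are illustrative; the full
request pattern is shown in the next step):

PAYLOAD = {
    "lang": "en",
    "countries": ["US", "GB"]
}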
Construct a query and make an API request
Here’s a Python example demonstrating how to check for sources containing a
specific word:
import requests
import json

API_KEY = "YOUR_API_KEY_HERE"
URL = "https://v3-api.newscatcherapi.com/api/sources"
HEADERS = {"x-api-token": API_KEY}
PAYLOAD = {
    "source_name": "sport",
    "include_additional_info": True
}

try:
    response = requests.post(URL, headers=HEADERS, json=PAYLOAD)
    response.raise_for_status()
    data = response.json()
    print(json.dumps(data, indent=2))
except requests.exceptions.RequestException as e:
    print(f"Failed to fetch source information: {e}")
Analyze the response
The API response will include information about the requested source(s). Here’s
an example response:
{
  "message": "Maximum sources displayed according to your plan is set to 1000",
  "sources": [
    {
      "name_source": "Sports Illustrated",
      "domain_url": "si.com",
      "logo": null,
      "additional_info": null
    },
    {
      "name_source": "Sportskeeda",
      "domain_url": "sportskeeda.com",
      "logo": null,
      "additional_info": null
    }
    // ... other sources
  ],
  "user_input": {
    "lang": null,
    "countries": null,
    "predefined_sources": null,
    "include_additional_info": true,
    "from_rank": null,
    "to_rank": null,
    "source_name": ["sport"],
    "source_url": null
  }
}
This response shows a list of sources that include “sport” in their names.
Searching by source_name does not perform an exact match; it returns all
sources that contain the specified term anywhere in their names.
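Because the match is a substring match, the response can include more outlets
than you expect. If you need stricter matching, one option is to filter the
returned names client-side. Here is a minimal sketch that reuses the request
pattern from the example above:

import requests

API_KEY = "YOUR_API_KEY_HERE"
URL = "https://v3-api.newscatcherapi.com/api/sources"
HEADERS = {"x-api-token": API_KEY}

response = requests.post(URL, headers=HEADERS, json={"source_name": "sport"})
response.raise_for_status()
sources = response.json().get("sources", [])

# The API matches "sport" anywhere in the name; keep only names that start with it
refined = [s for s in sources if s["name_source"].lower().startswith("sport")]
print(f"{len(refined)} of {len(sources)} returned sources start with 'sport'")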
Check coverage for specific sources
To check coverage for specific sources, you can use the source_url
parameter
for precise filtering:
PAYLOAD = {
    "source_url": ["si.com", "sportskeeda.com"],
    "include_additional_info": True
}
You can also combine multiple parameters to narrow down your search. For
example, to find sports-related sources in English-speaking countries:
PAYLOAD = {
    "source_name": "sport",
    "lang": "en",
    "countries": ["US", "GB", "AU"],
    "include_additional_info": True
}
The response will include information for each covered source that matches your
criteria.
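To see at a glance which of the requested domains are covered, you can compare
your source_url list against the domain_url values in the response. Here is a
minimal sketch that reuses the request pattern from the examples above:

import requests

API_KEY = "YOUR_API_KEY_HERE"
URL = "https://v3-api.newscatcherapi.com/api/sources"
HEADERS = {"x-api-token": API_KEY}

requested = ["si.com", "sportskeeda.com"]
response = requests.post(URL, headers=HEADERS, json={"source_url": requested})
response.raise_for_status()
covered = {s["domain_url"] for s in response.json().get("sources", [])}

for url in requested:
    print(f"{url}: {'covered' if url in covered else 'not covered'}")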
Handling large numbers of sources
When dealing with a large number of sources, you may have them stored in a file
(e.g., CSV or JSON). Here’s an approach using asynchronous requests to check
which sources are covered by News API v3 and identify those that are not. This
implementation uses aiohttp
and asyncio
libraries for concurrent requests,
providing better performance and scalability.
The API key is stored in an environment variable for security purposes. Set
your NEWSCATCHER_API_KEY
in your environment before running the script.
For example, in a Unix-like terminal (Linux or macOS), you can set it like this:
export NEWSCATCHER_API_KEY="YOUR_API_KEY_HERE"
On Windows Command Prompt, you can use:
set NEWSCATCHER_API_KEY=YOUR_API_KEY_HERE
import aiohttp
import asyncio
import csv
import logging
from typing import List, Dict, Optional
from tqdm.asyncio import tqdm
import os

# Constants
API_KEY: str = os.getenv("NEWSCATCHER_API_KEY")
if not API_KEY:
    raise EnvironmentError("API key not set in environment variables")

URL: str = "https://v3-api.newscatcherapi.com/api/sources"
HEADERS: Dict[str, str] = {"x-api-token": API_KEY}
INPUT_FILE: str = "source_urls.csv"
OUTPUT_FILE: str = "uncovered_sources.csv"
MAX_CONCURRENT_REQUESTS: int = 5  # Set the desired number of concurrent requests
MAX_RETRIES: int = 3  # Number of retries for failed requests
TIMEOUT: int = 30  # Timeout for each request in seconds

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("check_sources.log"),  # Log to a file
        logging.StreamHandler(),  # Also log to console
    ],
)


async def fetch_sources(
    session: aiohttp.ClientSession,
    batch: List[str],
    headers: Dict[str, str],
    retries: int = MAX_RETRIES,
) -> Optional[Dict]:
    """Fetch source coverage for a single batch with retry logic."""
    payload: Dict[str, object] = {"source_url": batch, "include_additional_info": True}
    for attempt in range(retries):
        try:
            async with session.post(
                URL, headers=headers, json=payload, timeout=TIMEOUT
            ) as response:
                response.raise_for_status()
                data: Dict = await response.json()
                return data
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            logging.error(f"Attempt {attempt + 1}/{retries} failed: {e}")
            await asyncio.sleep(2**attempt)  # Exponential backoff
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            break
    return None


async def fetch_sources_with_semaphore(
    semaphore: asyncio.Semaphore,
    session: aiohttp.ClientSession,
    batch: List[str],
    headers: Dict[str, str],
) -> Optional[Dict]:
    """Fetch sources with a semaphore to limit the number of concurrent requests."""
    async with semaphore:
        return await fetch_sources(session, batch, headers)


async def check_sources_concurrently(
    source_urls: List[str], batch_size: int = 1000
) -> Dict[str, bool]:
    """Check if sources are covered by the API concurrently."""
    coverage: Dict[str, bool] = {}
    semaphore: asyncio.Semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(limit=MAX_CONCURRENT_REQUESTS)
    ) as session:
        tasks = []

        # Split the source URLs into batches and schedule one request per batch
        for i in range(0, len(source_urls), batch_size):
            batch: List[str] = source_urls[i : i + batch_size]
            tasks.append(
                fetch_sources_with_semaphore(semaphore, session, batch, HEADERS)
            )

        # Use tqdm to visualize the progress of batch processing
        results: List[Optional[Dict]] = await tqdm.gather(
            *tasks, desc="Checking source coverage", unit="batch"
        )

        # Process results
        for i, result in enumerate(results):
            if result:
                # Mark sources returned by the API as covered
                for source in result.get("sources", []):
                    coverage[source["domain_url"]] = True

                # Mark sources not in the response as uncovered
                batch = source_urls[i * batch_size : (i + 1) * batch_size]
                for url in batch:
                    if url not in coverage:
                        coverage[url] = False
            else:
                logging.error(f"Batch {i + 1} failed; its sources were not checked.")

    logging.info("Finished checking sources.")
    return coverage


def read_sources_from_csv(file_path: str) -> List[str]:
    """Read source URLs from a CSV file."""
    with open(file_path, "r") as file:
        reader = csv.reader(file)
        sources: List[str] = [
            row[0] for row in reader if row
        ]  # Assuming URLs are in the first column
    logging.info(f"Loaded {len(sources)} sources from {file_path}.")
    return sources


def write_uncovered_sources(uncovered_sources: List[str], file_path: str) -> None:
    """Write uncovered sources to a CSV file."""
    with open(file_path, "w", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(["Uncovered Source URL"])  # Header
        for source in uncovered_sources:
            writer.writerow([source])
    logging.info(f"Uncovered sources written to {file_path}.")


def main() -> None:
    # Read sources from CSV
    source_urls: List[str] = read_sources_from_csv(INPUT_FILE)

    # Check source coverage concurrently
    coverage: Dict[str, bool] = asyncio.run(check_sources_concurrently(source_urls))

    # Identify uncovered sources
    uncovered_sources: List[str] = [
        url for url, is_covered in coverage.items() if not is_covered
    ]

    # Write uncovered sources to CSV
    write_uncovered_sources(uncovered_sources, OUTPUT_FILE)

    logging.info(f"Total sources: {len(source_urls)}")
    logging.info(f"Covered sources: {len(source_urls) - len(uncovered_sources)}")
    logging.info(f"Uncovered sources: {len(uncovered_sources)}")


if __name__ == "__main__":
    main()
This script does the following:
- Reads source URLs from a CSV file.
- Asynchronously checks the sources against the API in batches of 1000.
- Identifies which sources are not covered by the API.
- Writes the uncovered sources to a new CSV file.
To use this script:
- Save your list of source URLs in a CSV file named source_urls.csv.
- Run the script. It will create a file named uncovered_sources.csv containing
the URLs not covered by News API v3.
You can then send the uncovered_sources.csv
file to our support team. As News
API v3 is a flexible, corporate-level solution, we can manually add the sources
you need for your application.
This script assumes that your source URLs are in the first column of the input
CSV. Adjust the read_sources_from_csv
function if your file has a different
structure.
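If your source URLs are stored in a JSON file instead of CSV (as mentioned
above), you can swap in a different reader. Here is a minimal sketch, assuming
the file contains a top-level JSON array of URL strings:

import json
from typing import List

def read_sources_from_json(file_path: str) -> List[str]:
    """Read source URLs from a JSON file containing an array of URL strings."""
    with open(file_path, "r") as file:
        sources: List[str] = json.load(file)
    return sources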
Best practices
- Use the include_additional_info parameter to get insights into source
reliability and output volume.
- Use source_name for a broader search and source_url for more precise results.
- For large numbers of sources, use the batching method described above to stay
within API limits.
- Remember that the API may limit the number of sources returned based on your
subscription plan.
See also