Python client library for the NewsCatcher News API with async support
Install the SDK from PyPI:

```bash
pip install newscatcherapi-python-sdk
```
Import and instantiate the client with your API key:

```python
from newscatcherapi_client import Newscatcher

newscatcher = Newscatcher(
    api_key="YOUR_API_KEY",
)
```
Search articles, optionally with clustering and NLP enrichment:

```python
# Regular search
results = newscatcher.search.get(
    q="technology",
    lang="en",
    include_nlp_data=True,  # optional, adds NLP analysis layer
)

# Clustered search
cluster_results = newscatcher.search.get(
    q="AI technology",
    lang="en",
    clustering_enabled=True,
    clustering_threshold=0.6,
    include_nlp_data=True,
)
```
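When clustering is enabled, articles come back grouped into clusters. Here is a minimal sketch of walking the grouped results, assuming each cluster exposes an `articles` list (the same shape the pagination helper below relies on) and that each article carries a `title` field:

```python
# Sketch: iterate clustered search results.
# `cluster.articles` matches the shape used in the pagination helper below;
# `article.title` is an assumed field, shown for illustration only.
for cluster in cluster_results.clusters:
    print(f"Cluster with {len(cluster.articles)} articles:")
    for article in cluster.articles:
        print(f"  - {article.title}")
```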
Fetch the latest headlines, optionally clustered and enriched with NLP data:

```python
headlines = newscatcher.latest_headlines.get(
    lang="en",
    countries="US",
    clustering_enabled=True,
    include_nlp_data=True,
)
```
Retrieve articles by a specific author:

```python
author_articles = newscatcher.authors.get(
    author_name="Sam Altman",
    include_nlp_data=True,
)
```
Find articles similar to a query:

```python
similar = newscatcher.search_similar.get(
    q="SpaceX launch",
    include_nlp_data=True,
)
```
List the sources available for a given language:

```python
sources = newscatcher.sources.get(
    lang="en",
)
```
Retrieve details of your API subscription:

```python
subscription = newscatcher.subscription.get()
```
Use the `.raw` namespace to access the raw HTTP response:
```python
raw_response = newscatcher.search.raw.get(
    q="tech",
)
print(raw_response.body)
print(raw_response.headers)
print(raw_response.status)
print(raw_response.round_trip_time)
```
Every method also has an async variant prefixed with `a` (for example, `search.aget`):
```python
import asyncio

async def main():
    results = await newscatcher.search.aget(
        q="tech",
        lang="en",
    )
    print(results)

asyncio.run(main())
```
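Since every endpoint has an `a`-prefixed async counterpart, independent requests can run concurrently. A minimal sketch using `asyncio.gather`, with two placeholder queries:

```python
import asyncio

async def fetch_concurrently():
    # Fire both searches at once; gather awaits them and
    # returns the results in the order they were passed.
    tech, science = await asyncio.gather(
        newscatcher.search.aget(q="technology", lang="en"),
        newscatcher.search.aget(q="science", lang="en"),
    )
    return tech, science

tech_results, science_results = asyncio.run(fetch_concurrently())
```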
Failed requests raise `ApiException`, which carries the HTTP status, reason, headers, and round-trip time:
```python
from newscatcherapi_client import ApiException

try:
    response = newscatcher.search.get(q="tech news")
except ApiException as e:
    print(f"Status: {e.status}")
    print(f"Reason: {e.reason}")
    print(f"Headers: {e.headers}")
    print(f"Response time: {e.round_trip_time}")
```
A simple retry helper with exponential backoff for rate-limit (429) errors:

```python
import asyncio

async def with_retry(operation, max_retries=3, delay=1):
    # Retry on 429 (rate limit) with exponential backoff;
    # re-raise any other error immediately.
    for i in range(max_retries):
        try:
            return await operation()
        except ApiException as e:
            if e.status == 429 and i < max_retries - 1:
                await asyncio.sleep(delay * (2 ** i))
                continue
            raise
    raise Exception("Max retries exceeded")

# Usage (inside an async function or event loop)
results = await with_retry(
    lambda: newscatcher.search.aget(q="tech")
)
```
```python
async def get_all_results(newscatcher, query, max_pages=5):
    results = []
    for page in range(1, max_pages + 1):
        response = await newscatcher.search.aget(
            q=query,
            page=page,
            page_size=100,
        )
        # Clustered responses nest articles inside clusters;
        # flat responses expose them directly.
        if hasattr(response, 'clusters'):
            articles = [
                article
                for cluster in response.clusters
                for article in cluster.articles
            ]
        else:
            articles = response.articles
        results.extend(articles)
        # Stop once the final page has been fetched.
        if page >= response.total_pages:
            break
    return results
```
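A usage sketch for the helper above, collecting up to five pages for a single query:

```python
import asyncio

# Collect up to five pages of results for one query.
all_articles = asyncio.run(
    get_all_results(newscatcher, "tech", max_pages=5)
)
print(f"Fetched {len(all_articles)} articles")
```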