# Python SDK

> Python client library for the CatchAll API

The Python SDK provides access to the CatchAll API from Python applications,
with support for both synchronous and asynchronous operations.

## Installation

<Tabs>
  <Tab title="pip">
    ```bash theme={null}
    pip install newscatcher-catchall-sdk
    ```
  </Tab>

  <Tab title="poetry">
    ```bash theme={null}
    poetry add newscatcher-catchall-sdk
    ```
  </Tab>

  <Tab title="pipenv">
    ```bash theme={null}
    pipenv install newscatcher-catchall-sdk
    ```
  </Tab>
</Tabs>

## Quickstart

Get started with CatchAll in three steps:

<Steps>
  <Step title="Initialize the client">
    ```python theme={null}
    from newscatcher_catchall import CatchAllApi

    client = CatchAllApi(api_key="YOUR_API_KEY")
    ```
  </Step>

  <Step title="Create a job">
    ```python theme={null}
    job = client.jobs.create_job(
        query="AI company acquisitions",
        limit=10,
    )
    job_id = job.job_id
    ```
  </Step>

  <Step title="Wait and retrieve results">
    ```python theme={null}
    import time

    POLL_INTERVAL_SECONDS = 60

    # Poll for completion
    while True:
        status = client.jobs.get_job_status(job_id)
        if status.status == "completed":
            break
        time.sleep(POLL_INTERVAL_SECONDS)

    # Get results
    results = client.jobs.get_job_results(job_id)
    print(f"Found {results.valid_records} valid records")
    ```
  </Step>
</Steps>

<Note>
  Jobs process asynchronously and typically complete in 10-15 minutes. See the
  [Quickstart](/web-search-api/get-started/quickstart) for a complete walkthrough.
</Note>
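
Because jobs can run for several minutes, a polling loop without an upper bound may wait indefinitely if a job never reaches `completed`. The sketch below shows one way to bound the wait. `wait_for_status` and its parameters are hypothetical helpers, not part of the SDK; the function only assumes you pass it a callable that returns the current status string.

```python
import time
from typing import Callable


def wait_for_status(
    get_status: Callable[[], str],
    target: str = "completed",
    poll_interval: float = 60.0,
    timeout: float = 1800.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns `target` or `timeout` seconds pass.

    Hypothetical helper (not provided by the SDK). Raises TimeoutError
    when the deadline is reached before the target status is seen.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still '{status}' after {timeout} seconds")
        sleep(poll_interval)
```

With the client from the steps above, a call could look like `wait_for_status(lambda: client.jobs.get_job_status(job_id).status)`. The 30-minute default timeout is an arbitrary choice that leaves headroom over the typical 10-15 minute run time.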

## Working with jobs

<Tabs>
  <Tab title="Get suggestions">
    Get suggested validators, enrichments, and date ranges before creating a job:

    ```python theme={null}
    import json
    from newscatcher_catchall import CatchAllApi

    client = CatchAllApi(api_key="YOUR_API_KEY")

    suggestions = client.jobs.initialize(
        query="AI company acquisitions",
        context="Focus on deal size and acquiring company details"
    )

    print(json.dumps(suggestions.model_dump(), indent=2, default=str))
    ```

    <Expandable title="suggestions response">
      ```json theme={null}
      {
        "validators": [
          {
            "name": "is_acquisition_event",
            "description": "true if article describes a completed or announced acquisition",
            "type": "boolean"
          },
          {
            "name": "involves_ai_company",
            "description": "true if acquiring or acquired company is in AI sector",
            "type": "boolean"
          }
        ],
        "enrichments": [
          {
            "name": "acquirer_company",
            "description": "Extract the acquiring company name",
            "type": "company"
          },
          {
            "name": "acquired_company",
            "description": "Extract the acquired company name",
            "type": "company"
          },
          {
            "name": "deal_value",
            "description": "Extract acquisition price if mentioned",
            "type": "number"
          },
          {
            "name": "announcement_date",
            "description": "Extract date of announcement",
            "type": "date"
          },
          {
            "name": "acquirer_details",
            "description": "Extract details about the acquiring company",
            "type": "text"
          }
        ],
        "start_date": "2026-02-01T14:12:57.292205+00:00",
        "end_date": "2026-02-06T14:12:57.292205+00:00",
        "date_modification_message": [
          "No dates were provided; using a default window of 5 days (2026-02-01 to 2026-02-06)."
        ]
      }
      ```
    </Expandable>

    <Tip>
      To learn more, see the [Initialize endpoint](/web-search-api/api-reference/jobs/initialize-job).
    </Tip>
  </Tab>

  <Tab title="Create and track">
    Submit a query and track its progress:

    ```python theme={null}
    import time

    POLL_INTERVAL_SECONDS = 60

    # Create job with custom validators and enrichments
    job = client.jobs.create_job(
        query="AI company acquisitions",
        context="Focus on deal size and acquiring company details",
        limit=10,
        validators=[
            {
                "name": "is_acquisition_event",
                "description": "true if article describes a completed or announced acquisition",
                "type": "boolean"
            }
        ],
        enrichments=[
            {
                "name": "acquirer_company",
                "description": "Extract the acquiring company name",
                "type": "company"
            },
            {
                "name": "acquired_company",
                "description": "Extract the acquired company name",
                "type": "company"
            },
            {
                "name": "deal_value",
                "description": "Extract acquisition price if mentioned",
                "type": "number"
            }
        ]
    )
    print(f"Job created: {job.job_id}")

    # Monitor progress
    job_id = job.job_id
    while True:
        status = client.jobs.get_job_status(job_id)

        if status.status == "completed":
            break

        current_step = next((s for s in status.steps if not s.completed), None)
        if current_step:
            print(f"Step {current_step.order}/7: {current_step.status}")

        time.sleep(POLL_INTERVAL_SECONDS)

    # Retrieve results
    results = client.jobs.get_job_results(job_id)
    print(f"\nFound {results.valid_records} valid records")
    for record in results.all_records:
        print(f"  {record.record_title}")
    ```

    <Note>
      Validators and enrichments are optional. If not provided, the system
      generates them automatically based on your query.
    </Note>
  </Tab>

  <Tab title="Continue jobs">
    Extend processing limits for completed jobs:

    ```python theme={null}
    import time

    POLL_INTERVAL_SECONDS = 60

    # Continue job to process more records
    continued = client.jobs.continue_job(
        job_id=job_id,
        new_limit=50,
    )
    print(f"Continued: {continued.previous_limit} -> {continued.new_limit} records")

    # Wait for completion
    while True:
        status = client.jobs.get_job_status(job_id)
        if status.status == "completed":
            break
        time.sleep(POLL_INTERVAL_SECONDS)

    # Get final results
    final_results = client.jobs.get_job_results(job_id)
    print(f"Total: {final_results.valid_records} valid records")
    ```

    <Tip>
      Use the `limit` parameter when creating a job to start with fewer records for quick testing.
      Continue the job if you need more records after reviewing the initial results.
    </Tip>
  </Tab>

  <Tab title="Early results">
    Retrieve partial results during the enriching stage:

    ```python theme={null}
    import time

    POLL_INTERVAL_SECONDS = 60

    while True:
        status = client.jobs.get_job_status(job_id)

        if status.status in ["enriching", "completed"]:
            results = client.jobs.get_job_results(job_id)
            
            if results.valid_records is not None:
                print(f"Progress: {results.progress_validated}/{results.candidate_records} validated, "
                      f"{results.valid_records} valid")

            if status.status == "completed":
                break

        time.sleep(POLL_INTERVAL_SECONDS)
    ```
  </Tab>

  <Tab title="List jobs">
    Retrieve all jobs created by your account:

    ```python theme={null}
    jobs = client.jobs.get_user_jobs()

    for job in jobs:
        print(f"Job {job.job_id}: {job.query} ({job.status})")
    ```
  </Tab>
</Tabs>
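
The suggestions returned by `initialize` have the same `name` / `description` / `type` shape that `create_job` accepts for `validators` and `enrichments`, so the two calls can be chained. The following is a minimal sketch of that hand-off. `job_kwargs_from_suggestions` is a hypothetical helper, and it assumes the dictionary produced by `suggestions.model_dump()` has the keys shown in the sample response.

```python
from typing import Any


def job_kwargs_from_suggestions(
    suggestions: dict[str, Any],
    query: str,
    limit: int = 10,
) -> dict[str, Any]:
    """Build create_job keyword arguments from an initialize() response dict.

    Hypothetical helper: keeps only the name/description/type fields of each
    suggested validator and enrichment, and ignores the suggested dates.
    """
    fields = ("name", "description", "type")

    def pick(items):
        # Treat a missing or null list as empty
        return [{key: item[key] for key in fields} for item in (items or [])]

    return {
        "query": query,
        "limit": limit,
        "validators": pick(suggestions.get("validators")),
        "enrichments": pick(suggestions.get("enrichments")),
    }
```

A call might then look like `client.jobs.create_job(**job_kwargs_from_suggestions(suggestions.model_dump(), query="AI company acquisitions"))`. Reviewing or trimming the suggested lists before submitting is a reasonable step between the two calls.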

<Accordion title="Complete example with all features">
  ```python theme={null}
  from newscatcher_catchall import CatchAllApi
  from newscatcher_catchall.core.api_error import ApiError
  import time

  POLL_INTERVAL_SECONDS = 60

  client = CatchAllApi(api_key="YOUR_API_KEY")

  try:
      # Create job with custom enrichments
      job = client.jobs.create_job(
          query="AI company acquisitions",
          context="Focus on deal size and acquiring company details",
          limit=10,
          enrichments=[
              {
                  "name": "acquirer_company",
                  "description": "Extract the acquiring company name",
                  "type": "company"
              },
              {
                  "name": "deal_value",
                  "description": "Extract acquisition price if mentioned",
                  "type": "number"
              }
          ]
      )
      job_id = job.job_id
      print(f"Job created: {job_id}")

      # Poll with early results access
      while True:
          status = client.jobs.get_job_status(job_id)

          if status.status in ["enriching", "completed"]:
              results = client.jobs.get_job_results(job_id)
              if results.valid_records is not None:
                  print(f"Progress: {results.valid_records} valid records")

              if status.status == "completed":
                  break

          time.sleep(POLL_INTERVAL_SECONDS)

      # Continue if needed
      if (results.valid_records or 0) >= 10:
          client.jobs.continue_job(job_id=job_id, new_limit=50)
          
          while True:
              status = client.jobs.get_job_status(job_id)
              if status.status == "completed":
                  break
              time.sleep(POLL_INTERVAL_SECONDS)
          
          results = client.jobs.get_job_results(job_id)

      # Display results
      print(f"\nFinal: {results.valid_records} valid records")
      for record in results.all_records:
          print(f"  {record.record_title}")

  except ApiError as e:
      print(f"Status: {e.status_code}")
      print(f"Error: {e.body}")
  ```
</Accordion>

## Working with monitors

Automate recurring queries with scheduled execution.

<Tabs>
  <Tab title="Create monitor">
    Create a monitor from a completed job:

    ```python theme={null}
    monitor = client.monitors.create_monitor(
        reference_job_id=job_id,
        schedule="every day at 12 PM UTC",
        webhook_ids=["a1b2c3d4-e5f6-7890-abcd-ef1234567890"],
    )
    print(f"Monitor created: {monitor.monitor_id}")
    ```

    <Note>
      Monitors require a minimum 24-hour interval between executions. Learn more in the
      [Monitors documentation](/web-search-api/concepts/monitors).
    </Note>
  </Tab>

  <Tab title="Update monitor">
    Update the webhooks and limit of an existing monitor:

    ```python theme={null}
    updated = client.monitors.update_monitor(
        monitor_id=monitor.monitor_id,
        webhook_ids=["b2c3d4e5-f6a7-8901-bcde-f12345678901"],
        limit=100,
    )
    print(f"Monitor updated: {updated.status}")
    ```
  </Tab>

  <Tab title="Pause/Resume">
    Control monitor execution:

    ```python theme={null}
    # Pause monitor
    client.monitors.disable_monitor(monitor.monitor_id)
    print("Monitor paused")

    # Resume monitor
    client.monitors.enable_monitor(monitor.monitor_id)
    print("Monitor resumed")
    ```
  </Tab>

  <Tab title="List monitors">
    Retrieve all monitors for your account:

    ```python theme={null}
    monitors = client.monitors.list_monitors()

    print(f"Total monitors: {monitors.total_monitors}")
    for m in monitors.monitors:
        status = "active" if m.enabled else "paused"
        print(f"{m.monitor_id}: {m.reference_job_query} ({status})")
    ```
  </Tab>

  <Tab title="Retrieve results">
    Access aggregated results from all monitor executions:

    ```python theme={null}
    # List execution history
    jobs = client.monitors.list_monitor_jobs(
        monitor_id=monitor.monitor_id,
        sort="desc",
    )
    print(f"Monitor executed {jobs.total_jobs} jobs")

    # Get all collected records
    results = client.monitors.pull_monitor_results(monitor.monitor_id)
    print(f"Total records: {results.records}")

    for record in results.all_records:
        print(f"  {record.record_title}")
        print(f"  Added: {record.added_on}")
        print(f"  Updated: {record.updated_on}")
    ```
  </Tab>
</Tabs>

<Accordion title="Complete monitor example">
  ```python theme={null}
  from newscatcher_catchall import CatchAllApi
  from newscatcher_catchall.core.api_error import ApiError

  client = CatchAllApi(api_key="YOUR_API_KEY")

  try:
      # Create monitor from completed job
      job_id = "af7a26d6-cf0b-458c-a6ed-4b6318c74da3"
      
      monitor = client.monitors.create_monitor(
          reference_job_id=job_id,
          schedule="every day at 12 PM UTC",
          webhook_ids=["a1b2c3d4-e5f6-7890-abcd-ef1234567890"],
      )
      monitor_id = monitor.monitor_id
      print(f"Monitor created: {monitor_id}")

      # Update webhook
      client.monitors.update_monitor(
          monitor_id=monitor_id,
          webhook_ids=["b2c3d4e5-f6a7-8901-bcde-f12345678901"],
          limit=100,
      )

      # List all monitors
      all_monitors = client.monitors.list_monitors()
      for m in all_monitors.monitors:
          status = "active" if m.enabled else "paused"
          print(f"{m.monitor_id}: {status}")

      # Control execution
      client.monitors.disable_monitor(monitor_id)
      client.monitors.enable_monitor(monitor_id)

      # List execution history
      jobs = client.monitors.list_monitor_jobs(
          monitor_id=monitor_id,
          sort="desc",
      )
      print(f"\nMonitor executed {jobs.total_jobs} jobs")
      for job in jobs.jobs:
          print(f"  Job {job.job_id}: {job.start_date} to {job.end_date}")

      # Get aggregated results
      results = client.monitors.pull_monitor_results(monitor_id)
      print(f"\nCollected {results.records} total records")
      for record in results.all_records:
          print(f"  {record.record_title}")
          print(f"  Added: {record.added_on}")

  except ApiError as e:
      print(f"Status: {e.status_code}")
      print(f"Error: {e.body}")
  ```
</Accordion>
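
The list and pause/resume calls above can be combined, for example to re-enable every paused monitor in one pass. The sketch below is one possible way to do that. `resume_paused_monitors` is a hypothetical helper, and it assumes `list_monitors()` returns all monitors of the account in a single response.

```python
def resume_paused_monitors(client) -> list:
    """Enable every monitor that is currently paused and return their IDs.

    Hypothetical helper: relies only on list_monitors(), enable_monitor(),
    and the `enabled` / `monitor_id` attributes used in the examples above.
    """
    resumed = []
    for monitor in client.monitors.list_monitors().monitors:
        if not monitor.enabled:
            client.monitors.enable_monitor(monitor.monitor_id)
            resumed.append(monitor.monitor_id)
    return resumed
```

Passing the client in as an argument keeps the function easy to exercise against a stub before running it on a real account.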

## Company watchlist

The company watchlist lets you track specific companies across jobs. Create entities,
group them into a dataset, then connect the dataset to any job to get per-company
relevance scores in the results.

<Tabs>
  <Tab title="Create entity">
    ```python theme={null}
    from newscatcher_catchall import (
        CatchAllApi,
        AdditionalAttributes,
        CompanyAttributes,
    )

    client = CatchAllApi(api_key="YOUR_API_KEY")

    entity = client.entities.create_entity(
        name="NewsCatcher",
        entity_type="company",
        description="AI-powered news data provider",
        additional_attributes=AdditionalAttributes(
            company_attributes=CompanyAttributes(
                domain="newscatcherapi.com",
                alternative_names=["NC", "NewsCatcher API"],
                key_persons=["Artem Bugara", "Maksym Sugonyaka"],
            )
        ),
    )
    entity_id = entity.id
    ```
  </Tab>

  <Tab title="Create dataset from CSV">
    Upload a CSV to create a dataset and entities in one step:

    ```python theme={null}
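    # Open the file in a context manager so the handle is closed after upload
    with open("companies.csv", "rb") as csv_file:
        dataset = client.datasets.create_dataset_from_csv(
            file=csv_file,
            name="My Portfolio",
        )
    dataset_id = dataset.dataset_id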
    ```

    CSV format:

    ```csv theme={null}
    name,description,domain,alternative_names,key_persons
    NewsCatcher,"AI-powered news data provider",newscatcherapi.com,"NC;NewsCatcher API","Artem Bugara;Maksym Sugonyaka"
    OpenAI,"Artificial intelligence research company",openai.com,"Open AI","Sam Altman"
    ```
  </Tab>

  <Tab title="Submit connected job">
    Wait for the dataset to reach `ready` status, then submit a job:

    ```python theme={null}
    import time

    # Poll until ready
    while True:
        status = client.datasets.get_dataset(dataset_id)
        if status.latest_status == "ready":
            break
        time.sleep(5)

    # Submit connected job
    job = client.jobs.create_job(
        query="AI chip partnerships",
        connected_dataset_ids=[dataset_id],
    )
    ```
  </Tab>

  <Tab title="Read results">
    Each record includes a `connected_entities` array with relevance scores:

    ```python theme={null}
    results = client.jobs.get_job_results(job.job_id)

    for record in results.all_records:
        print(record.record_title)
        for entity in record.connected_entities:
            print(f"  {entity.name}: score {entity.ed_score}/10")
            print(f"  {entity.relation}")
    ```
  </Tab>
</Tabs>

<Tip>
  For a full step-by-step walkthrough including batch entity creation and the
  JSON API path, see [Company Watchlist](/web-search-api/concepts/company-search).
</Tip>
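
If your company list lives in Python rather than in a file, the CSV for the upload step can be generated with the standard `csv` module. This is a sketch only: `write_companies_csv` is a hypothetical helper, and the semicolon separator for multi-value columns is inferred from the sample CSV above rather than from a documented rule.

```python
import csv
from typing import Iterable, TextIO

# Column order taken from the sample CSV in this section
COLUMNS = ["name", "description", "domain", "alternative_names", "key_persons"]


def write_companies_csv(companies: Iterable[dict], out: TextIO) -> None:
    """Write company dicts as CSV rows in the watchlist column order.

    Hypothetical helper. List-valued fields are joined with ';'
    (assumption based on the sample file).
    """
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(COLUMNS)
    for company in companies:
        writer.writerow([
            company["name"],
            company.get("description", ""),
            company.get("domain", ""),
            ";".join(company.get("alternative_names", [])),
            ";".join(company.get("key_persons", [])),
        ])
```

To produce the file, open it in text mode with `newline=""`, for example `with open("companies.csv", "w", newline="", encoding="utf-8") as f: write_companies_csv(companies, f)`. The `csv` module adds quotes only where a value contains a comma, quote, or line break.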

## Async usage

Use the async client for non-blocking API calls:

```python theme={null}
import asyncio
from newscatcher_catchall import AsyncCatchAllApi

POLL_INTERVAL_SECONDS = 60

async def main():
    client = AsyncCatchAllApi(api_key="YOUR_API_KEY")
    
    job = await client.jobs.create_job(
        query="AI company acquisitions",
        context="Focus on deal size and acquiring company details",
    )
    
    while True:
        status = await client.jobs.get_job_status(job.job_id)
        if status.status == "completed":
            break
        await asyncio.sleep(POLL_INTERVAL_SECONDS)
    
    results = await client.jobs.get_job_results(job.job_id)
    print(f"Found {results.valid_records} records")

asyncio.run(main())
```
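
One benefit of the async client is that several jobs can be awaited at the same time instead of one after another. The sketch below illustrates this with `asyncio.gather`. `run_job` and `run_jobs` are hypothetical helpers built from the calls shown above, and the sketch assumes your plan allows the jobs to run concurrently.

```python
import asyncio


async def run_job(client, query: str, poll_interval: float = 60.0):
    """Create one job, poll until it completes, and return its results.

    Hypothetical helper; `client` is expected to be an AsyncCatchAllApi.
    """
    job = await client.jobs.create_job(query=query)
    while True:
        status = await client.jobs.get_job_status(job.job_id)
        if status.status == "completed":
            break
        await asyncio.sleep(poll_interval)
    return await client.jobs.get_job_results(job.job_id)


async def run_jobs(client, queries, poll_interval: float = 60.0):
    """Run one job per query concurrently; results keep the query order."""
    return await asyncio.gather(
        *(run_job(client, query, poll_interval) for query in queries)
    )
```

Inside an `async def main()`, a call could look like `results = await run_jobs(client, ["AI company acquisitions", "AI chip partnerships"])`. While one job is sleeping between polls, the event loop is free to poll the others.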

## Error handling

Handle API errors with structured exception handling:

```python theme={null}
from newscatcher_catchall.core.api_error import ApiError

try:
    client.jobs.create_job(query="AI company acquisitions")
except ApiError as e:
    print(f"Status: {e.status_code}")
    print(f"Error: {e.body}")
```

## Advanced usage

### Pagination

Retrieve large result sets page by page:

```python theme={null}
page = 1
while True:
    results = client.jobs.get_job_results(
        job_id=job_id,
        page=page,
        page_size=100,
    )

    print(f"Page {results.page}/{results.total_pages}")

    for record in results.all_records:
        print(f"  {record.record_title}")

    if results.page >= results.total_pages:
        break
    page += 1
```
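
The same loop can be wrapped in a generator so that calling code iterates over records without tracking page numbers. This is a minimal sketch: `iter_job_records` is a hypothetical helper, and it relies only on the `page`, `total_pages`, and `all_records` fields used in the example above.

```python
from typing import Iterator


def iter_job_records(client, job_id: str, page_size: int = 100) -> Iterator:
    """Yield every record of a job, fetching one page at a time.

    Hypothetical helper (not part of the SDK).
    """
    page = 1
    while True:
        results = client.jobs.get_job_results(
            job_id=job_id,
            page=page,
            page_size=page_size,
        )
        # Treat a missing record list as an empty page
        yield from results.all_records or []
        if results.page >= results.total_pages:
            return
        page += 1
```

Usage could be as short as `for record in iter_job_records(client, job_id): print(record.record_title)`. Pages are requested lazily, so stopping the loop early avoids fetching the remaining pages.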

### Timeouts

Configure custom timeouts at the client or request level:

<Tabs>
  <Tab title="Client-level">
    ```python theme={null}
    client = CatchAllApi(
        api_key="YOUR_API_KEY",
        timeout=30.0,
    )
    ```
  </Tab>

  <Tab title="Request-level">
    ```python theme={null}
    client.jobs.create_job(
        query="AI company acquisitions",
        request_options={"timeout_in_seconds": 10},
    )
    ```
  </Tab>
</Tabs>

### Retries

Configure automatic retry behavior for failed requests:

<Tabs>
  <Tab title="Client-level">
    ```python theme={null}
    client = CatchAllApi(
        api_key="YOUR_API_KEY",
        max_retries=3,
    )
    ```
  </Tab>

  <Tab title="Request-level">
    ```python theme={null}
    client.jobs.create_job(
        query="AI company acquisitions",
        request_options={"max_retries": 3},
    )
    ```
  </Tab>
</Tabs>

## Resources

* [GitHub Repository](https://github.com/Newscatcher/newscatcher-catchall-python)
* [PyPI Package](https://pypi.org/project/newscatcher-catchall-sdk/)
* [API Reference](/web-search-api/api-reference/jobs/create-job)
* [Quickstart Guide](/web-search-api/get-started/quickstart)
