- Submission
- Preliminary Ping.Maps and data analytics
- Clearance against appetite and underwriting rules
- Acceptance, triggering final data correction and enrichment
Understanding the Workflow ArchitectureEach step is labeled as:
- USER ACTION: Operations initiated by a broker, underwriter, or clearance team
- PING: Operations handled by Ping’s team or automated backend processes
0. Preamble
Import necessary libraries, set up authentication, and define helper functions for API calls and console output formatting.clear_and_triage.py
import argparse
import os
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
import requests
# How often to poll for status changes (in seconds)
POLL_INTERVAL = 15
# Shared state dictionary for tracking submission statuses across threads
# Maps pingid -> current status UUID
db: dict[str, str] = {}
def color_text(text: str, color: str) -> str:
"""
Wrap *text* in ANSI escape codes so it renders in the given *color* when
printed to a terminal.
Supported colors: "red", "yellow", "green". If an unsupported color is
passed the text is returned unmodified.
"""
colors = {"red": "\033[31m", "yellow": "\033[33m", "green": "\033[32m"}
if color not in colors:
return text
reset = "\033[0m"
return f"{colors[color]}{text}{reset}"
def authenticate(api_url: str, auth_token: str) -> dict:
"""
Authenticate to the Ping.Vision API and return the necessary headers and base URL.
"""
if auth_token is None:
auth_token = os.environ.get("PINGVISION_AUTH_TOKEN_LOCAL")
if not auth_token:
raise RuntimeError("No auth token provided. Set PINGVISION_AUTH_TOKEN_LOCAL or pass --auth-token")
base_url = api_url.rstrip("/")
if not base_url.endswith("/api/v1"):
base_url = f"{base_url}/api/v1"
headers = {"Authorization": f"Token {auth_token}"}
return headers, base_url
1. List User Teams
Retrieve your team configuration includingteam_uuid, division_uuid, and workflow statuses.
Use List User Teams to get team info and
List Submission Statuses to map status names to UUIDs.
Example code:
clear_and_triage.py
def get_team(base_url: str, headers: dict, company_name: str, team_name: str) -> dict:
"""
Look up a Ping.Vision team by its company and team display names.
This calls the Ping.Vision list_teams endpoint and searches for an exact
match on both company_name and team_name. Use it early in your workflow
to obtain the team_uuid and division_uuid you'll need for submission
creation and status lookups.
If the team is not found, all available teams are printed to the console
to help you identify the correct names, and a RuntimeError is raised.
"""
list_teams_url = f"{base_url}/user/teams/"
response = requests.get(list_teams_url, headers=headers)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to list teams: {response.status_code}")
teams = response.json()
for team in teams:
if team["company_name"] == company_name and team["team_name"] == team_name:
return team
print("Available teams:")
for team in teams:
print(f" * {team['company_name']} / {team['team_name']} ({team['team_uuid']})")
raise RuntimeError(f"Team not found: {company_name} / {team_name}")
def get_statuses(base_url: str, headers: dict, division_uuid: str) -> dict[str, str]:
"""
Retrieve the workflow statuses configured for a division and return as a name->uuid mapping.
This calls the list_submission_statuses endpoint to get all available workflow
statuses (e.g., "Received", "Pending Clearance", "Cleared", "Data Entry", etc.).
Returns a dict mapping status names to their UUIDs for easy lookup when
transitioning submissions between stages.
"""
statuses_url = f"{base_url}/submission-status"
params = {"division": division_uuid}
response = requests.get(statuses_url, params=params, headers=headers)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to list statuses: {response.status_code}")
statuses = response.json()
return {s["name"]: s["uuid"] for s in statuses}
JSON output
[
{
"division_uuid": "12345678-abc1-cde2-efg3-123456789abcd",
"team_uuid": "9876543-abc1-cde2-efg3-123456789abcd",
"team_name": "Ping Intel",
"statuses": [
{ "name": "Received", "uuid": "019c6d26-4468-7b89-8328-d840dc9b5e3b", "...": "..." },
{ "name": "Cleared", "uuid": "019c6d26-4e1b-7c89-ab12-d840dc9b5e3c", "...": "..." },
{ "name": "Data Entry", "uuid": "019c6d26-4e21-784d-bd26-9d7d70991d91", "...": "..." },
{ "name": "Underwriting", "uuid": "019c6d26-4e24-73d5-a2f0-ac50f422ba51", "...": "..." },
"..."
],
"...": "..."
}
]
2. Create Submission
USER ACTION Upload a submission (e.g., email with SOV attachment) using Initiate New Submission. This kicks off the automated intake and clearance workflow. Example code:clear_and_triage.py
def create_submission(
base_url: str,
headers: dict,
team_uuid: str,
file_path: str,
insured_name: str = "Acme Corp",
client_ref: str = None,
) -> str:
"""
Upload files to create a new submission.
Returns the pingid of the created submission.
"""
with open(file_path, "rb") as eml_file:
files = [("files", (Path(file_path).name, eml_file))]
payload = {
"team_uuid": team_uuid,
"insured_name": insured_name,
}
if client_ref:
payload["client_ref"] = client_ref
create_submission_url = f"{base_url}/submission"
response = requests.post(create_submission_url, data=payload, files=files, headers=headers)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to create submission: {response.status_code}")
return response.json()["id"]
JSON output
{
"id": "p-mk-tempo-esw5ab",
"message": "OK",
"url": "http://vision.pingintel.com/submission/i/p-mk-tempo-esw5ab"
}
3. Poll for Status Changes via List Submission Events
USER ACTION Poll List Submission Events to watch forSSC (status change) events
and track submission progress through the workflow.
Production Architecture: This polling typically runs as a background
process watching all submissions for your team/division. This demo filters
to a single
pingid for clarity.clear_and_triage.py
# =============================================================================
# Background Event Monitor (USER ACTION)
# =============================================================================
# Poll for submission status changes. In production, this polling typically
# runs as a background process watching ALL submissions for your team/division.
# This demo filters to a single pingid for clarity and to show clear
# cause-and-effect in the console output.
# =============================================================================
def poll_submission_events(
base_url: str,
headers: dict,
team_uuid: str,
pingid: str,
earliest_allowed_time: datetime,
):
"""
Background polling loop that watches for submission status-change events.
This function is meant to be run in a daemon thread. It calls the
list_submission_events endpoint every POLL_INTERVAL seconds and updates
the shared *db* dict with the latest status UUID for the pingid it sees.
Status-change events are printed to the console as they arrive so you
can observe the submission progressing through the workflow in real time.
Note: This demo polls for a single pingid for clarity. In production, you
would omit the pingid filter and track all submissions, potentially routing
events to different handlers or queues based on the submission's pingid.
"""
submission_events_url = f"{base_url}/submission-events"
last_cursor_id = None
while True:
time.sleep(POLL_INTERVAL)
try:
events_params = {
"pingid": pingid, # (pingid filter typically omitted)
"start": earliest_allowed_time.strftime("%Y%m%d%H%M%S"),
"page_size": 50,
"team": team_uuid, # as needed. can filter by division, team, or nothing if you want 'everything'.
}
if last_cursor_id:
events_params["cursor_id"] = last_cursor_id
response = requests.get(submission_events_url, params=events_params, headers=headers)
if response.status_code not in (200, 201):
print(color_text(f"Error polling events: {response.status_code}", "red"))
continue
events_json = response.json()
last_cursor_id = events_json.get("cursor_id") or last_cursor_id
for event in events_json.get("results", []):
if event.get("event_type") == "SSC": # Submission Status Change
new_status = event.get("new_value", "")
db[pingid] = new_status
print(
color_text(f"Status change: {event.get('message', '')}", "green"),
event.get("old_value", ""),
"->",
new_status,
)
except Exception as e:
print(color_text(f"Error in event polling: {e}", "red"))
JSON output
{
"cursor_id": "019c7d25-8290-7e4b-a094-1c880f73c534",
"results": [
{
"event_type": "SSC",
"message": "Submission status was changed from Initializing to Pending Clearance",
"new_value": "019c6d26-4e1d-7b8a-bad1-c05ed808dc4d",
"pingid": "p-mk-tempo-esw5ab",
"...": "..."
}
"...",
]
}
4. Change Submission Status
PING Advance the submission through workflow stages using Change Submission Status. The clearance team reviews for conflicts, then advances to “Cleared” and “Data Entry”. Example code:clear_and_triage.py
def change_status(base_url: str, headers: dict, pingid: str, new_status_uuid: str):
"""
Change the workflow status of a submission.
Calls the change_status endpoint to transition the submission to a new
workflow stage. Use this to programmatically advance submissions through
clearance, data entry, underwriting, etc.
"""
change_status_url = f"{base_url}/submission/{pingid}/change_status"
response = requests.patch(
change_status_url,
json={"workflow_status_uuid": new_status_uuid},
headers=headers,
)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to change status: {response.status_code}")
JSON output
{
"status": "success"
}
5. Ping Prepares Data for Underwriting
PING Once in Data Entry, Ping’s AI and human-in-the-loop process immediately begins perfecting the SOV data by reviewing and correcting the extracted data, and finally certifying the submission. This triggers the transition to “Underwriting” status.Demo Script Note: The Python demo script below simplifies this step for
staging/testing by using Update
Submission to set
is_building_data_ready=True and auto-advance to Underwriting. In production,
Ping’s certification process handles this transition.clear_and_triage.py
def update_submission(base_url: str, headers: dict, pingid: str, attr_to_update: str, value):
"""
Update an attribute of the submission.
This calls the Update Submission Details endpoint to modify attributes of a submission.
In this demo, we use it to set is_building_data_ready=True to simulate Ping's certification step.
In production, Ping's backend would set this automatically when certification completes.
"""
update_submission_url = f"{base_url}/submission/{pingid}"
response = requests.patch(
update_submission_url,
json={attr_to_update: value},
headers=headers,
)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to update submission: {response.status_code}")
# [DEMO ONLY] Mark building data as ready to trigger RUN_OUTPUTTERS
# In production, this is set automatically when Ping completes correcting the data
update_submission(BASE_URL, headers, pingid, attr_to_update="is_building_data_ready", value=True)
6. Wait for Underwriting Status
USER ACTION Poll List Submission Events to watch for the submission to move to “Underwriting” status after Ping completes certification. Example code:clear_and_triage.py
def wait_for_status(
pingid: str,
desired_status: str,
status_uuids: dict[str, str],
timeout: int = 600,
):
"""
Block until the submission identified by *pingid* reaches *desired_status*.
This polls the shared *db* dict (populated by poll_submission_events) and
compares the current status UUID against the UUID of *desired_status*.
Progress is printed to the console every 6 seconds.
"""
desired_status_uuid = status_uuids.get(desired_status)
start = time.time()
while time.time() - start < timeout:
status_uuid = db.get(pingid)
if status_uuid == desired_status_uuid:
print(color_text(f"Status for {pingid} reached: {desired_status}", "green"))
return
current_status_name = next((k for k, v in status_uuids.items() if v == status_uuid), "unknown")
print(
f"Waiting for {pingid} to reach {color_text(desired_status, 'green')} "
f"(current: {color_text(current_status_name, 'yellow')})"
)
time.sleep(6)
raise TimeoutError(f"Timed out waiting for {pingid} to reach '{desired_status}' after {timeout}s")
# Usage: wait for human certification to complete
wait_for_status(pingid, "Underwriting", status_uuids)
JSON output
{
"cursor_id": "019c7d3e-dd84-74f1-a7a1-cc41ee75ad8e",
"results": [
{
"event_type": "SSC",
"message": "Submission status was changed from Data Entry to Underwriting",
"pingid": "p-mk-tempo-esw5ab",
"...": "..."
},
"...",
]
}
7. Listen for Outputters Complete Event to Download Latest Outputs
USER ACTION Using the same event polling from Step 3, listen forOC (Outputters Complete) events via
List Submission Events.
When Ping finishes generating output files, an OC event is emitted containing download URLs
for the final outputs. The event metadata includes the submission_status at the time of completion,
which informs on how to handle the outputs.
Example code:
clear_and_triage.py
# =============================================================================
# Background Event Monitor for Outputters Complete (USER ACTION)
# =============================================================================
# Poll for OC (Outputters Complete) events. Uses the same polling pattern as
# Step 3 but listens for a different event type. In production, both SSC and
# OC handlers would live in a single polling loop.
# =============================================================================
def poll_for_outputters_complete(
base_url: str,
headers: dict,
team_uuid: str,
earliest_allowed_time: datetime,
):
"""
Background polling loop that watches for Outputters Complete events.
This function is meant to be run in a daemon thread. It calls the
list_submission_events endpoint every POLL_INTERVAL seconds and handles
OC events by downloading the generated output files.
The event metadata includes the submission_status at the time of
completion, which informs on how to process the outputs:
- "Data Entry": fetch scores for the submission
- "Underwriting": download the final output documents
"""
submission_events_url = f"{base_url}/submission-events"
last_cursor_id = None
while True:
time.sleep(POLL_INTERVAL)
try:
events_params = {
"start": earliest_allowed_time.strftime("%Y%m%d%H%M%S"),
"page_size": 50,
"team": team_uuid,
}
if last_cursor_id:
events_params["cursor_id"] = last_cursor_id
response = requests.get(submission_events_url, params=events_params, headers=headers)
if response.status_code not in (200, 201):
print(color_text(f"Error polling events: {response.status_code}", "red"))
continue
events_json = response.json()
last_cursor_id = events_json.get("cursor_id") or last_cursor_id
for event in events_json.get("results", []):
event_type = event.get("event_type")
event_pingid = event.get("pingid")
if event_type == "OC": # Outputters Complete
metadata = event.get("metadata", {})
outputs = metadata.get("outputs", [])
submission_status = metadata.get("submission_status", "")
sudid = metadata.get("sudid", "")
print(
color_text(f"Outputters complete: {event.get('message', '')}", "green"),
)
if submission_status in ["Underwriting"]:
print(color_text(f"Downloading {len(outputs)} output(s) for {sudid=} ...", "yellow"))
for output in outputs:
output_format = output.get("output_format")
scrubbed_filename = output.get("scrubbed_filename")
output_path = f"{sudid}_{scrubbed_filename}"
url = output.get("url")
print(f" Downloading {output_format}: {scrubbed_filename} from {color_text(url, 'blue')}")
download_document(
headers=headers,
url=url,
output_path=output_path,
)
except Exception as e:
print(color_text(f"Error in event polling: {e}", "red"))
JSON output
{
"cursor_id": "019c7d3e-dd84-74f1-a7a1-cc41ee75ad8e",
"results": [
{
"uuid": "019d73c0-2a2e-766f-bdfb-45a9bea7c213",
"created_time": "2026-04-09T19:37:46.798287Z",
"event_type": "OC",
"message": "Outputters complete",
"pingid": "p-mk-tempo-esw5ab",
"division_uuid": "12345678-abc1-cde2-efg3-123456789abcd",
"new_value": null,
"old_value": null,
"user_id": 13,
"metadata": {
"job_id": "93af186e-344b-11f1-91fb-922cce342ed8",
"submission_status": "Underwriting",
"sudid": "019c7d3e-0001-7abc-9000-abcdef123456",
"outputs": [
{
"label": "Ping SOV",
"output_format": "AirModeler",
"scrubbed_filename": "Acme Corp 2026-02-AirModeler.xlsm",
"url": "https://vision.pingintel.com/api/v1/sov/019c7d3e-0001-7abc-9000-abcdef123456/output/Acme%20Corp%202026-02-AirModeler.xlsm"
},
{
"label": "JSON",
"output_format": "JSON",
"scrubbed_filename": "Acme Corp 2026-02.json",
"url": "https://vision.pingintel.com/api/v1/sov/019c7d3e-0001-7abc-9000-abcdef123456/output/Acme%20Corp%202026-02.json"
}
]
}
},
"...",
]
}
8. Download Final Output Documents
USER ACTION Downloads are handled automatically by theOC event handler in Step 7. When the event poller
receives an Outputters Complete event, it extracts the download URLs from the event metadata
and saves each output file locally.
Example code:
clear_and_triage.py
def download_document(
headers: dict,
url: str,
output_path: str,
):
"""
Download a document from a submission.
Fetches the document content and writes it to *output_path*.
"""
response = requests.get(url, headers=headers)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to download {output_path}: {response.status_code}")
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
with open(output_path, "wb") as f:
f.write(response.content)
Python output
Downloading AirModeler: Acme Corp 2026-02-AirModeler.xlsm from https://vision.pingintel.com/api/v1/sov/019c7d3e-0001-7abc-9000-abcdef123456/output/Acme%20Corp%202026-02-AirModeler.xlsm
Downloading JSON: Acme Corp 2026-02.json from https://vision.pingintel.com/api/v1/sov/019c7d3e-0001-7abc-9000-abcdef123456/output/Acme%20Corp%202026-02.json
Python Demo
Download Python Script here ||| Download Example Email hereclear_and_triage_api.py
"""
Ping.Vision Clearance & Triage Workflow — Raw API Example
This script demonstrates an end-to-end clearance workflow using direct HTTP requests
to the Ping.Vision API. It uploads a submission (.eml file), monitors status changes
via polling, programmatically advances the submission through clearance stages,
waits for human certification, and finally downloads the finished SOV Fixer outputs.
This is the "raw requests" version of pingvision_clear_and_triage.py, intended to
show exactly what HTTP calls are being made without the abstraction of the
pingintel_api client library.
Prerequisites:
- A valid Ping.Vision API auth token set in PINGVISION_AUTH_TOKEN_LOCAL environment variable
- Network access to the Ping.Vision instance at the configured BASE_URL
- An .eml file containing the submission you want to process
Usage:
python clear_and_triage_api.py --eml /path/to/submission.eml --company "Acme Corp" --team "Acme Corp"
python clear_and_triage_api.py --eml demo_email.eml --company "Ping Intel" --team "Ping Intel" --api-url "http://localhost:8002"
"""
import argparse
import os
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
import requests
# How often to poll for status changes (in seconds)
POLL_INTERVAL = 15
# Shared state dictionary for tracking submission statuses across threads
# Maps pingid -> current status UUID
db: dict[str, str] = {}
def color*text(text: str, color: str) -> str:
"""
Wrap \_text* in ANSI escape codes so it renders in the given _color_ when
printed to a terminal.
Supported colors: "red", "yellow", "green". If an unsupported color is
passed the text is returned unmodified.
"""
colors = {"red": "\033[31m", "yellow": "\033[33m", "green": "\033[32m"}
if color not in colors:
return text
reset = "\033[0m"
return f"{colors[color]}{text}{reset}"
def authenticate(api_url: str, auth_token: str) -> dict:
"""
Authenticate to the Ping.Vision API and return the necessary headers and base URL.
"""
if auth_token is None:
auth_token = os.environ.get("PINGVISION_AUTH_TOKEN_LOCAL")
if not auth_token:
raise RuntimeError("No auth token provided. Set PINGVISION_AUTH_TOKEN_LOCAL or pass --auth-token")
base_url = api_url.rstrip("/")
if not base_url.endswith("/api/v1"):
base_url = f"{base_url}/api/v1"
headers = {"Authorization": f"Token {auth_token}"}
return headers, base_url
def get_team(base_url: str, headers: dict, company_name: str, team_name: str) -> dict:
"""
Look up a Ping.Vision team by its company and team display names.
This calls the Ping.Vision list_teams endpoint and searches for an exact
match on both company_name and team_name. Use it early in your workflow
to obtain the team_uuid and division_uuid you'll need for submission
creation and status lookups.
If the team is not found, all available teams are printed to the console
to help you identify the correct names, and a RuntimeError is raised.
"""
list_teams_url = f"{base_url}/user/teams/"
response = requests.get(list_teams_url, headers=headers)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to list teams: {response.status_code}")
teams = response.json()
for team in teams:
if team["company_name"] == company_name and team["team_name"] == team_name:
return team
print("Available teams:")
for team in teams:
print(f" * {team['company_name']} / {team['team_name']} ({team['team_uuid']})")
raise RuntimeError(f"Team not found: {company_name} / {team_name}")
def get_statuses(base_url: str, headers: dict, division_uuid: str) -> dict[str, str]:
"""
Retrieve the workflow statuses configured for a division and return as a name->uuid mapping.
This calls the list_submission_statuses endpoint to get all available workflow
statuses (e.g., "Received", "Pending Clearance", "Cleared", "Data Entry", etc.).
Returns a dict mapping status names to their UUIDs for easy lookup when
transitioning submissions between stages.
"""
statuses_url = f"{base_url}/submission-status"
params = {"division": division_uuid}
response = requests.get(statuses_url, params=params, headers=headers)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to list statuses: {response.status_code}")
statuses = response.json()
return {s["name"]: s["uuid"] for s in statuses}
def create_submission(
base_url: str,
headers: dict,
team_uuid: str,
file_path: str,
insured_name: str = "Acme Corp",
client_ref: str = None,
) -> str:
"""
Upload files to create a new submission.
Returns the pingid of the created submission.
"""
with open(file_path, "rb") as eml_file:
files = [("files", (Path(file_path).name, eml_file))]
payload = {
"team_uuid": team_uuid,
"insured_name": insured_name,
}
if client_ref:
payload["client_ref"] = client_ref
create_submission_url = f"{base_url}/submission"
response = requests.post(create_submission_url, data=payload, files=files, headers=headers)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to create submission: {response.status_code}")
return response.json()["id"]
# =============================================================================
# Background Event Monitor (USER ACTION)
# =============================================================================
# Poll for submission status changes. In production, this polling typically
# runs as a background process watching ALL submissions for your team/division.
# This demo filters to a single pingid for clarity and to show clear
# cause-and-effect in the console output.
# =============================================================================
def poll_submission_events(
base_url: str,
headers: dict,
team_uuid: str,
earliest_allowed_time: datetime,
):
"""
Background polling loop that watches for submission events.
This function is meant to be run in a daemon thread. It calls the
list_submission_events endpoint every POLL_INTERVAL seconds and handles:
- SSC (Submission Status Change): updates the shared *db* dict with the
latest status UUID for each pingid.
- OC (Outputters Complete): downloads output files when they are ready.
Events are printed to the console as they arrive so you can observe
submissions progressing through the workflow in real time.
"""
submission_events_url = f"{base_url}/submission-events"
last_cursor_id = None
while True:
time.sleep(POLL_INTERVAL)
try:
events_params = {
"start": earliest_allowed_time.strftime("%Y%m%d%H%M%S"),
"page_size": 50,
"team": team_uuid,
}
if last_cursor_id:
events_params["cursor_id"] = last_cursor_id
response = requests.get(submission_events_url, params=events_params, headers=headers)
if response.status_code not in (200, 201):
print(color_text(f"Error polling events: {response.status_code}", "red"))
continue
events_json = response.json()
last_cursor_id = events_json.get("cursor_id") or last_cursor_id
for event in events_json.get("results", []):
event_type = event.get("event_type")
event_pingid = event.get("pingid")
if event_type == "SSC": # Submission Status Change
new_status = event.get("new_value", "")
db[event_pingid] = new_status
print(
color_text(f"Status change: {event.get('message', '')}", "green"),
event.get("old_value", ""),
"->",
new_status,
)
elif event_type == "OC": # Outputters Complete
metadata = event.get("metadata", {})
outputs = metadata.get("outputs", [])
submission_status = metadata.get("submission_status", "")
sudid = metadata.get("sudid", "")
print(
color_text(f"Outputters complete: {event.get('message', '')}", "green"),
)
if submission_status in ["Underwriting"]:
print(color_text(f"Downloading {len(outputs)} output(s) for {sudid=} ...", "yellow"))
for output in outputs:
output_format = output.get("output_format")
scrubbed_filename = output.get("scrubbed_filename")
output_path = f"{sudid}_{scrubbed_filename}"
url = output.get("url")
print(f" Downloading {output_format}: {scrubbed_filename} from {color_text(url, 'blue')}")
download_document(
headers=headers,
url=url,
output_path=output_path,
)
except Exception as e:
print(color_text(f"Error in event polling: {e}", "red"))
def wait*for_status(
pingid: str,
desired_status: str,
status_uuids: dict[str, str],
timeout: int = 600,
):
"""
Block until the submission identified by \_pingid* reaches _desired_status_.
This polls the shared *db* dict (populated by poll_submission_events) and
compares the current status UUID against the UUID of *desired_status*.
Progress is printed to the console every 6 seconds.
"""
desired_status_uuid = status_uuids.get(desired_status)
start = time.time()
while time.time() - start < timeout:
status_uuid = db.get(pingid)
if status_uuid == desired_status_uuid:
print(color_text(f"Status for {pingid} reached: {desired_status}", "green"))
return
current_status_name = next((k for k, v in status_uuids.items() if v == status_uuid), "unknown")
print(
f"Waiting for {pingid} to reach {color_text(desired_status, 'green')} "
f"(current: {color_text(current_status_name, 'yellow')})"
)
time.sleep(6)
raise TimeoutError(f"Timed out waiting for {pingid} to reach '{desired_status}' after {timeout}s")
def change_status(base_url: str, headers: dict, pingid: str, new_status_uuid: str):
"""
Change the workflow status of a submission.
Calls the change_status endpoint to transition the submission to a new
workflow stage. Use this to programmatically advance submissions through
clearance, data entry, underwriting, etc.
"""
change_status_url = f"{base_url}/submission/{pingid}/change_status"
response = requests.patch(
change_status_url,
json={"workflow_status_uuid": new_status_uuid},
headers=headers,
)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to change status: {response.status_code}")
def update_submission(base_url: str, headers: dict, pingid: str, attr_to_update: str, value):
"""
Update a property of the submission.
This is a critical step in the clearance workflow. When is_building_data_ready
is set to True, it signals that:
- The SOV data has been parsed and validated
- Addresses have been geocoded
- Third-party enrichment data has been attached
- The submission is ready for underwriting review
In production, this flag is typically set automatically by Ping's certification
process after a human reviewer has verified the data quality. For demo/testing
purposes, this can be set programmatically to advance the workflow.
This triggers the RUN_OUTPUTTERS job which generates the final output files.
"""
update_submission_url = f"{base_url}/submission/{pingid}"
response = requests.patch(
update_submission_url,
json={attr_to_update: value},
headers=headers,
)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to update submission: {response.status_code}")
def download*document(
headers: dict,
url: str,
output_path: str,
):
"""
Download a document from a submission.
Fetches the document content and writes it to \_output_path*.
"""
response = requests.get(url, headers=headers)
if response.status_code not in (200, 201):
raise RuntimeError(f"Failed to download {output_path}: {response.status_code}")
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
with open(output_path, "wb") as f:
f.write(response.content)
def run_clearance_workflow(
\*,
eml_path: str,
company_name: str,
team_name: str,
api_url: str = "http://localhost:8002/api/v1",
auth_token: str | None = None,
) -> str:
"""
Execute the full clearance workflow for a single .eml submission.
This is the main entry point. It performs the following steps:
1. Resolves the team and division from Ping.Vision.
2. Uploads the .eml file as a new submission (USER ACTION).
3. Starts a background thread to monitor events — SSC and OC (USER ACTION).
4. Waits for "Pending Clearance", advances to "Cleared" and "Data Entry" (PING).
5. Ping prepares data for underwriting (PING - demo simplifies via Update Submission API).
6. Waits for "Underwriting" status (USER ACTION polling).
7-8. Background poller listens for OC events and downloads outputs automatically.
Returns the pingid of the created submission.
"""
# Setup API authentication and base URL
headers, base_url = authenticate(api_url, auth_token)
# ==========================================================================
# Step 1: Get Team and Statuses
# ==========================================================================
print(color_text("Step 1: Looking up team and workflow statuses...", "yellow"))
team = get_team(base_url, headers, company_name, team_name)
team_uuid = team["team_uuid"]
division_uuid = team["division_uuid"]
print(f" Team: {team['team_name']} ({team_uuid})")
status_uuids = get_statuses(base_url, headers, division_uuid)
print(f" Found {len(status_uuids)} workflow statuses")
# ==========================================================================
# Step 2: Create Submission
# ==========================================================================
# USER ACTION: A broker or underwriter uploads a submission (e.g., an email
# with SOV attachment) to Ping.Vision for processing. This kicks off the
# automated intake and clearance workflow.
# ==========================================================================
print(color_text("\nStep 2: Creating submission...", "yellow"))
pingid = create_submission(base_url, headers, team_uuid, eml_path)
print(color_text(f" Created submission: {pingid}", "green"))
# ==========================================================================
# Step 3: Start Background Event Polling
# ==========================================================================
# USER ACTION: Start polling for submission events to track status changes.
# In production, this typically runs as a background process watching all
# submissions for your team/division.
# ==========================================================================
print(color_text("\nStep 3: Starting background event polling...", "yellow"))
earliest_allowed_time = datetime.now(timezone.utc)
threading.Thread(
target=poll_submission_events,
args=(
base_url,
headers,
team_uuid,
earliest_allowed_time,
),
name="event_poller",
daemon=True,
).start()
# ==========================================================================
# Step 4: Wait for Pending Clearance, then Advance to Data Entry
# ==========================================================================
# PING: The clearance team reviews the submission for conflicts (e.g.,
# incumbent carriers, duplicate submissions). If cleared, they advance
# through "Cleared" to "Data Entry" where Ping's AI processing begins.
# ==========================================================================
print(color_text("\nStep 4: Waiting for Pending Clearance status...", "yellow"))
wait_for_status(pingid, "Pending Clearance", status_uuids)
print(color_text("\n Advancing to Cleared...", "yellow"))
change_status(base_url, headers, pingid, status_uuids["Cleared"])
wait_for_status(pingid, "Cleared", status_uuids)
print(color_text("\n Advancing to Data Entry...", "yellow"))
change_status(base_url, headers, pingid, status_uuids["Data Entry"])
wait_for_status(pingid, "Data Entry", status_uuids)
# ==========================================================================
# Step 5: Ping Corrects Data for Underwriting
# ==========================================================================
# PING: Ping's AI and human-in-the-loop process downloads the scrubber,
# reviews/corrects the extracted data, and certifies the submission.
# This triggers the transition to Underwriting.
#
# For the sake of this demo script, we simplify this step:
# - Only do this in staging, not production
# - Uses Update Submission API with is_building_data_ready=True
# ==========================================================================
print(color_text("\nStep 5: Ping corrects data...", "yellow"))
print(color_text(" (In production: Ping's team corrects the data)", "red"))
# [DEMO] Auto-advance to Underwriting and mark data as ready
# In production, this is triggered by Ping's certification process
print(color_text(" [DEMO] Auto-advancing to Underwriting...", "yellow"))
change_status(base_url, headers, pingid, status_uuids["Underwriting"])
# Mark building data as ready - this triggers the RUN_OUTPUTTERS job
# In production, this flag is set when Ping's certification is complete
print(color_text(" [DEMO] Marking building data as ready...", "yellow"))
update_submission(base_url, headers, pingid, to_update="is_building_data_ready", value=True)
# ==========================================================================
# Step 6: Wait for Underwriting Status
# ==========================================================================
# USER ACTION: Poll submission events to watch for the transition to
# "Underwriting" status after Ping completes certification.
# ==========================================================================
wait_for_status(pingid, "Underwriting", status_uuids)
# ==========================================================================
# Step 7 & 8: Listen for Outputters Complete and Download Outputs
# ==========================================================================
# The background event poller (Step 3) handles OC (Outputters Complete)
# events automatically — when outputs are ready, it downloads them.
# No additional action needed here; the poller thread runs until the
# process exits.
# ==========================================================================
print(color_text("\nSteps 7-8: Waiting for OC events (handled by background poller)...", "yellow"))
print(color_text(" Output downloads will appear as OC events arrive.", "green"))
# Keep the main thread alive so the daemon poller can continue processing
while True:
time.sleep(60)
print(color_text(f"\n✓ Workflow complete for submission {pingid}", "green"))
return pingid
if **name** == "**main**":
parser = argparse.ArgumentParser(
description="Run Ping.Vision clearance workflow using raw HTTP requests",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python clear_and_triage_api.py --eml demo_email.eml --company "Ping Intel" --team "Ping Intel"
python clear_and_triage_api.py --eml submission.eml --company "Acme" --team "Acme" --api-url "http://localhost:8002"
""",
)
parser.add_argument("--eml", required=True, help="Path to .eml file to submit")
parser.add_argument("--company", required=True, help="Company name in Ping.Vision")
parser.add_argument("--team", required=True, help="Team name in Ping.Vision")
parser.add_argument(
"--api-url",
default="http://localhost:8002",
help="Ping.Vision API base URL (default: localhost:8002)",
)
parser.add_argument(
"--auth-token",
default=None,
help="API auth token (default: PINGVISION_AUTH_TOKEN_LOCAL env var)",
)
args = parser.parse_args()
try:
pingid = run_clearance_workflow(
eml_path=args.eml,
company_name=args.company,
team_name=args.team,
api_url=args.api_url,
auth_token=args.auth_token,
)
except KeyboardInterrupt:
print(color_text("\nWorkflow interrupted by user", "yellow"))
except Exception as e:
print(color_text(f"\nError: {e}", "red"))
raise