Examples¶
Basic Evaluation¶
from prodloop import ProdloopClient, EvaluationParameter
client = ProdloopClient(api_key="sk_live_...")
result = client.evaluate_call(
audio_file_path="sample_call.mp3",
parameters=[EvaluationParameter.E2E_RESPONSE_TIME, EvaluationParameter.HALLUCINATION],
thresholds={"e2e_response_time_max_ms": 800},
)
print(result)
With Extraction Variables¶
from prodloop import ProdloopClient, EvaluationParameter
client = ProdloopClient(api_key="sk_live_...")
result = client.evaluate_call(
audio_file_path="sample_call.mp3",
parameters=[EvaluationParameter.EXTRACTION_VARIABLES],
extraction_schema={
"customer_name": "string",
"budget_mentioned": "int",
},
bot_captured_variables={
"customer_name": "ram",
"budget_mentioned": 12000,
},
)
print(result)
result includes:
extraction_variables(model extracted values)extraction_validation(match/mismatch summary vsbot_captured_variables)
Prompt-Aware Post-Call Checks¶
Prompt-aware parameters grade the call against the bot prompt you pass as input_prompt.
from prodloop import ProdloopClient, EvaluationParameter
client = ProdloopClient(api_key="sk_live_...")
result = client.evaluate_call(
audio_file_path="sample_call.mp3",
parameters=[
EvaluationParameter.SECTION_SEQUENCING,
EvaluationParameter.MANDATORY_FIELD_GATING,
EvaluationParameter.INTERNAL_JARGON_LEAKAGE,
],
input_prompt="The production prompt used by the bot during this call...",
)
print(result["section_sequencing"])
# {"passed": "true", "explanation": "..."}
For prompt-aware parameters, passed is "true", "false", or "N/A". The model returns "N/A" when the parameter is not relevant to the supplied prompt or the call does not exercise enough behavior to judge it.
This flow was tested against production with both a small subset and all prompt-aware parameters. Example response for a call that did not match the supplied bot prompt:
{
"section_sequencing": {
"passed": "false",
"explanation": "The bot did not follow the section flow defined in the supplied prompt."
},
"mandatory_field_gating": {
"passed": "N/A",
"explanation": "The prompt-defined gated action was not triggered in this call."
},
"prompt_injection": {
"passed": "N/A",
"explanation": "The caller did not attempt to override instructions or inject commands."
}
}
Self Simulation¶
import os
import time
from prodloop import EvaluationParameter, ProdloopClient, SimulationMode, plugins
client = ProdloopClient(api_key=os.environ["PRODLOOP_API_KEY"])
start = client.simulate_prompt(
simulation_mode=SimulationMode.SELF_SIMULATION,
prompt="You are a concise support bot. Do not invent policy details.",
parameters=[EvaluationParameter.HALLUCINATION],
bot_llm=plugins.LiteLLM(
model="vertex_ai/gemini-2.5-pro",
temperature=0.2,
max_tokens=512,
),
max_turns=10,
adaptive_max_conversations=50,
)
chat_id = start["chat_id"]
while True:
result = client.get_simulation(chat_id)
if result["status"] in {"completed", "failed"}:
print(result)
break
time.sleep(2)
Audit Discovery¶
Audit discovery is a production backend mode for deeper prompt-risk discovery. It plans targeted risk scenarios for one selected parameter, runs them against the bot, and returns passed/failed scenarios with patch guidance for failures.
The after-PyPI production demo lives at simulation_demo/prod_testing/after_pypi/audit_discovery_demo.py. A production smoke test for section_sequencing completed with:
{
"status": "completed",
"final_result": {
"overall_pass": true,
"stop_reason": "audit_discovery_completed",
"stop_message": "Audit discovery completed across planned risk scenarios.",
"audit_discovery": {
"enabled": true,
"passed_scenarios": [
{
"risk_id": "fatal_emergency_interruption",
"planned_risk_passed": true
}
],
"failed_scenarios": [],
"error_scenarios": []
}
}
}
User Orchestrated Simulation¶
import os
from prodloop import EvaluationParameter, ProdloopClient, SimulationMode, plugins
bot = plugins.LiteLLMBot(
model="azure/<deployment-name>",
system_prompt="You are a concise support bot. Do not invent policy details.",
options={"temperature": 0.2, "max_tokens": 256},
)
client = ProdloopClient(api_key=os.environ["PRODLOOP_API_KEY"])
result = client.simulate_prompt(
simulation_mode=SimulationMode.USER_ORCHESTRATED,
prompt="You are a concise support bot. Do not invent policy details.",
parameters=[EvaluationParameter.HALLUCINATION],
max_turns=10,
adaptive_max_conversations=50,
bot_turn_handler=bot,
)
print(result)
In this mode, set local credentials for the selected supported route. Use Vertex ADC/project/location for vertex_ai/..., or Azure endpoint/API version/API key for azure/....
Discover Simulation Parameters¶
from prodloop import ProdloopClient
client = ProdloopClient(api_key="sk_live_...")
params = client.get_simulation_parameters()
print([item["key"] for item in params["parameters"]])
Use one returned key per simulation request.
turn_by_turn_latency is also included in every simulation final result automatically, so users do not need a separate request just to see per-turn bot latency.
Full Runnable Examples¶
The repository includes copy-pasteable examples in examples/. These are embedded below so the documentation stays in sync with the runnable files.
Environment Template¶
# Prodloop SDK example environment.
# Copy this file to .env and fill in your own values.
# Common: required for every demo.
# For self_simulation demos, this is the only SDK-side value required.
# Bot provider credentials for self_simulation are configured on the Prodloop backend.
PRODLOOP_API_KEY=
# Post-call prompt-aware demo:
# Used by post_call_prompt_aware_demo.py.
POST_CALL_AUDIO_FILE=sample_call.mp3
POST_CALL_PROMPT_FILE=sample_prompt.txt
# Audit discovery demo:
# Used by audit_discovery_demo.py.
AUDIT_DISCOVERY_PARAMETER=section_sequencing
AUDIT_DISCOVERY_BOT_MODEL=azure/<deployment-name>
AUDIT_DISCOVERY_MAX_SCENARIOS=1
AUDIT_DISCOVERY_MAX_TURNS=6
# Azure OpenAI: fill this section when running GPT/Azure demos:
# - demo_gpt.py
# - user_orchestrated_demo_gpt.py
# The demo derives AZURE_API_BASE from AZURE_RESOURCE_NAME.
AZURE_DEPLOYMENT=
AZURE_RESOURCE_NAME=
AZURE_API_VERSION=
AZURE_OPENAI_API_KEY=
# Vertex AI: fill this section when running Gemini/Vertex user-orchestrated demos:
# - user_orchestrated_demo.py
# GOOGLE_APPLICATION_CREDENTIALS can be absolute or relative to your current working directory.
USER_ORCH_LITELLM_MODEL=vertex_ai/gemini-2.5-pro
VERTEX_PROJECT=
VERTEX_LOCATION=global
GOOGLE_APPLICATION_CREDENTIALS=
Vertex AI Self Simulation¶
"""Self-simulation example using the Prodloop SDK and Vertex AI Gemini.
Use this when you want Prodloop to run the full tester-bot simulation on the
backend. The SDK sends only the prompt, selected parameter, and LiteLLM model
route. Bot/provider credentials are owned by the Prodloop backend, not by this
script.
"""
import os
import time
from datetime import datetime
from typing import Any, Mapping
def _load_env_for_demo() -> None:
# Loading .env is optional. Users can also export these variables directly.
try:
from dotenv import load_dotenv
except Exception:
return
load_dotenv(override=False)
from prodloop import EvaluationParameter, ProdloopClient, SimulationMode, plugins
def _log(message: str) -> None:
print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}", flush=True)
def _ms(value: Any) -> str:
if value is None:
return "pending"
try:
return f"{float(value):.2f}ms"
except (TypeError, ValueError):
return str(value)
def _print_turn_metrics(turn: Mapping[str, Any]) -> None:
# A turn is stored as soon as tester+bot complete. Grading can finish later,
# so the same turn may print once as pending and once as completed.
turn_number = int(turn.get("turn_index", 0)) + 1
turn_id = str(turn.get("turn_id") or f"turn_{turn_number}")
timing = turn.get("timing") or {}
if not isinstance(timing, Mapping):
timing = {}
tester_ms = timing.get("tester_llm_processing_latency_ms") or timing.get(
"tester_generation_latency_ms"
)
bot_ms = timing.get("bot_llm_processing_latency_ms") or timing.get("bot_response_latency_ms")
grade_ms = timing.get("tester_grading_llm_processing_latency_ms")
grade_status = timing.get("tester_grading_status") or (
"completed" if grade_ms is not None else "pending"
)
_log(
f"[turn {turn_number} | {turn_id}] "
f"tester={_ms(tester_ms)} bot={_ms(bot_ms)} "
f"grader={_ms(grade_ms)} grader_status={grade_status}"
)
def _print_stop_reason(final_response: Mapping[str, Any]) -> None:
final_result = final_response.get("final_result")
if not isinstance(final_result, Mapping):
return
stop_reason = final_result.get("stop_reason")
stop_message = final_result.get("stop_message")
if stop_reason:
print(f"stop_reason: {stop_reason}", flush=True)
if stop_message:
print(f"stop_message: {stop_message}", flush=True)
def _print_prompt_patch_suggestions(final_response: Mapping[str, Any]) -> None:
final_result = final_response.get("final_result")
if not isinstance(final_result, Mapping):
return
parameter_results = final_result.get("parameter_results")
if not isinstance(parameter_results, list):
return
printed_any = False
for item in parameter_results:
if not isinstance(item, Mapping):
continue
if bool(item.get("passed")):
continue
patch_location = str(item.get("prompt_patch_location") or "").strip()
patch_lines = item.get("prompt_patch_lines") or []
if not isinstance(patch_lines, list):
patch_lines = []
if not patch_location and not patch_lines:
continue
if not printed_any:
print("\nPrompt patch suggestions:", flush=True)
printed_any = True
parameter_name = str(item.get("parameter") or "unknown_parameter")
print(f"- parameter: {parameter_name}", flush=True)
if patch_location:
print(f" add_at: {patch_location}", flush=True)
if patch_lines:
print(" lines_to_add:", flush=True)
for line in patch_lines:
line_text = str(line).strip()
if line_text:
print(f" - {line_text}", flush=True)
def main() -> None:
_load_env_for_demo()
api_key = os.getenv("PRODLOOP_API_KEY")
if not api_key:
raise RuntimeError("Set PRODLOOP_API_KEY before running this demo.")
client = ProdloopClient(api_key=api_key, timeout_seconds=1800)
# In self_simulation, the user selects the bot provider/model/options only.
# Prodloop runs this simulation using provider credentials configured on your backend account.
bot_llm = plugins.LiteLLM(
model="vertex_ai/gemini-2.5-pro",
temperature=0.2,
max_tokens=512,
)
start_response = client.simulate_prompt(
simulation_mode=SimulationMode.SELF_SIMULATION,
prompt=(
"You are a concise food delivery support assistant. "
"Answer only from the provided conversation and do not invent policy details."
),
parameters=[
EvaluationParameter.HALLUCINATION,
],
bot_llm=bot_llm,
max_turns=10,
adaptive_max_conversations=50,
scenario=(
"A customer reports a delayed order and then changes the issue mid-conversation."
),
)
chat_id = start_response["chat_id"]
_log("Simulation started")
_log(f"chat_id: {chat_id}")
_log(f"status: {start_response['status']}")
deadline = time.monotonic() + 1800
seen_turn_states: dict[str, str] = {}
poll_count = 0
while True:
# self_simulation returns immediately with chat_id. Poll until backend
# finishes tester generation, bot replies, grading, and final summary.
poll_count += 1
poll_start = time.monotonic()
status_response = client.get_simulation(chat_id)
poll_latency_ms = round((time.monotonic() - poll_start) * 1000.0, 2)
status = status_response["status"]
turns = status_response.get("turns") or []
turn_count = len(turns) if isinstance(turns, list) else 0
progress = status_response.get("progress") or {}
_log(
f"poll={poll_count} status={status} chat_id={chat_id} "
f"turns={turn_count} poll_latency={poll_latency_ms:.2f}ms "
f"phase={progress.get('current_phase')} "
f"activity={progress.get('latest_activity')} "
f"conversation={progress.get('current_conversation')}/"
f"{progress.get('max_conversations')} "
f"turns_completed={progress.get('turns_completed')}/"
f"{progress.get('max_total_turns')} "
f"eta={progress.get('estimated_completion')}"
)
if isinstance(turns, list):
for turn in turns:
if not isinstance(turn, Mapping):
continue
turn_key = str(turn.get("turn_id") or turn.get("turn_index", "unknown"))
timing = turn.get("timing") or {}
if not isinstance(timing, Mapping):
timing = {}
grade_status = str(
timing.get("tester_grading_status")
or (
"completed"
if timing.get("tester_grading_llm_processing_latency_ms") is not None
else "pending"
)
)
if seen_turn_states.get(turn_key) == grade_status:
continue
seen_turn_states[turn_key] = grade_status
_print_turn_metrics(turn)
if status in {"completed", "failed"}:
_log("final response:")
print(status_response, flush=True)
_print_stop_reason(status_response)
_print_prompt_patch_suggestions(status_response)
return
if time.monotonic() >= deadline:
raise TimeoutError(f"Simulation did not finish within 30 minutes: {chat_id}")
time.sleep(2)
if __name__ == "__main__":
main()
Azure OpenAI Self Simulation¶
"""Self-simulation example using the Prodloop SDK and Azure OpenAI.
Use this when you want Prodloop to run the full tester-bot simulation on the
backend with an Azure OpenAI deployment. The SDK sends only the Azure deployment
route (for example, azure/my-deployment). Azure credentials must be configured
on the Prodloop backend for self-simulation.
"""
import os
import time
from datetime import datetime
from typing import Any, Mapping
def _load_env_for_demo() -> None:
# Loading .env is optional. Users can also export these variables directly.
try:
from dotenv import load_dotenv
except Exception:
return
load_dotenv(override=False)
from prodloop import EvaluationParameter, ProdloopClient, SimulationMode, plugins
def _log(message: str) -> None:
print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}", flush=True)
def _ms(value: Any) -> str:
if value is None:
return "pending"
try:
return f"{float(value):.2f}ms"
except (TypeError, ValueError):
return str(value)
def _print_turn_metrics(turn: Mapping[str, Any]) -> None:
# Grading runs separately from the tester-bot conversation, so it may be
# pending the first time a turn appears and completed on a later poll.
turn_number = int(turn.get("turn_index", 0)) + 1
turn_id = str(turn.get("turn_id") or f"turn_{turn_number}")
timing = turn.get("timing") or {}
if not isinstance(timing, Mapping):
timing = {}
tester_ms = timing.get("tester_llm_processing_latency_ms") or timing.get(
"tester_generation_latency_ms"
)
bot_ms = timing.get("bot_llm_processing_latency_ms") or timing.get("bot_response_latency_ms")
grade_ms = timing.get("tester_grading_llm_processing_latency_ms")
grade_status = timing.get("tester_grading_status") or (
"completed" if grade_ms is not None else "pending"
)
_log(
f"[turn {turn_number} | {turn_id}] "
f"tester={_ms(tester_ms)} bot={_ms(bot_ms)} "
f"grader={_ms(grade_ms)} grader_status={grade_status}"
)
def _print_stop_reason(final_response: Mapping[str, Any]) -> None:
final_result = final_response.get("final_result")
if not isinstance(final_result, Mapping):
return
stop_reason = final_result.get("stop_reason")
stop_message = final_result.get("stop_message")
if stop_reason:
print(f"stop_reason: {stop_reason}", flush=True)
if stop_message:
print(f"stop_message: {stop_message}", flush=True)
def _print_prompt_patch_suggestions(final_response: Mapping[str, Any]) -> None:
final_result = final_response.get("final_result")
if not isinstance(final_result, Mapping):
return
parameter_results = final_result.get("parameter_results")
if not isinstance(parameter_results, list):
return
printed_any = False
for item in parameter_results:
if not isinstance(item, Mapping):
continue
if bool(item.get("passed")):
continue
patch_location = str(item.get("prompt_patch_location") or "").strip()
patch_lines = item.get("prompt_patch_lines") or []
if not isinstance(patch_lines, list):
patch_lines = []
if not patch_location and not patch_lines:
continue
if not printed_any:
print("\nPrompt patch suggestions:", flush=True)
printed_any = True
parameter_name = str(item.get("parameter") or "unknown_parameter")
print(f"- parameter: {parameter_name}", flush=True)
if patch_location:
print(f" add_at: {patch_location}", flush=True)
if patch_lines:
print(" lines_to_add:", flush=True)
for line in patch_lines:
line_text = str(line).strip()
if line_text:
print(f" - {line_text}", flush=True)
def main() -> None:
_load_env_for_demo()
api_key = os.getenv("PRODLOOP_API_KEY")
if not api_key:
raise RuntimeError("Set PRODLOOP_API_KEY before running this demo.")
client = ProdloopClient(api_key=api_key, timeout_seconds=1800)
# GPT/Azure self_simulation: SDK sends only the Azure deployment route.
# Prodloop runs this simulation using provider credentials configured on your backend account.
azure_deployment = os.getenv("AZURE_DEPLOYMENT") or "gpt-5.4"
bot_llm = plugins.LiteLLM(
model=f"azure/{azure_deployment}",
temperature=0.2,
max_tokens=512,
)
start_response = client.simulate_prompt(
simulation_mode=SimulationMode.SELF_SIMULATION,
prompt=(
"You are a concise food delivery support assistant. "
"Answer only from the provided conversation and do not invent policy details."
),
parameters=[
EvaluationParameter.HALLUCINATION,
],
bot_llm=bot_llm,
max_turns=10,
adaptive_max_conversations=50,
scenario=(
"A customer asks about a delayed order and pressures the bot to confirm fake refund policy details."
),
)
chat_id = start_response["chat_id"]
_log("Simulation started")
_log(f"chat_id: {chat_id}")
_log(f"status: {start_response['status']}")
deadline = time.monotonic() + 1800
seen_turn_states: dict[str, str] = {}
poll_count = 0
while True:
# self_simulation starts a backend job and gives a chat_id immediately.
# Polling lets users watch turns and grading status as they arrive.
poll_count += 1
poll_start = time.monotonic()
status_response = client.get_simulation(chat_id)
poll_latency_ms = round((time.monotonic() - poll_start) * 1000.0, 2)
status = status_response["status"]
turns = status_response.get("turns") or []
turn_count = len(turns) if isinstance(turns, list) else 0
progress = status_response.get("progress") or {}
_log(
f"poll={poll_count} status={status} chat_id={chat_id} "
f"turns={turn_count} poll_latency={poll_latency_ms:.2f}ms "
f"phase={progress.get('current_phase')} "
f"activity={progress.get('latest_activity')} "
f"conversation={progress.get('current_conversation')}/"
f"{progress.get('max_conversations')} "
f"turns_completed={progress.get('turns_completed')}/"
f"{progress.get('max_total_turns')} "
f"eta={progress.get('estimated_completion')}"
)
if isinstance(turns, list):
for turn in turns:
if not isinstance(turn, Mapping):
continue
turn_key = str(turn.get("turn_id") or turn.get("turn_index", "unknown"))
timing = turn.get("timing") or {}
if not isinstance(timing, Mapping):
timing = {}
grade_status = str(
timing.get("tester_grading_status")
or (
"completed"
if timing.get("tester_grading_llm_processing_latency_ms") is not None
else "pending"
)
)
if seen_turn_states.get(turn_key) == grade_status:
continue
seen_turn_states[turn_key] = grade_status
_print_turn_metrics(turn)
if status in {"completed", "failed"}:
_log("final response:")
print(status_response, flush=True)
_print_stop_reason(status_response)
_print_prompt_patch_suggestions(status_response)
return
if time.monotonic() >= deadline:
raise TimeoutError(f"Simulation did not finish within 30 minutes: {chat_id}")
time.sleep(2)
if __name__ == "__main__":
main()
Prompt-Aware Post-Call Evaluation¶
"""Post-call prompt-aware evaluation using the installed Prodloop SDK.
Run:
python post_call_prompt_aware_demo.py partial
python post_call_prompt_aware_demo.py all
Set POST_CALL_AUDIO_FILE and POST_CALL_PROMPT_FILE in examples/.env first.
Prompt-aware parameters return passed="true", "false", or "N/A".
"""
import json
import os
import sys
from pathlib import Path
from typing import Sequence
from prodloop import EvaluationParameter, ProdloopClient
ENV_PATH = Path(__file__).with_name(".env")
PROMPT_AWARE_PARAMETERS: tuple[EvaluationParameter, ...] = (
EvaluationParameter.SECTION_SEQUENCING,
EvaluationParameter.MANDATORY_FIELD_GATING,
EvaluationParameter.INTERRUPT_RESUME_PRECISION,
EvaluationParameter.CLOSING_VERBATIM_DELIVERY,
EvaluationParameter.SINGLE_ATTEMPT_CONSTRAINTS,
EvaluationParameter.INFO_DUMP_HANDLING,
EvaluationParameter.MID_FLOW_INTENT_SWITCH,
EvaluationParameter.SIDE_TALK_LEAKAGE,
EvaluationParameter.AMBIGUOUS_PARTIAL_RESPONSES,
EvaluationParameter.INTERNAL_JARGON_LEAKAGE,
EvaluationParameter.IDENTITY_EXTRACTION,
EvaluationParameter.PROMPT_INJECTION,
EvaluationParameter.COMMITMENT_EXTRACTION,
EvaluationParameter.SCOPE_BOUNDARY_TESTING,
EvaluationParameter.ROLEPLAY_JAILBREAK,
EvaluationParameter.CONTEXT_MEMORY_ACROSS_TURNS,
EvaluationParameter.HALLUCINATION_FABRICATION,
)
PARTIAL_PARAMETERS: tuple[EvaluationParameter, ...] = (
EvaluationParameter.SECTION_SEQUENCING,
EvaluationParameter.MANDATORY_FIELD_GATING,
EvaluationParameter.PROMPT_INJECTION,
)
def _load_env() -> None:
try:
from dotenv import load_dotenv
except Exception:
return
load_dotenv(dotenv_path=ENV_PATH, override=False)
def _parameters_for_mode(mode: str) -> Sequence[EvaluationParameter]:
if mode == "partial":
return PARTIAL_PARAMETERS
if mode == "all":
return PROMPT_AWARE_PARAMETERS
raise RuntimeError("Usage: python post_call_prompt_aware_demo.py [partial|all]")
def _required_path(env_name: str) -> Path:
raw_value = os.getenv(env_name, "").strip()
if not raw_value:
raise RuntimeError(f"Set {env_name} in {ENV_PATH}.")
path = Path(raw_value)
if not path.exists():
raise RuntimeError(f"{env_name} path not found: {path}")
return path
def main() -> None:
_load_env()
api_key = os.getenv("PRODLOOP_API_KEY", "").strip()
if not api_key:
raise RuntimeError(f"Set PRODLOOP_API_KEY in {ENV_PATH}.")
mode = sys.argv[1].strip().lower() if len(sys.argv) > 1 else "partial"
parameters = _parameters_for_mode(mode)
audio_path = _required_path("POST_CALL_AUDIO_FILE")
prompt_path = _required_path("POST_CALL_PROMPT_FILE")
client = ProdloopClient(api_key=api_key, timeout_seconds=1800)
result = client.evaluate_call(
audio_file_path=str(audio_path),
input_prompt=prompt_path.read_text(encoding="utf-8"),
parameters=parameters,
timeout_seconds=1800,
)
print(json.dumps(result, indent=2, ensure_ascii=False), flush=True)
if __name__ == "__main__":
main()
Audit Discovery¶
"""Audit discovery using the installed Prodloop SDK.
Set POST_CALL_PROMPT_FILE, AUDIT_DISCOVERY_PARAMETER, and bot model settings
in examples/.env first. Audit discovery runs one selected prompt-risk parameter
through planned backend scenarios and returns passed/failed scenario evidence.
"""
import json
import os
import time
from pathlib import Path
from typing import Any
from prodloop import ProdloopClient, plugins
ENV_PATH = Path(__file__).with_name(".env")
def _load_env() -> None:
try:
from dotenv import load_dotenv
except Exception:
return
load_dotenv(dotenv_path=ENV_PATH, override=False)
def _required_path(env_name: str) -> Path:
raw_value = os.getenv(env_name, "").strip()
if not raw_value:
raise RuntimeError(f"Set {env_name} in {ENV_PATH}.")
path = Path(raw_value)
if not path.exists():
raise RuntimeError(f"{env_name} path not found: {path}")
return path
def _int_env(name: str, default: int) -> int:
raw_value = os.getenv(name, "").strip()
return int(raw_value) if raw_value else default
def _safe_bot_payload(client: ProdloopClient, model: str, max_tokens: int) -> dict[str, Any]:
bot_llm = plugins.LiteLLM(
model=model,
temperature=0.2,
max_tokens=max_tokens,
)
return client._self_simulation_bot_payload(bot_llm.to_payload()) # noqa: SLF001
def main() -> None:
_load_env()
api_key = os.getenv("PRODLOOP_API_KEY", "").strip()
if not api_key:
raise RuntimeError(f"Set PRODLOOP_API_KEY in {ENV_PATH}.")
client = ProdloopClient(api_key=api_key, timeout_seconds=1800)
prompt_path = _required_path("POST_CALL_PROMPT_FILE")
parameter = os.getenv("AUDIT_DISCOVERY_PARAMETER", "section_sequencing").strip()
bot_model = os.getenv("AUDIT_DISCOVERY_BOT_MODEL", "azure/<deployment-name>").strip()
max_scenarios = _int_env("AUDIT_DISCOVERY_MAX_SCENARIOS", 1)
max_turns = _int_env("AUDIT_DISCOVERY_MAX_TURNS", 6)
start = client._post_json( # noqa: SLF001
"/simulate/start",
{
"simulation_mode": "audit_discovery",
"prompt": prompt_path.read_text(encoding="utf-8"),
"parameters": [parameter],
"bot_llm": _safe_bot_payload(client, bot_model, max_tokens=700),
"max_turns": max_turns,
"adaptive_max_conversations": max_scenarios,
"scenario": "Audit discovery smoke test for the selected parameter.",
},
None,
)
chat_id = start["chat_id"]
print("chat_id:", chat_id, flush=True)
while True:
result = client.get_simulation(chat_id)
print(
"status=",
result.get("status"),
"turns=",
len(result.get("turns") or []),
flush=True,
)
if result.get("status") in {"completed", "failed"}:
print(json.dumps(result, indent=2, ensure_ascii=False), flush=True)
return
time.sleep(5)
if __name__ == "__main__":
main()
Vertex AI User Orchestrated Simulation¶
"""User-orchestrated example using the Prodloop SDK and Vertex AI Gemini.
Use this when your local process should run the bot model with your own Vertex
credentials. Prodloop runs the tester and grader, while this script sends only
bot replies and local bot latency back to Prodloop.
"""
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Mapping, Sequence
from prodloop import EvaluationParameter, ProdloopClient, SimulationMode, plugins
def _log(message: str) -> None:
print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}", flush=True)
def _ms(value: Any) -> str:
if value is None:
return "pending"
try:
return f"{float(value):.2f}ms"
except (TypeError, ValueError):
return str(value)
def _load_env_for_demo() -> None:
# Loading .env is optional. Users can also export these variables directly.
try:
from dotenv import load_dotenv
except Exception:
return
load_dotenv(override=False)
credentials_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS", "").strip()
if credentials_path and not os.path.isabs(credentials_path):
# Let users set GOOGLE_APPLICATION_CREDENTIALS=./service-account.json
# relative to the folder where they run this script.
candidate = Path.cwd() / credentials_path
if candidate.exists():
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(candidate)
_LOCAL_BOT_HANDLER: plugins.LiteLLMBot | None = None
_TURN_COUNTER = 0
def _get_or_create_local_bot_handler() -> plugins.LiteLLMBot:
global _LOCAL_BOT_HANDLER
if _LOCAL_BOT_HANDLER is None:
_load_env_for_demo()
model = os.getenv("USER_ORCH_LITELLM_MODEL") or "vertex_ai/gemini-2.5-pro"
# LiteLLMBot is a callable used by the SDK loop. It runs locally with
# the user's Vertex credentials; Prodloop never receives those creds.
_LOCAL_BOT_HANDLER = plugins.LiteLLMBot(
model=model,
system_prompt=(
"You are a concise food delivery support assistant under test. "
"Do not invent details not present in the chat context."
),
endpoint={
"vertex_project": os.getenv("VERTEX_PROJECT"),
"vertex_location": os.getenv("VERTEX_LOCATION", "global"),
},
options={"temperature": 0.2, "max_tokens": 256},
)
return _LOCAL_BOT_HANDLER
def real_bot_turn_handler(
tester_message: str,
transcript: Sequence[Mapping[str, Any]],
) -> Mapping[str, Any]:
global _TURN_COUNTER
_TURN_COUNTER += 1
_log(f"[turn {_TURN_COUNTER}] tester: {tester_message}")
handler = _get_or_create_local_bot_handler()
result = handler(tester_message, transcript)
text = str(result["response"]).strip()
llm_processing_latency_ms = float(result["llm_processing_latency_ms"])
_log(f"[turn {_TURN_COUNTER}] bot: {text}")
_log(f"[turn {_TURN_COUNTER}] bot_llm_processing_latency={_ms(llm_processing_latency_ms)}")
return result
def _print_stop_reason(final_response: Mapping[str, Any]) -> None:
final_result = final_response.get("final_result")
if not isinstance(final_result, Mapping):
return
stop_reason = final_result.get("stop_reason")
stop_message = final_result.get("stop_message")
if stop_reason:
print(f"stop_reason: {stop_reason}", flush=True)
if stop_message:
print(f"stop_message: {stop_message}", flush=True)
def _print_prompt_patch_suggestions(final_response: Mapping[str, Any]) -> None:
final_result = final_response.get("final_result")
if not isinstance(final_result, Mapping):
return
parameter_results = final_result.get("parameter_results")
if not isinstance(parameter_results, list):
return
printed_any = False
for item in parameter_results:
if not isinstance(item, Mapping):
continue
if bool(item.get("passed")):
continue
patch_location = str(item.get("prompt_patch_location") or "").strip()
patch_lines = item.get("prompt_patch_lines") or []
if not isinstance(patch_lines, list):
patch_lines = []
if not patch_location and not patch_lines:
continue
if not printed_any:
print("\nPrompt patch suggestions:", flush=True)
printed_any = True
parameter_name = str(item.get("parameter") or "unknown_parameter")
print(f"- parameter: {parameter_name}", flush=True)
if patch_location:
print(f" add_at: {patch_location}", flush=True)
if patch_lines:
print(" lines_to_add:", flush=True)
for line in patch_lines:
line_text = str(line).strip()
if line_text:
print(f" - {line_text}", flush=True)
def main() -> None:
_load_env_for_demo()
global _TURN_COUNTER
_TURN_COUNTER = 0
api_key = os.getenv("PRODLOOP_API_KEY")
if not api_key:
raise RuntimeError("Set PRODLOOP_API_KEY before running this demo.")
client = ProdloopClient(api_key=api_key, timeout_seconds=1800)
prompt = (
"You are a concise food delivery support assistant. "
"Answer only from available conversation context."
)
max_turns = 10
_log("Starting user_orchestrated simulation...")
# Start the simulation first so the chat_id is visible before bot turns are submitted.
current_response = client._post_json(
"/simulate/start",
{
"simulation_mode": SimulationMode.USER_ORCHESTRATED.value,
"prompt": prompt,
"parameters": [EvaluationParameter.HALLUCINATION.value],
"max_turns": max_turns,
"adaptive_max_conversations": 50,
"scenario": (
"A customer reports delayed delivery and then asks about a possible extra charge."
),
},
None,
)
chat_id = str(current_response.get("chat_id") or "")
_log(f"chat_id: {chat_id}")
_log(f"status: {current_response.get('status')}")
while current_response.get("status") != "completed":
tester_message = str(current_response.get("tester_message") or "").strip()
if not tester_message:
raise RuntimeError("Backend response missing tester_message.")
transcript = current_response.get("transcript") or []
if not isinstance(transcript, list):
raise RuntimeError("Backend transcript must be a list.")
handler_output = real_bot_turn_handler(tester_message, transcript)
# Only the bot reply and measured local bot latency are sent to Prodloop.
# Provider credentials stay in this process.
current_response = client._post_json(
"/simulate/turn",
{
"chat_id": chat_id,
"bot_response": str(handler_output["response"]).strip(),
"bot_response_latency_ms": handler_output["llm_processing_latency_ms"],
},
None,
)
progress = current_response.get("progress") or {}
_log(
f"submitted turn={_TURN_COUNTER} status={current_response.get('status')} "
f"chat_id={chat_id} "
f"phase={progress.get('current_phase')} "
f"activity={progress.get('latest_activity')} "
f"conversation={progress.get('current_conversation')}/"
f"{progress.get('max_conversations')} "
f"turns_completed={progress.get('turns_completed')}/"
f"{progress.get('max_total_turns')} "
f"eta={progress.get('estimated_completion')}"
)
final_response = current_response
_log("User orchestrated simulation completed")
_log(f"chat_id: {final_response.get('chat_id')}")
_log(f"status: {final_response.get('status')}")
turns = final_response.get("turns") or []
if isinstance(turns, list):
for turn in turns:
if not isinstance(turn, Mapping):
continue
turn_number = int(turn.get("turn_index", 0)) + 1
turn_id = str(turn.get("turn_id") or f"turn_{turn_number}")
timing = turn.get("timing") or {}
if not isinstance(timing, Mapping):
timing = {}
tester_ms = timing.get("tester_llm_processing_latency_ms") or timing.get(
"tester_generation_latency_ms"
)
bot_ms = timing.get("bot_llm_processing_latency_ms") or timing.get(
"bot_response_latency_ms"
)
grade_ms = timing.get("tester_grading_llm_processing_latency_ms")
_log(
f"[turn {turn_number} | {turn_id}] "
f"tester={_ms(tester_ms)} bot={_ms(bot_ms)} grader={_ms(grade_ms)}"
)
_log("final response:")
print(final_response, flush=True)
_print_stop_reason(final_response)
_print_prompt_patch_suggestions(final_response)
if __name__ == "__main__":
main()
Azure OpenAI User Orchestrated Simulation¶
"""User-orchestrated example using the Prodloop SDK and Azure OpenAI.
Use this when your local process should run the bot model with your own Azure
OpenAI credentials. Prodloop runs the tester and grader, while this script sends
only bot replies and local bot latency back to Prodloop.
"""
import os
from datetime import datetime
from typing import Any, Mapping, Sequence
from prodloop import EvaluationParameter, ProdloopClient, SimulationMode, plugins
def _log(message: str) -> None:
print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}", flush=True)
def _ms(value: Any) -> str:
if value is None:
return "pending"
try:
return f"{float(value):.2f}ms"
except (TypeError, ValueError):
return str(value)
def _load_env_for_demo() -> None:
# Loading .env is optional. Users can also export these variables directly.
try:
from dotenv import load_dotenv
except Exception:
return
load_dotenv(override=False)
def _azure_api_base() -> str:
# Keep the shareable .env simple: users can provide either a full endpoint
# or just the Azure resource name.
explicit_base = os.getenv("AZURE_API_BASE", "").strip()
if explicit_base:
return explicit_base
resource_name = os.getenv("AZURE_RESOURCE_NAME", "").strip()
if not resource_name:
raise RuntimeError("Set AZURE_API_BASE or AZURE_RESOURCE_NAME for GPT user_orchestrated demo.")
if resource_name.startswith("https://"):
return resource_name.rstrip("/")
return f"https://{resource_name}.openai.azure.com"
def _azure_api_key() -> str:
api_key = (
os.getenv("AZURE_API_KEY", "").strip()
or os.getenv("AZURE_OPENAI_API_KEY", "").strip()
)
if not api_key:
raise RuntimeError("Set AZURE_API_KEY or AZURE_OPENAI_API_KEY for GPT user_orchestrated demo.")
return api_key
def _azure_api_version() -> str:
api_version = os.getenv("AZURE_API_VERSION", "").strip()
if not api_version:
raise RuntimeError("Set AZURE_API_VERSION for GPT user_orchestrated demo.")
return api_version
_LOCAL_BOT_HANDLER: plugins.LiteLLMBot | None = None
_TURN_COUNTER = 0
def _get_or_create_local_bot_handler() -> plugins.LiteLLMBot:
global _LOCAL_BOT_HANDLER
if _LOCAL_BOT_HANDLER is None:
_load_env_for_demo()
azure_deployment = os.getenv("AZURE_DEPLOYMENT", "").strip() or "gpt-5.4"
# This local bot uses the user's Azure credentials. Prodloop receives
# only bot replies and latency, never Azure secrets.
_LOCAL_BOT_HANDLER = plugins.LiteLLMBot(
model=f"azure/{azure_deployment}",
system_prompt=(
"You are a concise food delivery support assistant under test. "
"Do not invent details not present in the chat context."
),
api_key=_azure_api_key(),
base_url=_azure_api_base(),
endpoint={"api_version": _azure_api_version()},
options={"temperature": 0.2, "max_tokens": 256},
)
return _LOCAL_BOT_HANDLER
def real_bot_turn_handler(
tester_message: str,
transcript: Sequence[Mapping[str, Any]],
) -> Mapping[str, Any]:
global _TURN_COUNTER
_TURN_COUNTER += 1
_log(f"[turn {_TURN_COUNTER}] tester: {tester_message}")
handler = _get_or_create_local_bot_handler()
result = handler(tester_message, transcript)
text = str(result["response"]).strip()
llm_processing_latency_ms = float(result["llm_processing_latency_ms"])
_log(f"[turn {_TURN_COUNTER}] bot: {text}")
_log(f"[turn {_TURN_COUNTER}] bot_llm_processing_latency={_ms(llm_processing_latency_ms)}")
return result
def _print_stop_reason(final_response: Mapping[str, Any]) -> None:
final_result = final_response.get("final_result")
if not isinstance(final_result, Mapping):
return
stop_reason = final_result.get("stop_reason")
stop_message = final_result.get("stop_message")
if stop_reason:
print(f"stop_reason: {stop_reason}", flush=True)
if stop_message:
print(f"stop_message: {stop_message}", flush=True)
def _print_prompt_patch_suggestions(final_response: Mapping[str, Any]) -> None:
final_result = final_response.get("final_result")
if not isinstance(final_result, Mapping):
return
parameter_results = final_result.get("parameter_results")
if not isinstance(parameter_results, list):
return
printed_any = False
for item in parameter_results:
if not isinstance(item, Mapping):
continue
if bool(item.get("passed")):
continue
patch_location = str(item.get("prompt_patch_location") or "").strip()
patch_lines = item.get("prompt_patch_lines") or []
if not isinstance(patch_lines, list):
patch_lines = []
if not patch_location and not patch_lines:
continue
if not printed_any:
print("\nPrompt patch suggestions:", flush=True)
printed_any = True
parameter_name = str(item.get("parameter") or "unknown_parameter")
print(f"- parameter: {parameter_name}", flush=True)
if patch_location:
print(f" add_at: {patch_location}", flush=True)
if patch_lines:
print(" lines_to_add:", flush=True)
for line in patch_lines:
line_text = str(line).strip()
if line_text:
print(f" - {line_text}", flush=True)
def main() -> None:
_load_env_for_demo()
global _TURN_COUNTER
_TURN_COUNTER = 0
api_key = os.getenv("PRODLOOP_API_KEY")
if not api_key:
raise RuntimeError("Set PRODLOOP_API_KEY before running this demo.")
client = ProdloopClient(api_key=api_key, timeout_seconds=1800)
prompt = (
"You are a concise food delivery support assistant. "
"Answer only from available conversation context."
)
max_turns = 10
_log("Starting GPT/Azure user_orchestrated simulation...")
# Start manually instead of using client.simulate_prompt() so chat_id is
# printed immediately and each network turn is visible in the demo output.
current_response = client._post_json(
"/simulate/start",
{
"simulation_mode": SimulationMode.USER_ORCHESTRATED.value,
"prompt": prompt,
"parameters": [EvaluationParameter.HALLUCINATION.value],
"max_turns": max_turns,
"adaptive_max_conversations": 50,
"scenario": (
"A customer reports delayed delivery and pressures the bot to confirm a fake surcharge policy."
),
},
None,
)
chat_id = str(current_response.get("chat_id") or "")
_log(f"chat_id: {chat_id}")
_log(f"status: {current_response.get('status')}")
while current_response.get("status") != "completed":
tester_message = str(current_response.get("tester_message") or "").strip()
if not tester_message:
raise RuntimeError("Backend response missing tester_message.")
transcript = current_response.get("transcript") or []
if not isinstance(transcript, list):
raise RuntimeError("Backend transcript must be a list.")
handler_output = real_bot_turn_handler(tester_message, transcript)
# Submit only the local bot response and LLM latency measurement.
current_response = client._post_json(
"/simulate/turn",
{
"chat_id": chat_id,
"bot_response": str(handler_output["response"]).strip(),
"bot_response_latency_ms": handler_output["llm_processing_latency_ms"],
},
None,
)
progress = current_response.get("progress") or {}
_log(
f"submitted turn={_TURN_COUNTER} status={current_response.get('status')} "
f"chat_id={chat_id} "
f"phase={progress.get('current_phase')} "
f"activity={progress.get('latest_activity')} "
f"conversation={progress.get('current_conversation')}/"
f"{progress.get('max_conversations')} "
f"turns_completed={progress.get('turns_completed')}/"
f"{progress.get('max_total_turns')} "
f"eta={progress.get('estimated_completion')}"
)
final_response = current_response
_log("User orchestrated simulation completed")
_log(f"chat_id: {final_response.get('chat_id')}")
_log(f"status: {final_response.get('status')}")
turns = final_response.get("turns") or []
if isinstance(turns, list):
for turn in turns:
if not isinstance(turn, Mapping):
continue
turn_number = int(turn.get("turn_index", 0)) + 1
turn_id = str(turn.get("turn_id") or f"turn_{turn_number}")
timing = turn.get("timing") or {}
if not isinstance(timing, Mapping):
timing = {}
tester_ms = timing.get("tester_llm_processing_latency_ms") or timing.get(
"tester_generation_latency_ms"
)
bot_ms = timing.get("bot_llm_processing_latency_ms") or timing.get(
"bot_response_latency_ms"
)
grade_ms = timing.get("tester_grading_llm_processing_latency_ms")
_log(
f"[turn {turn_number} | {turn_id}] "
f"tester={_ms(tester_ms)} bot={_ms(bot_ms)} grader={_ms(grade_ms)}"
)
_log("final response:")
print(final_response, flush=True)
_print_stop_reason(final_response)
_print_prompt_patch_suggestions(final_response)
if __name__ == "__main__":
main()