Auth Flows API
Test the reliability of authentication flows against any target URL. AIdenID provisions an identity, triggers the flow, intercepts the verification email, and reports the results.
Auth flow testing is ideal for monitoring the reliability of your own signup, password reset, and invite flows — or for validating that third-party services correctly deliver verification emails.
Create password reset run
Initiate a password reset flow test against a target URL. AIdenID will attempt to trigger a password reset email to the specified identity, then monitor for extraction.
Request body
| Field | Type | Required | Description |
|---|---|---|---|
target_url | string | Yes | URL of the password reset page to test |
identity_id | string | Yes | Identity to use for the test (must have an active inbox) |
timeout_seconds | integer | No | Max time to wait for the email (default: 120) |
Generate a unique opaque Idempotency-Key per mutation intent. Reuse it only for retries of the same mutation payload. Rotate keys for new operations.
# Server-only note: inject Authorization credentials in backend code, never in browser/client bundles.
curl -X POST https://api.aidenid.com/v1/reliability/password-reset \
-H "X-Server-Auth-Injected: true" \
-H "X-Org-Id: org_abc123" \
-H "X-Project-Id: proj_def456" \
-H "Idempotency-Key: <GENERATED_UNIQUE_KEY>" \
-H "Content-Type: application/json" \
-d '{
"target_url": "https://example.com/forgot-password",
"identity_id": "ident_a1b2c3d4e5",
"timeout_seconds": 120
}'Response
{
"id": "run_t1u2v3w4",
"flow_type": "password_reset",
"status": "pending",
"identity_id": "ident_a1b2c3d4e5",
"target_url": "https://example.com/forgot-password",
"created_at": "2026-04-04T10:10:00Z"
}Create invite flow run
Test an invite flow. Works the same as password reset testing but targets invite or signup URLs.
Request body
| Field | Type | Required | Description |
|---|---|---|---|
target_url | string | Yes | URL of the invite page to test |
identity_id | string | Yes | Identity to use for the test |
timeout_seconds | integer | No | Max time to wait for the email (default: 120) |
Get run status
Check the status of a reliability test run. The response includes the current status and, once complete, the result of the test.
Run statuses
| Status | Description |
|---|---|
pending | Run created, waiting for email delivery |
email_received | Email arrived, extraction in progress |
completed | Flow completed successfully, code extracted |
timeout | Email was not received within the timeout period |
failed | Flow failed (target URL error, extraction failure, etc.) |
Completed run response
{
"id": "run_t1u2v3w4",
"flow_type": "password_reset",
"status": "completed",
"identity_id": "ident_a1b2c3d4e5",
"target_url": "https://example.com/forgot-password",
"created_at": "2026-04-04T10:10:00Z",
"completed_at": "2026-04-04T10:10:45Z",
"result": {
"email_received_at": "2026-04-04T10:10:38Z",
"extraction_type": "otp",
"has_secret": true,
"redacted_value": "***921",
"secret_retrieval_available": true,
"delivery_time_ms": 38200,
"confidence": 0.98
}
}Secret values are intentionally redacted from the default run payload. Retrieve raw OTP/link values only through a privileged server-side endpoint and never log retrieval credentials in traces or app logs.
Typical workflow
- Create an identity with a short TTL (e.g., 1 hour)
- Start a reliability run with the identity and target URL
- Poll the run status until it reaches
completed,timeout, orfailed - Inspect the result for delivery timing and extraction confidence
- Squash the identity when done
External dependency circuit-breaker contract
Reliability runners call external targets and email providers. Use a shared-state breaker store (Redis/DB), fail closed when that store is unavailable, and keep retries bounded.
# Shared-state breaker contract (process-safe; not in-memory dicts).
# Implement with Redis/DB atomic operations.
class BreakerStore:
def should_allow(self, key: str, now_s: float) -> bool: ...
def record_success(self, key: str) -> None: ...
def record_failure(self, key: str, *, open_for_s: int) -> None: ...
def dependency_error(code: str, message: str, request_id: str, *, retryable: bool = True) -> dict:
return {
"error": {
"code": code,
"message": message,
"details": {"requestId": request_id},
},
"requestId": request_id,
"retryable": retryable,
}
def guarded_call(callable_fn, *, breaker: BreakerStore, breaker_key: str, request_id: str):
now = time.time()
try:
if not breaker.should_allow(breaker_key, now):
return 503, dependency_error(
"dependency_timeout",
"Dependency circuit is open. Retry shortly.",
request_id,
)
except Exception:
return 503, dependency_error(
"dependency_unavailable",
"Dependency breaker store unavailable.",
request_id,
)
try:
response = callable_fn()
except (TimeoutError, ConnectionError, OSError):
try:
breaker.record_failure(breaker_key, open_for_s=30)
except Exception:
return 503, dependency_error(
"dependency_unavailable",
"Dependency breaker store unavailable.",
request_id,
)
return 503, dependency_error(
"dependency_timeout",
"Dependency call timed out.",
request_id,
)
except Exception:
return 502, dependency_error(
"dependency_failed",
"Dependency call failed.",
request_id,
retryable=False,
)
else:
try:
breaker.record_success(breaker_key)
except Exception:
return 503, dependency_error(
"dependency_unavailable",
"Dependency breaker store unavailable.",
request_id,
)
return 200, responseIf breaker state cannot be read or written, return a deterministic 503 dependency_timeout response and do not continue the external call path.
# Python example: password reset reliability test
import requests
import time
import uuid
API = "https://api.aidenid.com"
def auth_headers_from_backend() -> dict[str, str]:
# Server-only integration point:
# fetch credentials from your secret manager and inject Authorization here.
raise RuntimeError("Server-only auth header injection required.")
H = {
**auth_headers_from_backend(),
"X-Org-Id": "org_abc123",
"X-Project-Id": "proj_def456",
"Content-Type": "application/json",
}
TIMEOUT = (3.05, 10)
SENSITIVE_RESULT_FRAGMENTS = ("token", "secret", "password", "otp")
def sanitize_result_for_logs(value):
if isinstance(value, dict):
sanitized = {}
for key, nested_value in value.items():
lowered = str(key).lower()
if any(fragment in lowered for fragment in SENSITIVE_RESULT_FRAGMENTS):
sanitized[key] = "[REDACTED]"
else:
sanitized[key] = sanitize_result_for_logs(nested_value)
return sanitized
if isinstance(value, list):
return [sanitize_result_for_logs(item) for item in value]
return value
def new_idempotency_key() -> str:
return str(uuid.uuid4())
def record_reliability_metric(*, run_id: str, status: str, delivery_time_ms: int | None) -> None:
# Send only allowlisted non-sensitive telemetry fields to your metrics sink.
_ = {"run_id": run_id, "status": status, "delivery_time_ms": delivery_time_ms}
# 1. Create identity
ident_resp = requests.post(
f"{API}/v1/identities",
headers={**H, "Idempotency-Key": new_idempotency_key()},
json={"label": "reset-test", "ttl_hours": 1},
timeout=TIMEOUT,
)
ident_resp.raise_for_status()
ident = ident_resp.json()
# 2. Start reliability run
run_resp = requests.post(
f"{API}/v1/reliability/password-reset",
headers={**H, "Idempotency-Key": new_idempotency_key()},
json={
"target_url": "https://example.com/forgot-password",
"identity_id": ident["id"],
"timeout_seconds": 120,
},
timeout=TIMEOUT,
)
run_resp.raise_for_status()
run = run_resp.json()
# 3. Poll for completion
# Keep retries bounded with exponential backoff and idempotency keys on all mutations.
backoff_seconds = 1
for _ in range(60):
status_resp = requests.get(
f"{API}/v1/reliability/{run['id']}",
headers=H,
timeout=TIMEOUT,
)
status_resp.raise_for_status()
status = status_resp.json()
if status["status"] in ("completed", "timeout", "failed"):
# Record only coarse status/latency metrics and sanitize any secret-bearing
# payload fields before touching logs/telemetry.
result_payload = status.get("result")
sanitized_result = sanitize_result_for_logs(result_payload if isinstance(result_payload, dict) else {})
delivery_time_ms = sanitized_result.get("delivery_time_ms")
record_reliability_metric(
run_id=run["id"],
status=status["status"],
delivery_time_ms=delivery_time_ms if isinstance(delivery_time_ms, int) else None,
)
break
time.sleep(backoff_seconds)
backoff_seconds = min(backoff_seconds * 2, 8)
# 4. Clean up
squash_resp = requests.post(
f"{API}/v1/identities/{ident['id']}/squash",
headers={**H, "Idempotency-Key": new_idempotency_key()},
timeout=TIMEOUT,
)
squash_resp.raise_for_status()TypeScript example
const API = "https://api.aidenid.com";
const getServerAuthHeaders = (): Record<string, string> => {
// Server-only integration point: inject Authorization headers from your backend.
throw new Error("Server-only auth header injection required.");
};
const headers = {
...getServerAuthHeaders(),
"X-Org-Id": "org_abc123",
"X-Project-Id": "proj_def456",
"Content-Type": "application/json",
};
const newIdempotencyKey = (operation: string) => {
if (!globalThis.crypto?.randomUUID) {
throw new Error("Secure idempotency key generation requires crypto.randomUUID()");
}
return `${operation}-${crypto.randomUUID()}`;
};
const fetchWithTimeout = async (url: string, init: RequestInit = {}, timeoutMs = 10_000) => {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), timeoutMs);
try {
return await fetch(url, { ...init, signal: controller.signal });
} finally {
clearTimeout(timeout);
}
};
const recordReliabilityMetric = (metric: {
runId: string;
status: string;
deliveryTimeMs: number | null;
}) => {
// Send only allowlisted, non-sensitive telemetry fields.
void metric;
};
const SENSITIVE_RESULT_KEY_PATTERN = /(token|secret|password|otp)/i;
const sanitizeResultForLogs = (value: unknown): unknown => {
if (Array.isArray(value)) {
return value.map((entry) => sanitizeResultForLogs(entry));
}
if (value && typeof value === "object") {
const sanitized: Record<string, unknown> = {};
for (const [key, nestedValue] of Object.entries(value as Record<string, unknown>)) {
if (SENSITIVE_RESULT_KEY_PATTERN.test(key)) {
sanitized[key] = "[REDACTED]";
} else {
sanitized[key] = sanitizeResultForLogs(nestedValue);
}
}
return sanitized;
}
return value;
};
// 1. Create identity
const identResponse = await fetchWithTimeout(`${API}/v1/identities`, {
method: "POST",
headers: { ...headers, "Idempotency-Key": newIdempotencyKey("rel-ident") },
body: JSON.stringify({ label: "reset-test", ttl_hours: 1 }),
});
if (!identResponse.ok) {
throw new Error(`identity_create_failed:${identResponse.status}`);
}
const ident = await identResponse.json();
// 2. Start reliability run
const runResponse = await fetchWithTimeout(`${API}/v1/reliability/password-reset`, {
method: "POST",
headers: { ...headers, "Idempotency-Key": newIdempotencyKey("rel-run") },
body: JSON.stringify({
target_url: "https://example.com/forgot-password",
identity_id: ident.id,
timeout_seconds: 120,
}),
});
if (!runResponse.ok) {
throw new Error(`run_start_failed:${runResponse.status}`);
}
const run = await runResponse.json();
// 3. Poll for completion
const baseDelayMs = 500;
const maxDelayMs = 8_000;
for (let attempt = 0; attempt < 60; attempt++) {
const response = await fetchWithTimeout(`${API}/v1/reliability/${run.id}`, { headers });
if (response.status === 429 || response.status === 503) {
const jitterMs = Math.floor(Math.random() * 250);
const retryDelayMs = Math.min(baseDelayMs * (2 ** attempt), maxDelayMs) + jitterMs;
await new Promise((r) => setTimeout(r, retryDelayMs));
continue;
}
if (!response.ok) {
throw new Error(`run_status_failed:${response.status}`);
}
const status = await response.json();
if (["completed", "timeout", "failed"].includes(status.status)) {
const sanitizedResult = sanitizeResultForLogs(status?.result);
const deliveryTimeMs = (
sanitizedResult
&& typeof sanitizedResult === "object"
&& "delivery_time_ms" in sanitizedResult
&& typeof (sanitizedResult as { delivery_time_ms?: unknown }).delivery_time_ms === "number"
)
? (sanitizedResult as { delivery_time_ms?: number }).delivery_time_ms ?? null
: null;
recordReliabilityMetric({
runId: run.id,
status: status.status,
deliveryTimeMs,
});
break;
}
const jitterMs = Math.floor(Math.random() * 250);
const nextDelayMs = Math.min(baseDelayMs * (2 ** attempt), maxDelayMs) + jitterMs;
await new Promise((r) => setTimeout(r, nextDelayMs));
}
// 4. Clean up
await fetchWithTimeout(`${API}/v1/identities/${ident.id}/squash`, {
method: "POST",
headers: { ...headers, "Idempotency-Key": newIdempotencyKey("rel-squash") },
});Auth error envelope contract
All auth-related failures return a stable envelope with a top-level requestId and mirrored error.details.requestId for traceability.
{
"error": {
"code": "scope_mismatch",
"message": "Consumer session does not match provided org/project scope headers.",
"details": {
"requestId": "req_abc123"
}
},
"requestId": "req_abc123",
"timestamp": "2026-04-06T21:33:45.225313+00:00",
"path": "/v1/reliability/password-reset",
"retryable": false
}| Code | Status | When returned | Retryable |
|---|---|---|---|
unauthorized | 401 | Missing/invalid bearer token or session credential | No |
forbidden | 403 | Authenticated caller lacks required access scope | No |
scope_mismatch | 403 | Provided X-Org-Id/X-Project-Id do not match resolved session scope | No |
dependency_timeout | 503 | Transient backing dependency timeout during auth/session lookup | Yes |
Error codes
| Code | Status | Description |
|---|---|---|
run_not_found | 404 | Reliability run does not exist |
identity_not_found | 404 | Referenced identity does not exist |
identity_already_squashed | 409 | Cannot use a squashed identity for testing |
missing_idempotency_key | 400 | Mutation request missing Idempotency-Key header |