Contract Intelligence Pipeline

Extract obligations, payment terms, and milestones from contracts automatically, then push structured data to your ERP or project management system.

What you will build

A Python service that accepts a contract PDF, uploads it to an Alloovium project vault, waits for processing to complete, then queries the document with targeted prompts to extract structured obligation, payment, and milestone data. The extracted data is then pushed to an external system such as Procore or Jira.

By the end of this guide you will have a reusable async function that takes a contract path and returns a typed Python dataclass with all extracted fields — ready to be inserted into your database or forwarded to a downstream API.

Difficulty and build time

This is an Intermediate use case. Expect roughly two days to production, including testing against real contracts. Your API key must carry the scopes needed to create projects, upload documents, and create conversations.

Prerequisites

  • An Alloovium API key with the required scopes. See the Quickstart for key creation.
  • Python 3.11+ and httpx (installed with the command below).
  • A contract in PDF format. Alloovium also accepts DOCX.
  • Optional: a Procore or Jira account for step 5.
bash
pip install httpx

Step 1 — Create a project and upload the contract

Every document in Alloovium lives inside a project. If you have an existing project, skip the creation step and use its ID directly. Projects are permanent — you would typically create one per job and reuse it for all documents on that job.

Create the project

python
import asyncio

import httpx

BASE_URL = "https://api.alloovium.com/api/v1"
API_KEY = "ak_live_YOUR_KEY_HERE"

HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}


async def create_project(client: httpx.AsyncClient, name: str) -> str:
    """Create a project and return its ID."""
    resp = await client.post(
        f"{BASE_URL}/projects",
        json={"name": name, "project_type": "commercial"},
        headers=HEADERS,
    )
    resp.raise_for_status()
    return resp.json()["id"]
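The snippet above hardcodes the key for brevity. A minimal sketch of loading it from the environment instead, assuming a variable named ALLOOVIUM_API_KEY (a hypothetical name chosen for this example, not something Alloovium defines):

```python
import os


def build_headers(env_var: str = "ALLOOVIUM_API_KEY") -> dict[str, str]:
    """Build request headers from an API key stored in the environment.

    The variable name is an assumption for this sketch; use whatever
    name your deployment's secret store exposes.
    """
    api_key = os.environ.get(env_var, "").strip()
    if not api_key:
        raise RuntimeError(f"Set {env_var} before running the pipeline")
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
```

Calling build_headers() once at startup and passing the result wherever HEADERS is used keeps the key out of source control.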

Upload the contract using the two-step bulk flow

Alloovium uses a prepare/confirm flow for uploads. The prepare step returns signed S3 URLs. You upload directly to S3, then confirm — this keeps your API key out of the multipart boundary and removes the API server from the data path.

python
from pathlib import Path


async def upload_contract(
    client: httpx.AsyncClient,
    project_id: str,
    contract_path: str,
) -> str:
    """Upload a contract PDF and return the document ID."""
    path = Path(contract_path)
    filename = path.name
    file_size = path.stat().st_size

    # Step 1a: prepare, i.e. request a signed upload URL
    resp = await client.post(
        f"{BASE_URL}/projects/{project_id}/documents/bulk-prepare",
        json={
            "documents": [
                {
                    "filename": filename,
                    "content_type": "application/pdf",
                    "size": file_size,
                }
            ]
        },
        headers=HEADERS,
    )
    resp.raise_for_status()
    prepared = resp.json()["documents"][0]
    doc_id = prepared["document_id"]
    upload_url = prepared["upload_url"]
    upload_fields = prepared.get("upload_fields", {})

    # Step 1b: send the file directly to S3 (no Alloovium headers here)
    with open(contract_path, "rb") as f:
        if upload_fields:
            # Multipart POST to S3
            s3_resp = await client.post(
                upload_url,
                data=upload_fields,
                files={"file": (filename, f, "application/pdf")},
            )
        else:
            # Simple PUT to presigned URL
            s3_resp = await client.put(
                upload_url,
                content=f.read(),
                headers={"Content-Type": "application/pdf"},
            )
    s3_resp.raise_for_status()

    # Step 1c: confirm, i.e. tell Alloovium the upload succeeded
    confirm_resp = await client.post(
        f"{BASE_URL}/projects/{project_id}/documents/bulk-confirm",
        json={"document_ids": [doc_id]},
        headers=HEADERS,
    )
    confirm_resp.raise_for_status()
    return doc_id
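The upload function above always sends application/pdf, although the prerequisites note that DOCX is accepted too. One way to pick the content type from the file suffix is sketched below. The helper name is hypothetical, and whether bulk-prepare expects exactly this DOCX MIME string is an assumption to check against the API reference.

```python
from pathlib import Path

# Suffix-to-MIME mapping for the two formats this guide mentions.
CONTENT_TYPES = {
    ".pdf": "application/pdf",
    ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
}


def content_type_for(contract_path: str) -> str:
    """Return the MIME type for a contract file, based on its suffix."""
    suffix = Path(contract_path).suffix.lower()
    try:
        return CONTENT_TYPES[suffix]
    except KeyError:
        # Reject unknown formats early instead of uploading something unusable
        raise ValueError(f"Unsupported contract format: {suffix or '(none)'}") from None
```

The returned value would replace the three literal "application/pdf" strings in upload_contract.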

Step 2 — Poll for processing completion

After confirmation, Alloovium processes the document asynchronously: OCR, text extraction, chunking, and embedding. Typical processing times are 30–120 seconds for a standard contract PDF. Poll the document status endpoint until the status is processed.

Do not hammer the polling endpoint

Use exponential backoff. The endpoint is rate-limited. Polling more than once every five seconds for a single document will consume burst tokens unnecessarily.
python
import asyncio


async def wait_for_processing(
    client: httpx.AsyncClient,
    project_id: str,
    doc_id: str,
    timeout_seconds: int = 300,
) -> None:
    """Poll until the document status is 'processed'. Raises on timeout."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    delay = 5.0
    while loop.time() < deadline:
        resp = await client.get(
            f"{BASE_URL}/projects/{project_id}/documents/{doc_id}",
            headers=HEADERS,
        )
        resp.raise_for_status()
        status = resp.json().get("status")
        if status == "processed":
            return
        if status in ("failed", "error"):
            raise RuntimeError(f"Document processing failed: {resp.json()}")
        await asyncio.sleep(delay)
        delay = min(delay * 1.5, 30.0)  # cap at 30s
    raise TimeoutError(f"Document {doc_id} did not process within {timeout_seconds}s")
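To see what the backoff in wait_for_processing means for request volume, the sketch below reproduces its delay schedule (5 s start, factor 1.5, 30 s cap) without any network calls. The helper names are illustrative, and request latency is ignored.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(
    initial: float = 5.0, factor: float = 1.5, cap: float = 30.0
) -> Iterator[float]:
    """Yield the sleep intervals between polls, capped at `cap` seconds."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, cap)


def max_polls(timeout_seconds: float = 300.0) -> int:
    """Count how many polls fit inside the timeout, ignoring request latency."""
    elapsed = 0.0
    polls = 0
    for delay in backoff_delays():
        if elapsed >= timeout_seconds:
            break
        polls += 1        # a poll is issued at time `elapsed`
        elapsed += delay  # then the loop sleeps for `delay`
    return polls


if __name__ == "__main__":
    print(list(islice(backoff_delays(), 6)))  # [5.0, 7.5, 11.25, 16.875, 25.3125, 30.0]
    print(max_polls(300))                     # 13
```

With the default 300-second timeout this schedule issues at most 13 requests per document, compared with 60 for a fixed five-second interval.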

Step 3 — Extract structured data via the chat API

With the document processed, create a conversation scoped to the project and ask targeted extraction questions. The chat API uses server-sent events (SSE) for streaming. For batch extraction scripts, it is often simpler to accumulate the full answer rather than process the stream in real time — the example below does that.

Create a conversation

python
async def create_conversation(
    client: httpx.AsyncClient,
    project_id: str,
) -> str:
    """Create a conversation scoped to the project and return its ID."""
    resp = await client.post(
        f"{BASE_URL}/conversations",
        json={"project_id": project_id, "title": "Contract extraction"},
        headers=HEADERS,
    )
    resp.raise_for_status()
    return resp.json()["id"]

Send extraction prompts and collect the full answer

python
import json


async def ask_and_collect(
    client: httpx.AsyncClient,
    conversation_id: str,
    question: str,
) -> str:
    """Send a message and collect the full streamed answer as a string."""
    full_answer: list[str] = []
    async with client.stream(
        "POST",
        f"{BASE_URL}/conversations/{conversation_id}/messages",
        json={"content": question},
        headers={**HEADERS, "Accept": "text/event-stream"},
        timeout=120.0,
    ) as stream:
        stream.raise_for_status()  # fail fast on 4xx/5xx before reading events
        async for line in stream.aiter_lines():
            if not line.startswith("data:"):
                continue
            payload = line[5:].strip()
            if payload in ("", "[DONE]"):
                continue
            try:
                event = json.loads(payload)
            except json.JSONDecodeError:
                continue  # skip frames that are not JSON
            event_type = event.get("type")
            if event_type == "token":
                full_answer.append(event.get("token", ""))
            elif event_type == "complete":
                # The complete event contains the full answer too
                return event.get("answer", "".join(full_answer))
    return "".join(full_answer)
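For unit tests it can help to separate the SSE parsing from the HTTP call. The sketch below applies the same rules as ask_and_collect to any iterable of lines, so it runs offline. The function name and the sample frames are made up for illustration; the "token" and "complete" event shapes are the ones used in the code above.

```python
import json
from typing import Iterable


def collect_answer(lines: Iterable[str]) -> str:
    """Assemble the answer from SSE lines, mirroring ask_and_collect's logic."""
    tokens: list[str] = []
    for line in lines:
        if not line.startswith("data:"):
            continue  # skip comments, event names, and blank keep-alives
        payload = line[5:].strip()
        if payload in ("", "[DONE]"):
            continue
        try:
            event = json.loads(payload)
        except json.JSONDecodeError:
            continue  # ignore malformed frames
        if not isinstance(event, dict):
            continue
        if event.get("type") == "token":
            tokens.append(event.get("token", ""))
        elif event.get("type") == "complete":
            # Prefer the server's full answer; fall back to the joined tokens
            return event.get("answer", "".join(tokens))
    return "".join(tokens)


if __name__ == "__main__":
    sample = [
        ": keep-alive",
        'data: {"type": "token", "token": "[1, "}',
        'data: {"type": "token", "token": "2]"}',
        "data: [DONE]",
    ]
    print(collect_answer(sample))  # [1, 2]
```

Inside ask_and_collect, the loop over stream.aiter_lines() could then feed lines through the same parsing logic, keeping one implementation to test.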

Define the extraction prompts

Use structured prompts that ask for a specific format. Asking for JSON output in the prompt makes parsing more reliable. The chat API grounds its answers in the uploaded documents, so these prompts return values drawn from the contract text.

python
OBLIGATION_PROMPT = """
List all contractor obligations from this contract.
For each obligation return a JSON object with these fields:
- clause_ref: the clause number (e.g. "12.3")
- description: a one-sentence plain-language description
- party: who is obligated ("contractor", "principal", or "both")
- deadline: ISO date if a deadline is stated, otherwise null
Return a JSON array. No markdown, no commentary: raw JSON only.
"""

PAYMENT_TERMS_PROMPT = """
Extract all payment terms from this contract.
Return a JSON object with:
- payment_schedule: array of {milestone, amount_aud, due_date}
- retention_percent: number (e.g. 5 for 5%)
- retention_release_condition: string
- late_payment_interest_percent: number or null
- payment_claim_period_days: number
Raw JSON only.
"""

MILESTONE_PROMPT = """
List all project milestones and completion dates from this contract.
Return a JSON array where each item has:
- name: milestone name
- date: ISO date or null if not stated
- liquidated_damages_per_day_aud: number or null
Raw JSON only.
"""

Step 4 — Parse the response and build your data model

LLMs usually return valid JSON when the prompt is specific enough, but not always, so treat parsing as a step that can fail. Use a simple extraction helper that strips any accidental markdown fences, then parse with json.loads.

python
import json
from dataclasses import dataclass, field
from typing import Optional

FENCE = "`" * 3  # markdown code-fence marker


def extract_json(raw: str) -> object:
    """Strip markdown fences and parse JSON from an LLM response.

    Raises json.JSONDecodeError if the remaining text is not valid JSON.
    """
    # Drop lines that consist only of a fence marker, with or without a "json" tag
    lines = [
        ln for ln in raw.strip().splitlines()
        if ln.strip().lower() not in (FENCE, FENCE + "json")
    ]
    return json.loads("\n".join(lines).strip())


@dataclass
class PaymentTerms:
    payment_schedule: list[dict]
    retention_percent: float
    retention_release_condition: str
    late_payment_interest_percent: Optional[float]
    payment_claim_period_days: int


@dataclass
class ContractIntelligence:
    project_id: str
    document_id: str
    obligations: list[dict] = field(default_factory=list)
    payment_terms: Optional[PaymentTerms] = None
    milestones: list[dict] = field(default_factory=list)


async def extract_contract_intelligence(
    client: httpx.AsyncClient,
    project_id: str,
    doc_id: str,
) -> ContractIntelligence:
    """Run all extraction prompts and return a populated ContractIntelligence."""
    conv_id = await create_conversation(client, project_id)

    raw_obligations = await ask_and_collect(client, conv_id, OBLIGATION_PROMPT)
    raw_payment = await ask_and_collect(client, conv_id, PAYMENT_TERMS_PROMPT)
    raw_milestones = await ask_and_collect(client, conv_id, MILESTONE_PROMPT)

    obligations = extract_json(raw_obligations)
    payment_data = extract_json(raw_payment)
    milestones = extract_json(raw_milestones)

    # Only build PaymentTerms when the model returned a JSON object;
    # "or" guards against keys that are present but null.
    payment_terms = None
    if isinstance(payment_data, dict):
        payment_terms = PaymentTerms(
            payment_schedule=payment_data.get("payment_schedule") or [],
            retention_percent=float(payment_data.get("retention_percent") or 0),
            retention_release_condition=payment_data.get("retention_release_condition") or "",
            late_payment_interest_percent=payment_data.get("late_payment_interest_percent"),
            payment_claim_period_days=int(payment_data.get("payment_claim_period_days") or 0),
        )

    return ContractIntelligence(
        project_id=project_id,
        document_id=doc_id,
        obligations=obligations if isinstance(obligations, list) else [],
        payment_terms=payment_terms,
        milestones=milestones if isinstance(milestones, list) else [],
    )
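Parsed JSON can still be the wrong shape, so it is worth checking each obligation before it reaches a downstream system. The sketch below is one possible validator for the fields requested in OBLIGATION_PROMPT. The function name and the choice to collect problems instead of raising are assumptions made for this example.

```python
from datetime import date
from typing import Any

VALID_PARTIES = {"contractor", "principal", "both"}


def validate_obligations(items: Any) -> tuple[list[dict], list[str]]:
    """Split parsed obligations into usable rows and human-readable problems."""
    if not isinstance(items, list):
        return [], ["expected a JSON array of obligations"]
    valid: list[dict] = []
    problems: list[str] = []
    for index, item in enumerate(items):
        if not isinstance(item, dict):
            problems.append(f"item {index}: not an object")
            continue
        missing = [k for k in ("clause_ref", "description", "party") if not item.get(k)]
        if missing:
            problems.append(f"item {index}: missing {', '.join(missing)}")
            continue
        if item["party"] not in VALID_PARTIES:
            problems.append(f"item {index}: unknown party {item['party']!r}")
            continue
        deadline = item.get("deadline")
        if deadline is not None:
            try:
                date.fromisoformat(deadline)  # TypeError if not a string
            except (TypeError, ValueError):
                problems.append(f"item {index}: deadline {deadline!r} is not an ISO date")
                continue
        valid.append(item)
    return valid, problems
```

Logging the problems list alongside the document ID gives reviewers a short queue of clauses to check by hand, while the valid rows continue through the pipeline.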

Step 5 — Push to Procore or Jira

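Once you have the ContractIntelligence object, push it to your downstream system. The sketches below create Procore schedule tasks and Jira issues. Treat the endpoint paths, payload fields, and auth headers as placeholders, and substitute your actual client libraries or REST calls.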

Push milestones to Procore

python
async def push_to_procore(
    intelligence: ContractIntelligence,
    procore_project_id: int,
    procore_token: str,
) -> None:
    """Create Procore schedule items for each extracted milestone."""
    async with httpx.AsyncClient() as client:
        for milestone in intelligence.milestones:
            resp = await client.post(
                f"https://api.procore.com/rest/v1.0/projects/{procore_project_id}/schedule/tasks",
                json={
                    "task": {
                        "name": milestone["name"],
                        "start_date": milestone.get("date"),
                        "end_date": milestone.get("date"),
                        "notes": (
                            f"LD: AUD {milestone['liquidated_damages_per_day_aud']}/day"
                            if milestone.get("liquidated_damages_per_day_aud")
                            else ""
                        ),
                    }
                },
                headers={"Authorization": f"Bearer {procore_token}"},
            )
            resp.raise_for_status()  # surface rejected tasks instead of dropping them silently

Create Jira issues for obligations

python
async def push_obligations_to_jira(
    intelligence: ContractIntelligence,
    jira_base_url: str,
    jira_project_key: str,
    jira_token: str,
) -> None:
    """Create a Jira issue for each contractor obligation."""
    async with httpx.AsyncClient() as client:
        for obligation in intelligence.obligations:
            # Skip obligations that fall on the principal only
            if obligation.get("party") not in ("contractor", "both"):
                continue
            resp = await client.post(
                f"{jira_base_url}/rest/api/3/issue",
                json={
                    "fields": {
                        "project": {"key": jira_project_key},
                        "issuetype": {"name": "Task"},
                        "summary": f"[{obligation['clause_ref']}] {obligation['description']}",
                        "duedate": obligation.get("deadline"),
                        "labels": ["contract-obligation"],
                    }
                },
                headers={
                    "Authorization": f"Bearer {jira_token}",
                    "Content-Type": "application/json",
                },
            )
            resp.raise_for_status()  # surface rejected issues instead of dropping them silently

Step 6 — Schedule recurring extraction

For projects where contracts are amended regularly, run extraction on a schedule. The example below uses a simple cron job with Python. For production, prefer a task queue (Celery, ARQ) or a cloud scheduler (AWS EventBridge, GCP Cloud Scheduler).

python
# cron_extract.py: run with cron: 0 8 * * 1 (every Monday at 08:00)
import asyncio
import os

import httpx

# BASE_URL, HEADERS, extract_contract_intelligence, and push_obligations_to_jira
# are the definitions from the earlier steps; import them from your own module.


async def run_weekly_extraction() -> None:
    """Re-extract intelligence for all active contracts."""
    async with httpx.AsyncClient(timeout=180.0) as client:
        # Fetch all projects tagged as active
        resp = await client.get(
            f"{BASE_URL}/projects?status=active&limit=100",
            headers=HEADERS,
        )
        resp.raise_for_status()
        projects = resp.json().get("data", [])

        for project in projects:
            project_id = project["id"]

            # Fetch processed documents in this project
            docs_resp = await client.get(
                f"{BASE_URL}/projects/{project_id}/documents?status=processed&limit=50",
                headers=HEADERS,
            )
            docs_resp.raise_for_status()
            docs = docs_resp.json().get("data", [])

            for doc in docs:
                if doc.get("document_type") != "contract":
                    continue
                try:
                    intelligence = await extract_contract_intelligence(
                        client, project_id, doc["id"]
                    )
                    await push_obligations_to_jira(
                        intelligence,
                        jira_base_url="https://yourorg.atlassian.net",
                        jira_project_key="CONST",
                        # Read the token from the environment, not from source
                        jira_token=os.environ["JIRA_TOKEN"],
                    )
                except Exception as exc:
                    # Log and continue; do not let one failure block the rest
                    print(f"Extraction failed for {doc['id']}: {exc}")


if __name__ == "__main__":
    asyncio.run(run_weekly_extraction())
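A weekly job like the one above pushes every obligation again on each run, which would create duplicate Jira issues for unchanged contracts. A minimal sketch of one way to avoid that follows, assuming you persist a set of fingerprints between runs. The helper names and the fingerprint fields are choices made for this example, not part of the Alloovium API.

```python
import hashlib


def obligation_fingerprint(document_id: str, obligation: dict) -> str:
    """Return a stable hash identifying one obligation in one document.

    The free-text description is left out on purpose: a model may word it
    differently on each run, which would defeat the comparison.
    """
    parts = [
        document_id,
        str(obligation.get("clause_ref", "")).strip(),
        str(obligation.get("party", "")).strip().lower(),
    ]
    # Join with a separator that cannot appear in normal clause text
    return hashlib.sha256("\x1f".join(parts).encode("utf-8")).hexdigest()


def new_obligations(document_id: str, obligations: list[dict], seen: set[str]) -> list[dict]:
    """Return obligations not pushed before, recording their fingerprints in `seen`."""
    fresh = []
    for obligation in obligations:
        key = obligation_fingerprint(document_id, obligation)
        if key not in seen:
            seen.add(key)
            fresh.append(obligation)
    return fresh
```

Filtering intelligence.obligations through new_obligations before calling push_obligations_to_jira limits each run to obligations that have not been seen. The trade-off is that two obligations sharing a clause and a party collapse into one, so widen the fingerprint if your contracts need finer granularity.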

Rate limits

The Standard tier allows 60 chat requests per minute. If you have many contracts, add a semaphore to limit concurrent chat requests. See Rate Limits for tier details.
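The semaphore approach can be sketched as follows. The helper name run_limited and the default of five concurrent jobs are arbitrary choices for illustration; tune the number against your tier.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_limited(
    jobs: list[Callable[[], Awaitable[T]]],
    max_concurrent: int = 5,
) -> list[T]:
    """Run coroutine factories with at most `max_concurrent` in flight.

    Results come back in the same order as `jobs`.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(job: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:  # wait here while the limit is reached
            return await job()

    return await asyncio.gather(*(guarded(job) for job in jobs))
```

Each job is a zero-argument callable that returns a coroutine, for example lambda d=doc: extract_contract_intelligence(client, project_id, d["id"]). Because that function sends its three prompts one after another, max_concurrent also bounds the number of chat requests in flight. A semaphore limits concurrency, not requests per minute, so a rate-limit response from the API still needs its own retry handling.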

Going further