End-to-End Pipeline Design
class EfficientExtractionPipeline:
    """
    D4 + D5 combined: quality extraction with minimal resource usage.

    Submits invoice texts to the Batch API with a forced extraction tool,
    validates each extraction, routes it by confidence, and reports
    efficiency metrics over the routed results.
    """

    # D4: Compact schema — nullable fields, no padding
    SCHEMA = invoice_schema_with_confidence  # from D4 T3.3

    # D5: Batch API for non-urgent bulk work
    # NOTE(review): not referenced by the methods shown in this class —
    # confirm whether callers are expected to chunk their input by this size.
    BATCH_SIZE = 100  # process in batches of 100

    # D5: Confidence routing thresholds (calibrated from D4 schema confidence)
    ROUTING_THRESHOLDS = {
        "high": "auto_process",   # >90% accurate based on calibration
        "medium": "spot_check",   # 70-90% accurate
        "low": "human_review",    # <70% accurate
    }

    async def process_batch(self, invoices: dict) -> dict:
        """
        D4: Extract with structured schema
        D5: Batch API + validation + routing

        Args:
            invoices: Mapping of invoice id -> invoice text to extract from.

        Returns:
            A dict with four lists:
            - "auto_process", "spot_check", "human_review": entries of the
              form {"id": custom_id, "data": extraction, "route": route}.
            - "failed": entries of the form
              {"id": custom_id, "reason": "validation_failed"}.
        """
        routed = {
            "auto_process": [],
            "spot_check": [],
            "human_review": [],
            "failed": [],
        }

        # Nothing to extract: skip the API round trip instead of submitting
        # an empty batch.
        if not invoices:
            return routed

        # D5 + D4: Submit to Batch API with forced schema
        batch_requests = [
            {
                "custom_id": f"inv-{inv_id}",
                "params": {
                    "model": "claude-sonnet-4-6",
                    "tools": [self.SCHEMA],
                    "tool_choice": {"type": "tool", "name": "extract_invoice"},
                    "messages": [{"role": "user", "content": f"Extract:\n{text}"}],
                },
            }
            for inv_id, text in invoices.items()
        ]

        # D5: Submit batch (50% cost saving)
        # NOTE(review): this call is not awaited; if `client` is a synchronous
        # client it blocks the event loop while the request is in flight —
        # confirm against where `client` is constructed.
        batch = client.beta.messages.batches.create(requests=batch_requests)
        results = await self.poll_and_collect(batch.id)

        known_routes = ("auto_process", "spot_check", "human_review")
        for result in results:
            extraction = parse_tool_result(result)

            # D4 validation: schema + semantic
            if not self.validate(extraction):
                routed["failed"].append(
                    {"id": result.custom_id, "reason": "validation_failed"}
                )
                continue

            # D5: Route based on D4 confidence
            route = self.route(extraction)

            # An unrecognised route must not make the invoice vanish from the
            # output; send it to a human, keeping the original route value on
            # the entry so the reviewer can see what was returned.
            bucket = route if route in known_routes else "human_review"
            routed[bucket].append(
                {"id": result.custom_id, "data": extraction, "route": route}
            )

        return routed

    def efficiency_metric(self, results: dict, batch_token_usage: int) -> dict:
        """End-to-end efficiency: tokens per successful extraction.

        Args:
            results: Output of ``process_batch`` — must contain the keys
                "auto_process", "spot_check", "human_review" and "failed".
            batch_token_usage: Total tokens consumed by the batch.

        Returns:
            A dict with "total_processed", "auto_processed",
            "tokens_per_extraction", "human_review_rate" and "failure_rate".
            Every ratio is 0 when its denominator is 0.
        """
        total = sum(len(v) for v in results.values())
        auto_processed = len(results["auto_process"])
        # "Successful" = extractions usable without mandatory human review.
        successful = auto_processed + len(results["spot_check"])
        return {
            "total_processed": total,
            "auto_processed": auto_processed,
            # Divide by successful extractions (not by total) so failures and
            # escalations raise the per-extraction cost, as the metric intends.
            "tokens_per_extraction": (
                batch_token_usage / successful if successful > 0 else 0
            ),
            "human_review_rate": (
                len(results["human_review"]) / total if total > 0 else 0
            ),
            "failure_rate": len(results["failed"]) / total if total > 0 else 0,
        }
Key Takeaways
- Schema compactness affects cost — nullable fields save tokens vs empty strings
- Validate early — catch D4 failures before the D5 pipeline processes bad data
- Batch API for non-urgent volume — a D5 efficiency tool that complements D4 quality
- Confidence from D4 drives D5 routing — one system, not two
- Measure end-to-end — tokens per successful extraction captures both quality and efficiency