In this tutorial, we build an ultra-advanced agentic AI workflow that behaves like a production-grade research and reasoning system rather than a single quick call. We asynchronously ingest real web sources, split them into provenance-tracked chunks, and run hybrid retrieval that pairs TF-IDF (sparse) with OpenAI embeddings (dense), fusing the results for high recall and consistency. We coordinate multiple agents for planning, synthesis, and repair, and we enforce strict guardrails so that every major claim is grounded in retrieved evidence. We also preserve episodic memory, so the system improves its strategy over time.
!pip -q install openai openai-agents pydantic httpx beautifulsoup4 lxml scikit-learn numpy
import os, re, json, time, getpass, asyncio, sqlite3, hashlib
from typing import List, Dict, Tuple, Optional, Any
import numpy as np
import httpx
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from openai import AsyncOpenAI
from agents import Agent, Runner, SQLiteSession
if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")
if not os.environ.get("OPENAI_API_KEY"):
raise RuntimeError("OPENAI_API_KEY not provided.")
print("✅ OpenAI API key loaded securely.")
oa = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])
def sha1(s: str) -> str:
return hashlib.sha1(s.encode("utf-8", errors="ignore")).hexdigest()
def normalize_url(u: str) -> str:
u = (u or "").strip()
return u.rstrip(").,]\"'")
def clean_html_to_text(html: str) -> str:
soup = BeautifulSoup(html, "lxml")
for tag in soup(["script", "style", "noscript"]):
tag.decompose()
txt = soup.get_text("\n")
txt = re.sub(r"\n{3,}", "\n\n", txt).strip()
txt = re.sub(r"[ \t]+", " ", txt)
return txt
def chunk_text(text: str, chunk_chars: int = 1600, overlap_chars: int = 320) -> List[str]:
if not text:
return []
text = re.sub(r"\s+", " ", text).strip()
n = len(text)
step = max(1, chunk_chars - overlap_chars)
chunks = []
i = 0
while i < n:
chunks.append(text[i:i + chunk_chars])
i += step
return chunks
def canonical_chunk_id(s: str) -> str:
if s is None:
return ""
s = str(s).strip()
s = s.strip("<>\"'()[]{}")
s = s.rstrip(".,;:")
return s
def inject_exec_summary_citations(exec_summary: str, citations: List[str], allowed_chunk_ids: List[str]) -> str:
exec_summary = exec_summary or ""
cset = []
for c in citations:
c = canonical_chunk_id(c)
if c and c in allowed_chunk_ids and c not in cset:
cset.append(c)
if len(cset) >= 2:
break
if len(cset) < 2:
for c in allowed_chunk_ids:
if c not in cset:
cset.append(c)
if len(cset) >= 2:
break
if len(cset) >= 2:
needed = [c for c in cset if c not in exec_summary]
if needed:
exec_summary = exec_summary.strip()
if exec_summary and not exec_summary.endswith("."):
exec_summary += "."
exec_summary += f" (cite: {cset[0]}) (cite: {cset[1]})"
return exec_summary
We set up the environment, securely load the OpenAI API key, and initialize the core utilities on which everything else depends. We define hashing, URL normalization, HTML cleaning, and chunking so that all downstream steps operate on clean, consistent text. We also add deterministic helpers to normalize and inject citations, ensuring that guardrails are always satisfied.
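As a quick sanity check of the sliding-window chunker defined above, the minimal, hypothetical example below shows how consecutive chunks overlap:

# Demo of chunk_text: with chunk_chars=10 and overlap_chars=4,
# the window advances by step = 10 - 4 = 6, so consecutive chunks share 4 characters.
sample = "abcdefghijklmnopqrstuvwxyz"
for c in chunk_text(sample, chunk_chars=10, overlap_chars=4):
    print(repr(c))
# 'abcdefghij'
# 'ghijklmnop'
# 'mnopqrstuv'
# 'stuvwxyz'
# 'yz'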
async def fetch_many(urls: List[str], timeout_s: float = 25.0, per_url_char_limit: int = 60000) -> Dict[str, str]:
headers = {"User-Agent": "Mozilla/5.0 (AgenticAI/4.2)"}
urls = [normalize_url(u) for u in urls]
urls = [u for u in urls if u.startswith("http")]
urls = list(dict.fromkeys(urls))
out: Dict[str, str] = {}
async with httpx.AsyncClient(timeout=timeout_s, follow_redirects=True, headers=headers) as client:
async def _one(url: str):
try:
r = await client.get(url)
r.raise_for_status()
out[url] = clean_html_to_text(r.text)[:per_url_char_limit]
except Exception as e:
out[url] = f"__FETCH_ERROR__ {type(e).__name__}: {e}"
await asyncio.gather(*[_one(u) for u in urls])
return out
def dedupe_texts(sources: Dict[str, str]) -> Dict[str, str]:
seen = set()
out = {}
for url, txt in sources.items():
if not isinstance(txt, str) or txt.startswith("__FETCH_ERROR__"):
continue
h = sha1(txt[:25000])
if h in seen:
continue
seen.add(h)
out[url] = txt
return out
class ChunkRecord(BaseModel):
chunk_id: str
url: str
chunk_index: int
text: str
class RetrievalHit(BaseModel):
chunk_id: str
url: str
chunk_index: int
score_sparse: float = 0.0
score_dense: float = 0.0
score_fused: float = 0.0
text: str
class EvidencePack(BaseModel):
query: str
hits: List[RetrievalHit]
We fetch multiple web sources in parallel and aggressively deduplicate content to avoid redundant evidence. We convert raw pages into structured text and define the core data models that represent chunks and retrieval hits. We ensure that each piece of text can be traced back to a specific source URL and chunk index.
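To exercise the fetch-and-dedupe step in isolation, here is a minimal sketch; the URLs are placeholders, and the top-level await assumes a notebook event loop, as in the rest of this tutorial:

# Hypothetical usage: fetch two pages concurrently, then drop near-duplicate bodies.
demo_pages = await fetch_many(["https://example.com", "https://example.org"])
demo_unique = dedupe_texts(demo_pages)
print(f"fetched={len(demo_pages)} unique_after_dedupe={len(demo_unique)}")
for url, txt in demo_unique.items():
    print(url, "->", txt[:80].replace("\n", " "))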
EPISODE_DB = "agentic_episode_memory.db"
def episode_db_init():
con = sqlite3.connect(EPISODE_DB)
cur = con.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
question TEXT NOT NULL,
urls_json TEXT NOT NULL,
retrieval_queries_json TEXT NOT NULL,
useful_sources_json TEXT NOT NULL
)
""")
con.commit()
con.close()
def episode_store(question: str, urls: List[str], retrieval_queries: List[str], useful_sources: List[str]):
con = sqlite3.connect(EPISODE_DB)
cur = con.cursor()
cur.execute(
"INSERT INTO episodes(ts, question, urls_json, retrieval_queries_json, useful_sources_json) VALUES(?,?,?,?,?)",
(int(time.time()), question, json.dumps(urls), json.dumps(retrieval_queries), json.dumps(useful_sources)),
)
con.commit()
con.close()
def episode_recall(question: str, top_k: int = 2) -> List[Dict[str, Any]]:
con = sqlite3.connect(EPISODE_DB)
cur = con.cursor()
cur.execute("SELECT ts, question, urls_json, retrieval_queries_json, useful_sources_json FROM episodes ORDER BY ts DESC LIMIT 200")
rows = cur.fetchall()
con.close()
q_tokens = set(re.findall(r"[A-Za-z]{3,}", (question or "").lower()))
scored = []
for ts, q2, u, rq, us in rows:
t2 = set(re.findall(r"[A-Za-z]{3,}", (q2 or "").lower()))
if not t2:
continue
score = len(q_tokens & t2) / max(1, len(q_tokens))
if score > 0:
scored.append((score, {
"ts": ts,
"question": q2,
"urls": json.loads(u),
"retrieval_queries": json.loads(rq),
"useful_sources": json.loads(us),
}))
scored.sort(key=lambda x: x[0], reverse=True)
return [x[1] for x in scored[:top_k]]
episode_db_init()
We introduce episodic memory backed by SQLite so that the system can remember what worked in previous runs. We store questions, retrieval strategies, and useful sources to guide future planning. We also apply lightweight similarity-based recall to bias the system toward historically successful patterns.
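A minimal sketch of the memory round trip, using hypothetical values, looks like this:

# Store one throwaway demo episode, then recall it by token overlap.
episode_store(
    question="How should an agent plan retrieval queries?",
    urls=["https://example.com/docs"],  # hypothetical placeholder
    retrieval_queries=["agent planning", "query decomposition"],
    useful_sources=["https://example.com/docs"],
)
for ep in episode_recall("What retrieval plan should the agent use?", top_k=1):
    print(ep["question"], "->", ep["retrieval_queries"])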
class HybridIndex:
def __init__(self):
self.records: List[ChunkRecord] = []
self.tfidf: Optional[TfidfVectorizer] = None
self.tfidf_mat = None
self.emb_mat: Optional[np.ndarray] = None
def build_sparse(self):
corpus = [r.text for r in self.records] if self.records else [""]
self.tfidf = TfidfVectorizer(stop_words="english", ngram_range=(1, 2), max_features=80000)
self.tfidf_mat = self.tfidf.fit_transform(corpus)
def search_sparse(self, query: str, k: int) -> List[Tuple[int, float]]:
if not self.records or self.tfidf is None or self.tfidf_mat is None:
return []
qv = self.tfidf.transform([query])
sims = cosine_similarity(qv, self.tfidf_mat).flatten()
top = np.argsort(-sims)[:k]
return [(int(i), float(sims[i])) for i in top]
def set_dense(self, mat: np.ndarray):
self.emb_mat = mat.astype(np.float32)
def search_dense(self, q_emb: np.ndarray, k: int) -> List[Tuple[int, float]]:
if self.emb_mat is None or not self.records:
return []
M = self.emb_mat
q = q_emb.astype(np.float32).reshape(1, -1)
M_norm = M / (np.linalg.norm(M, axis=1, keepdims=True) + 1e-9)
q_norm = q / (np.linalg.norm(q) + 1e-9)
sims = (M_norm @ q_norm.T).flatten()
top = np.argsort(-sims)[:k]
return [(int(i), float(sims[i])) for i in top]
def rrf_fuse(rankings: List[List[int]], k: int = 60) -> Dict[int, float]:
scores: Dict[int, float] = {}
for r in rankings:
for pos, idx in enumerate(r, start=1):
scores[idx] = scores.get(idx, 0.0) + 1.0 / (k + pos)
return scores
HYBRID = HybridIndex()
ALLOWED_URLS: List[str] = []
EMBED_MODEL = "text-embedding-3-small"
async def embed_batch(texts: List[str]) -> np.ndarray:
resp = await oa.embeddings.create(model=EMBED_MODEL, input=texts, encoding_format="float")
vecs = [np.array(item.embedding, dtype=np.float32) for item in resp.data]
return np.vstack(vecs) if vecs else np.zeros((0, 0), dtype=np.float32)
async def embed_texts(texts: List[str], batch_size: int = 96, max_concurrency: int = 3) -> np.ndarray:
sem = asyncio.Semaphore(max_concurrency)
mats: List[Tuple[int, np.ndarray]] = []
async def _one(start: int, batch: List[str]):
async with sem:
m = await embed_batch(batch)
mats.append((start, m))
tasks = []
for start in range(0, len(texts), batch_size):
batch = [t[:7000] for t in texts[start:start + batch_size]]
tasks.append(_one(start, batch))
await asyncio.gather(*tasks)
mats.sort(key=lambda x: x[0])
emb = np.vstack([m for _, m in mats]) if mats else np.zeros((len(texts), 0), dtype=np.float32)
if emb.shape[0] != len(texts):
raise RuntimeError(f"Embedding rows mismatch: got {emb.shape[0]} expected {len(texts)}")
return emb
async def embed_query(query: str) -> np.ndarray:
m = await embed_batch([query[:7000]])
return m[0] if m.shape[0] else np.zeros((0,), dtype=np.float32)
async def build_index(urls: List[str], max_chunks_per_url: int = 60):
global ALLOWED_URLS
    raw = await fetch_many(urls)
    fetched = dedupe_texts(raw)
records: List[ChunkRecord] = []
allowed: List[str] = []
for url, txt in fetched.items():
if not isinstance(txt, str) or txt.startswith("__FETCH_ERROR__"):
continue
allowed.append(url)
chunks = chunk_text(txt)[:max_chunks_per_url]
for i, ch in enumerate(chunks):
cid = f"{sha1(url)}:{i}"
records.append(ChunkRecord(chunk_id=cid, url=url, chunk_index=i, text=ch))
    if not records:
        # Report the raw fetch results (including __FETCH_ERROR__ entries);
        # the deduped dict drops error entries, so it would hide the cause.
        err_view = {u: (txt or "")[:400] for u, txt in raw.items()}
        raise RuntimeError("No sources fetched successfully.\n" + json.dumps(err_view, indent=2)[:4000])
ALLOWED_URLS = allowed
HYBRID.records = records
HYBRID.build_sparse()
texts = [r.text for r in HYBRID.records]
emb = await embed_texts(texts, batch_size=96, max_concurrency=3)
HYBRID.set_dense(emb)
We create a hybrid retrieval index that combines sparse TF-IDF search with dense OpenAI embeddings. We apply reciprocal rank fusion so that sparse and dense signals complement each other rather than compete. We build the index once per run and reuse it across all retrieval queries for efficiency.
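To make the fusion concrete, here is a small worked example of rrf_fuse with made-up chunk indices. Chunk 2 ranks second in the sparse list and first in the dense list, so its fused score 1/(60+2) + 1/(60+1) edges out chunk 7, which tops the sparse list but sits third in the dense list:

sparse_rank = [7, 2, 9]  # chunk indices ordered by TF-IDF similarity
dense_rank = [2, 5, 7]   # chunk indices ordered by embedding similarity
fused = rrf_fuse([sparse_rank, dense_rank], k=60)
for idx in sorted(fused, key=fused.get, reverse=True):
    print(idx, round(fused[idx], 5))
# 2 0.03252  (1/62 + 1/61)
# 7 0.03227  (1/61 + 1/63)
# 5 0.01613  (1/62)
# 9 0.01587  (1/63)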
def build_evidence_pack(query: str, sparse: List[Tuple[int,float]], dense: List[Tuple[int,float]], k: int = 10) -> EvidencePack:
sparse_rank = [i for i,_ in sparse]
dense_rank = [i for i,_ in dense]
sparse_scores = {i:s for i,s in sparse}
dense_scores = {i:s for i,s in dense}
fused = rrf_fuse([sparse_rank, dense_rank], k=60) if dense_rank else rrf_fuse([sparse_rank], k=60)
top = sorted(fused.keys(), key=lambda i: fused[i], reverse=True)[:k]
hits: List[RetrievalHit] = []
for idx in top:
r = HYBRID.records[idx]
hits.append(RetrievalHit(
chunk_id=r.chunk_id, url=r.url, chunk_index=r.chunk_index,
score_sparse=float(sparse_scores.get(idx, 0.0)),
score_dense=float(dense_scores.get(idx, 0.0)),
score_fused=float(fused.get(idx, 0.0)),
text=r.text
))
return EvidencePack(query=query, hits=hits)
async def gather_evidence(queries: List[str], per_query_k: int = 10, sparse_k: int = 60, dense_k: int = 60):
evidence: List[EvidencePack] = []
useful_sources_count: Dict[str, int] = {}
all_chunk_ids: List[str] = []
for q in queries:
sparse = HYBRID.search_sparse(q, k=sparse_k)
q_emb = await embed_query(q)
dense = HYBRID.search_dense(q_emb, k=dense_k)
pack = build_evidence_pack(q, sparse, dense, k=per_query_k)
evidence.append(pack)
for h in pack.hits[:6]:
useful_sources_count[h.url] = useful_sources_count.get(h.url, 0) + 1
for h in pack.hits:
all_chunk_ids.append(h.chunk_id)
useful_sources = sorted(useful_sources_count.keys(), key=lambda u: useful_sources_count[u], reverse=True)
all_chunk_ids = sorted(list(dict.fromkeys(all_chunk_ids)))
return evidence, useful_sources[:8], all_chunk_ids
class Plan(BaseModel):
objective: str
subtasks: List[str]
retrieval_queries: List[str]
acceptance_checks: List[str]
class UltraAnswer(BaseModel):
title: str
executive_summary: str
architecture: List[str]
retrieval_strategy: List[str]
agent_graph: List[str]
implementation_notes: List[str]
risks_and_limits: List[str]
citations: List[str]
sources: List[str]
def normalize_answer(ans: UltraAnswer, allowed_chunk_ids: List[str]) -> UltraAnswer:
data = ans.model_dump()
data["citations"] = [canonical_chunk_id(x) for x in (data.get("citations") or [])]
data["citations"] = [x for x in data["citations"] if x in allowed_chunk_ids]
data["executive_summary"] = inject_exec_summary_citations(data.get("executive_summary",""), data["citations"], allowed_chunk_ids)
return UltraAnswer(**data)
def validate_ultra(ans: UltraAnswer, allowed_chunk_ids: List[str]) -> None:
extras = [u for u in ans.sources if u not in ALLOWED_URLS]
if extras:
raise ValueError(f"Non-allowed sources in output: {extras}")
cset = set(ans.citations or [])
missing = [cid for cid in cset if cid not in set(allowed_chunk_ids)]
if missing:
raise ValueError(f"Citations reference unknown chunk_ids (not retrieved): {missing}")
if len(cset) < 6:
raise ValueError("Need at least 6 distinct chunk_id citations in ultra mode.")
es_text = ans.executive_summary or ""
es_count = sum(1 for cid in cset if cid in es_text)
if es_count < 2:
raise ValueError("Executive summary must include at least 2 chunk_id citations verbatim.")
PLANNER = Agent(
name="Planner",
model="gpt-4o-mini",
instructions=(
"Return a technical Plan schema.\n"
"Make 10-16 retrieval_queries.\n"
"Acceptance must include: at least 6 citations and exec_summary contains at least 2 citations verbatim."
),
output_type=Plan,
)
SYNTHESIZER = Agent(
name="Synthesizer",
model="gpt-4o-mini",
instructions=(
"Return UltraAnswer schema.\n"
"Hard constraints:\n"
"- executive_summary MUST include at least TWO citations verbatim as: (cite: <chunk_id>).\n"
"- citations must be chosen ONLY from ALLOWED_CHUNK_IDS list.\n"
"- citations list must include at least 6 unique chunk_ids.\n"
"- sources must be subset of allowed URLs.\n"
),
output_type=UltraAnswer,
)
FIXER = Agent(
name="Fixer",
model="gpt-4o-mini",
instructions=(
"Repair to satisfy guardrails.\n"
"Ensure executive_summary includes at least TWO citations verbatim.\n"
"Choose citations ONLY from ALLOWED_CHUNK_IDS list.\n"
"Return UltraAnswer schema."
),
output_type=UltraAnswer,
)
session = SQLiteSession("ultra_agentic_user", "ultra_agentic_session.db")
We gather evidence by running multiple targeted queries, combining sparse and dense results, and assembling evidence packs with scores and provenance. We define strict schemas for plans and final answers, then normalize and validate citations against the retrieved chunk IDs. We enforce strict guardrails so that every answer remains grounded and auditable.
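The normalize-then-validate contract can be exercised in isolation. In this minimal sketch (the chunk_ids are fabricated placeholders), normalize_answer injects two verbatim citations into the summary so that validate_ultra passes:

demo_ids = [f"deadbeef:{i}" for i in range(6)]  # fabricated chunk_ids
demo = UltraAnswer(
    title="Demo", executive_summary="A grounded summary",
    architecture=[], retrieval_strategy=[], agent_graph=[],
    implementation_notes=[], risks_and_limits=[],
    citations=demo_ids, sources=[],
)
demo = normalize_answer(demo, demo_ids)
validate_ultra(demo, demo_ids)  # raises ValueError if any guardrail fails
print(demo.executive_summary)   # ends with "(cite: deadbeef:0) (cite: deadbeef:1)"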
async def run_ultra_agentic(question: str, urls: List[str], max_repairs: int = 2) -> UltraAnswer:
await build_index(urls)
recall_hint = json.dumps(episode_recall(question, top_k=2), indent=2)[:2000]
plan_res = await Runner.run(
PLANNER,
f"Question:\n{question}\n\nAllowed URLs:\n{json.dumps(ALLOWED_URLS, indent=2)}\n\nRecall:\n{recall_hint}\n",
session=session
)
plan: Plan = plan_res.final_output
queries = (plan.retrieval_queries or [])[:16]
evidence_packs, useful_sources, allowed_chunk_ids = await gather_evidence(queries)
evidence_json = json.dumps([p.model_dump() for p in evidence_packs], indent=2)[:16000]
allowed_chunk_ids_json = json.dumps(allowed_chunk_ids[:200], indent=2)
draft_res = await Runner.run(
SYNTHESIZER,
f"Question:\n{question}\n\nAllowed URLs:\n{json.dumps(ALLOWED_URLS, indent=2)}\n\n"
f"ALLOWED_CHUNK_IDS:\n{allowed_chunk_ids_json}\n\n"
f"Evidence packs:\n{evidence_json}\n\n"
"Return UltraAnswer.",
session=session
)
draft = normalize_answer(draft_res.final_output, allowed_chunk_ids)
last_err = None
for i in range(max_repairs + 1):
try:
validate_ultra(draft, allowed_chunk_ids)
episode_store(question, ALLOWED_URLS, plan.retrieval_queries, useful_sources)
return draft
except Exception as e:
last_err = str(e)
if i >= max_repairs:
draft = normalize_answer(draft, allowed_chunk_ids)
validate_ultra(draft, allowed_chunk_ids)
return draft
fixer_res = await Runner.run(
FIXER,
f"Question:\n{question}\n\nAllowed URLs:\n{json.dumps(ALLOWED_URLS, indent=2)}\n\n"
f"ALLOWED_CHUNK_IDS:\n{allowed_chunk_ids_json}\n\n"
f"Guardrail error:\n{last_err}\n\n"
f"Draft:\n{json.dumps(draft.model_dump(), indent=2)[:12000]}\n\n"
f"Evidence packs:\n{evidence_json}\n\n"
"Return corrected UltraAnswer that passes guardrails.",
session=session
)
draft = normalize_answer(fixer_res.final_output, allowed_chunk_ids)
raise RuntimeError(f"Unexpected failure: {last_err}")
question = (
"Design a production-lean but advanced agentic AI workflow in Python with hybrid retrieval, "
"provenance-first citations, critique-and-repair loops, and episodic memory. "
"Explain why each layer matters, failure modes, and evaluation."
)
urls = [
"https://openai.github.io/openai-agents-python/",
"https://openai.github.io/openai-agents-python/agents/",
"https://openai.github.io/openai-agents-python/running_agents/",
"https://github.com/openai/openai-agents-python",
]
ans = await run_ultra_agentic(question, urls, max_repairs=2)
print("\nTITLE:\n", ans.title)
print("\nEXECUTIVE SUMMARY:\n", ans.executive_summary)
print("\nARCHITECTURE:")
for x in ans.architecture:
print("-", x)
print("\nRETRIEVAL STRATEGY:")
for x in ans.retrieval_strategy:
print("-", x)
print("\nAGENT GRAPH:")
for x in ans.agent_graph:
print("-", x)
print("\nIMPLEMENTATION NOTES:")
for x in ans.implementation_notes:
print("-", x)
print("\nRISKS & LIMITS:")
for x in ans.risks_and_limits:
print("-", x)
print("\nCITATIONS (chunk_ids):")
for c in ans.citations:
print("-", c)
print("\nSOURCES:")
for s in ans.sources:
print("-", s)
We orchestrate the full agentic loop by chaining planning, synthesis, verification, and repair in an async-safe pipeline. We automatically retry and fix outputs until they pass every guardrail without human intervention. We finish by running a full example and printing a fully grounded, production-ready answer.
Finally, we end up with a comprehensive agentic pipeline that is robust to common failure modes: unstable embedding shapes, citation drift, and missing grounding in the executive summary. We validate every output against the allowed sources and retrieved chunk IDs, automatically normalize citations, and deterministically inject them when necessary to guarantee compliance without sacrificing correctness. By combining hybrid retrieval, critique-and-repair loops, and episodic memory, we create a reusable foundation that we can extend with robust evaluation (claim-to-evidence coverage scoring, adversarial red-teaming, and regression testing) to keep strengthening the system as it scales to new domains and larger corpora.
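As one example of where that evaluation layer could start, here is a hedged sketch (not part of the pipeline above; the helper name and the choice of fields are our own assumptions) of a claim-to-evidence coverage score, measured as the fraction of substantive answer bullets that carry at least one retrieved citation:

def coverage_score(ans: UltraAnswer, allowed_chunk_ids: List[str]) -> float:
    """Fraction of substantive bullets containing at least one retrieved chunk_id."""
    bullets = ans.architecture + ans.retrieval_strategy + ans.implementation_notes
    if not bullets:
        return 0.0
    cited = sum(1 for b in bullets if any(cid in b for cid in allowed_chunk_ids))
    return cited / len(bullets)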