Quick Answer: Why Agents Go Stale
An AI agent answers from the context it retrieves. If that retrieval index updates nightly, the agent is always reasoning over yesterday’s world. A customer asks “is this in stock?” at 3 PM. The index was rebuilt at 2 AM. The agent confidently says yes. It sold out at 9 AM.
That’s not a hallucination. That’s a stale retrieval layer — and it’s the most common production failure in deployed agents.
Key Takeaways:
- The model isn’t the problem — The retrieval index is. Your LLM is fine. Your data pipeline is the bottleneck.
- Nightly batch = 24-hour blind spot — Every event that happens after the last sync is invisible to your agent.
- Real-time connectors close the gap to 30 seconds — Events stream into the retrieval index as they occur.
- You probably already have a real-time connector — Segment, RudderStack, Amplitude, Kafka, Kinesis, or direct API push all work out of the box with Shaped.
- The fix is in the data layer, not the model — You don’t retrain the LLM. You re-architect how context reaches it.
What You’ll Learn
- The stale context problem — Why batch indexing breaks agents in production
- What real-time retrieval actually requires — Events, streams, and 30-second indexing
- Building it the traditional way — Kafka consumer + embedding pipeline + vector DB (and all the glue)
- Building it with Shaped — All your real-time connection options, from Kafka to Segment to direct API push
The Stale Context Problem
Imagine a customer support agent for an e-commerce platform. Users ask:
- “Is this item available in my size?”
- “Where is my order right now?”
- “Is the flash sale still running?”
Every one of these questions depends on live state — inventory, shipment location, active promotions. But here’s how most production agent architectures handle that state:
The index is rebuilt once a night. Everything that happens between rebuilds is invisible to the agent. A product sells out at 11 AM. A user asks at 3 PM. The agent — working from a 2 AM snapshot — says it’s available. That answer is confidently, architecturally wrong.
Why Batch Breaks Agents
This isn’t an edge case. It’s the default failure mode of deployed agents, and it compounds:
Confident wrongness is worse than uncertainty. An agent that says “I’m not sure” is recoverable. An agent that says “Yes, that’s in stock!” when it isn’t has already damaged trust. Users don’t blame the index — they blame the product.
Time-sensitive context has no grace period. A flash sale that starts at noon, a stock alert, a breaking news story, an order status change — these aren’t facts that can wait 12 hours to become retrievable. The value is entirely in the immediacy.
The gap compounds in practice. Nightly batch means 24-hour worst case. But real-world pipelines have ETL failures, transform jobs that run long, retry logic that adds hours. Production lag is rarely the theoretical minimum.
Support escalations spike. When agents give outdated answers, users escalate to humans — exactly what the AI was supposed to prevent. The cost of a stale index isn’t abstract: it shows up directly in support ticket volume.
What Real-Time Retrieval Actually Requires
The fix isn’t a better model or more prompt engineering. It’s shrinking the gap between an event occurring in the world and that event becoming retrievable context:
This requires three things to work together:
1. A streaming event source. Something that emits events as they happen: Kafka, Kinesis, Segment, RudderStack, Amplitude, PostHog, or a direct API push. Not a database that gets dumped once a day.
2. A retrieval engine that subscribes to the stream. Not a vector database with a nightly import script — a system that consumes events continuously and updates its index in near-real-time.
3. A query interface that reflects the latest index. So when your agent retrieves context, it gets state as of 30 seconds ago, not 14 hours ago.
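The stakes here are simple arithmetic. A rough sketch of worst-case staleness under each architecture (the 2-hour ETL overrun is an illustrative figure; the 30-second target is the streaming window discussed above):

```python
from datetime import timedelta

def worst_case_staleness(sync_interval: timedelta,
                         pipeline_lag: timedelta = timedelta(0)) -> timedelta:
    """An event can occur just after a sync completes, so the worst case
    is a full sync interval plus whatever lag the pipeline itself adds."""
    return sync_interval + pipeline_lag

# Nightly batch with a 2-hour ETL overrun vs. a 30-second streaming index
batch = worst_case_staleness(timedelta(hours=24), timedelta(hours=2))   # 26 hours
stream = worst_case_staleness(timedelta(seconds=30))                    # 30 seconds
```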
Part 1: The Traditional Stack
Getting live events into an agent’s retrieval layer without managed infrastructure means building and operating this yourself:
Every arrow is a system you build, deploy, and keep alive. Here’s what that looks like in practice.
Step 1: Kafka Consumer
A Kafka consumer isn’t a script — it’s a long-running process that needs to be deployed, monitored, and kept alive. You’re responsible for offset management, consumer group rebalancing, and handling partition failures.
```python
# kafka_consumer_service.py
from kafka import KafkaConsumer
from openai import OpenAI
import pinecone
import json

consumer = KafkaConsumer(
    'product-events',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    group_id='agent-retrieval-indexer',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest'
)

openai_client = OpenAI()
pinecone.init(api_key='YOUR_PINECONE_API_KEY',
              environment='YOUR_PINECONE_ENVIRONMENT')  # required before Index()
pinecone_index = pinecone.Index('product-catalog')

for message in consumer:
    event = message.value
    if event['event_type'] not in ['product_updated', 'inventory_changed',
                                   'promotion_started']:
        continue

    # Flatten the event into text for embedding
    text = f"""
    Product: {event['product_name']}
    Status: {event['status']}
    Inventory: {event['inventory_count']} units
    Price: ${event['price']}
    Tags: {', '.join(event.get('tags', []))}
    """

    # One embedding API call per event -- see the throughput caveat below
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    embedding = response.data[0].embedding

    pinecone_index.upsert(vectors=[{
        'id': event['product_id'],
        'values': embedding,
        'metadata': {
            'product_name': event['product_name'],
            'status': event['status'],
            'inventory_count': event['inventory_count'],
            'price': event['price'],
            'updated_at': event['timestamp']
        }
    }])
```
Calling the embedding API inside a consumer loop creates a hard dependency: if the API is slow, the consumer slows. If the API rate-limits you, the consumer backs up. At volume, you need a separate batching layer just to handle embedding throughput.
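One common mitigation is batching: the OpenAI embeddings endpoint accepts a list of inputs, so you can embed dozens of events per request instead of one. A minimal sketch of the grouping logic (the consumer loop and client names in the usage comment are illustrative):

```python
from typing import Iterable, Iterator

def render_event(event: dict) -> str:
    """Flatten a product event into the text that gets embedded."""
    return (
        f"Product: {event['product_name']} | "
        f"Status: {event['status']} | "
        f"Inventory: {event['inventory_count']} units"
    )

def batched(events: Iterable[dict], size: int) -> Iterator[list[dict]]:
    """Group events into fixed-size batches, one embeddings call each."""
    batch: list[dict] = []
    for event in events:
        batch.append(event)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch

# Usage sketch -- one API call per batch instead of per event:
# for batch in batched(consumer_events, 64):
#     resp = openai_client.embeddings.create(
#         model="text-embedding-3-small",
#         input=[render_event(e) for e in batch],  # list input = one request
#     )
```

In a real consumer you would also flush on a timer, so a slow trickle of events doesn't sit unindexed waiting for a batch to fill.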
Step 2: Kinesis Consumer (AWS)
If you’re on AWS, you’re polling Kinesis instead. Different API, same fundamental problem — a long-running process you have to operate:
```python
# kinesis_consumer_service.py
import boto3
import json
import time

kinesis = boto3.client('kinesis', region_name='us-east-1')

def consume_stream(stream_name: str):
    stream_info = kinesis.describe_stream(StreamName=stream_name)
    shards = stream_info['StreamDescription']['Shards']

    # One iterator per shard, starting at the tip of the stream
    shard_iterators = {
        shard['ShardId']: kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard['ShardId'],
            ShardIteratorType='LATEST'
        )['ShardIterator']
        for shard in shards
    }

    while True:
        for shard_id, iterator in shard_iterators.items():
            response = kinesis.get_records(ShardIterator=iterator, Limit=100)
            for record in response['Records']:
                # boto3 already base64-decodes the data blob to bytes
                data = json.loads(record['Data'].decode('utf-8'))
                process_and_embed(data)  # embed + upsert to vector DB
            shard_iterators[shard_id] = response['NextShardIterator']
        time.sleep(1)
```
Step 3: Agent Retrieval
With events finally in the vector DB, your agent retrieval looks like this:
```python
# agent_retrieval.py
from openai import OpenAI
import pinecone

openai_client = OpenAI()
pinecone.init(api_key='YOUR_PINECONE_API_KEY',
              environment='YOUR_PINECONE_ENVIRONMENT')  # required before Index()
pinecone_index = pinecone.Index('product-catalog')

def retrieve_context(query: str, top_k: int = 5) -> list[dict]:
    # Embed the user query (first network round trip)
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=query
    )
    query_embedding = response.data[0].embedding

    # Search the vector index (second round trip)
    results = pinecone_index.query(
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True
    )
    return [{'content': match.metadata, 'score': match.score}
            for match in results.matches]

def agent_response(user_question: str) -> str:
    context = retrieve_context(user_question)
    context_str = "\n".join([
        f"- {item['content']['product_name']}: "
        f"{item['content']['inventory_count']} in stock, "
        f"${item['content']['price']}, status: {item['content']['status']}"
        for item in context
    ])
    messages = [
        {
            "role": "system",
            "content": (
                "You are a helpful shopping assistant. "
                "Use only the context below. If it isn't here, say so.\n\n"
                f"Live product context:\n{context_str}"
            )
        },
        {"role": "user", "content": user_question}
    ]
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return response.choices[0].message.content
```
What You’re Actually Operating
| Component | What it is | Failure mode |
|---|---|---|
| Consumer service | Long-running process (Kafka or Kinesis) | Offset drift, OOM, silent death |
| Embedding pipeline | API calls per event + batching logic | Rate limits, backpressure |
| Vector database | Pinecone / Weaviate cluster | Index corruption, cold-start latency |
| Deployment | Docker / Kubernetes | Consumer dies without health checks |
| Monitoring | Prometheus + Grafana | Lag invisible without custom alerting |
Infrastructure cost: ~$1,500–$3,000/month at moderate scale. Time to production: 6–10 weeks for an experienced team.
Part 2: The Shaped Way — All Your Real-Time Options
Shaped replaces the entire consumer-embedder-vector DB pipeline with a single connector declaration. Define your schema, point it at your event source, and Shaped handles ingestion, embedding, and indexing — with events appearing in the retrieval index within 30 seconds.
The architecture collapses to this:
The key insight: you probably already have a real-time event source. Shaped has a native connector for it.
Option 1: Kafka
If you have a Kafka cluster, define the schema and point Shaped at your topic. Shaped provisions its own consumer group and handles offset management automatically.
```yaml
# product_events_kafka.yaml
name: product_events
schema_type: KAFKA
unique_keys: [product_id]
column_schema:
  product_id: String
  product_name: String
  status: String
  inventory_count: Int32
  price: Float64
  tags: Array(String)
  updated_at: DateTime
bootstrap_server: xxxxxxx.us-east-2.aws.confluent.cloud:9092
topic: product-events
username: YOUR_SASL_USERNAME
password: YOUR_SASL_PASSWORD
```

```bash
shaped create-table --file product_events_kafka.yaml
```
No consumer code to write, deploy, or monitor. Shaped creates a consumer group in the format shaped-*, subscribes to your topic, and begins indexing immediately.
Option 2: Kinesis
If you’re on AWS with Kinesis, the schema is nearly identical. Shaped provisions the stream and provides the ARN and IAM role — you send events using the standard AWS SDK or Shaped’s Table Insert API.
```yaml
# product_events_kinesis.yaml
name: product_events
schema_type: KINESIS
unique_keys: [product_id]
column_schema:
  product_id: String
  product_name: String
  status: String
  inventory_count: Int32
  price: Float64
  tags: Array(String)
  updated_at: DateTime
tenant_aws_account_id: "123456789012"
```

```bash
shaped create-table --file product_events_kinesis.yaml
```
When the table reaches ACTIVE, Shaped provides the Kinesis Stream ARN and Access Role ARN. Wire those into your application using the AWS Kinesis SDK and events begin flowing immediately.
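Once you have the Stream ARN, sending an event is a single `PutRecord` call. A sketch using boto3 (the ARN format and the `send_event` helper are illustrative; boto3 base64-encodes the data blob for you):

```python
import json

def stream_name_from_arn(arn: str) -> str:
    """Extract the stream name from an ARN like
    arn:aws:kinesis:us-east-1:123456789012:stream/shaped-product-events."""
    return arn.split("/")[-1]

def send_event(stream_arn: str, event: dict, partition_key: str) -> None:
    """Push one event onto the Shaped-provisioned stream via the AWS SDK."""
    import boto3  # deferred so the module loads without AWS deps installed
    kinesis = boto3.client("kinesis")  # assumes the provided IAM role is active
    kinesis.put_record(
        StreamName=stream_name_from_arn(stream_arn),
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=partition_key,  # e.g. the product_id, for shard affinity
    )
```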
Option 3: Segment
If you’re already using Segment to track user events, you have real-time context. One YAML declaration, then point your existing Segment destination at the stream Shaped provisions — no new event pipeline required.
```yaml
# segment_events.yaml
name: segment_events
schema_type: SEGMENT
```

```bash
shaped create-table --file segment_events.yaml
shaped view-table --table-name segment_events
```
Once the table reaches ACTIVE, retrieve the Kinesis Stream ARN and IAM Role ARN.
Then in your Segment dashboard: add Amazon Kinesis as a destination, paste in the Stream Name and IAM Role ARN from the output above, and enable. Every Segment event now flows into Shaped within 30 seconds. Your existing event tracking becomes your agent’s live context — no new infrastructure.
Option 4: RudderStack
Same pattern as Segment. If RudderStack is your CDP, one declaration connects it:
```yaml
# rudderstack_events.yaml
name: rudderstack_events
schema_type: RUDDERSTACK
description: Events including clicks, likes and comments
```

```bash
shaped create-table --file rudderstack_events.yaml
```
Retrieve the stream details, then in RudderStack: navigate to Cloud Destinations, add Amazon Kinesis, paste the Stream Name and IAM Role ARN, and select which events to forward. Done.
Option 5: Amplitude, PostHog, Snowplow
These platforms connect via Shaped’s managed Kinesis infrastructure — the same 30-second latency, zero consumer code. Amplitude and PostHog expose a Kinesis destination in their settings panels. Snowplow forwards events via Snowbridge or a custom Kinesis Client Library application. In each case: create a table with the appropriate schema_type, retrieve the ARN details, and configure the destination in the platform UI.
Option 6: Direct API Push (No Kafka or Kinesis Required)
If you don’t use any of the above platforms, you can push events directly to Shaped via the Table Insert API. No Kafka cluster, no Kinesis stream, no infrastructure to manage.
Define a custom table:
```yaml
# custom_events.yaml
name: product_events
schema_type: CUSTOM
unique_keys: [product_id]
column_schema:
  product_id: String
  product_name: String
  status: String
  inventory_count: Int32
  price: Float64
  tags: Array(String)
  updated_at: DateTime
```

```bash
shaped create-table --file custom_events.yaml
```
```bash
curl -X POST "https://api.shaped.ai/v2/tables/product_events/table_insert" \
  -H "x-api-key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '[
    {
      "product_id": "prod_456",
      "product_name": "Running Shoe - Size 10",
      "status": "in_stock",
      "inventory_count": 3,
      "price": 129.99,
      "tags": ["running", "footwear", "sale"],
      "updated_at": "2025-02-17T14:32:00Z"
    }
  ]'
```
Call this endpoint from any service that produces state changes — your order management system, your inventory service, your promotion engine. Each call updates the retrieval index within 30 seconds. No stream infrastructure needed at all.
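If those services are in Python, the same call is a few lines with `requests`. A sketch assuming the endpoint and `x-api-key` header shown in the curl example above:

```python
import json
import requests

SHAPED_API_URL = "https://api.shaped.ai/v2/tables/{table}/table_insert"

def build_payload(rows: list[dict]) -> str:
    """The insert endpoint takes a JSON array of row objects."""
    return json.dumps(rows)

def table_insert(api_key: str, table: str, rows: list[dict]) -> int:
    """Push rows to a Shaped CUSTOM table; returns the HTTP status code."""
    response = requests.post(
        SHAPED_API_URL.format(table=table),
        headers={"x-api-key": api_key, "Content-Type": "application/json"},
        data=build_payload(rows),
    )
    return response.status_code

# Usage sketch -- call from the service that owns the state change:
# table_insert(os.environ["SHAPED_API_KEY"], "product_events",
#              [{"product_id": "prod_456", "inventory_count": 2}])
```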
The Engine and Agent Query (Same for All Options)
However you connect your event source, the engine config and agent query are identical. Define the engine once:
```yaml
# agent_context_engine.yaml
name: agent_context_engine
data:
  item_table:
    name: product_events
    type: table
  user_table:
    name: users
    type: table
  interaction_table:
    name: user_sessions
    type: table
index:
  embeddings:
    - name: product_semantic_index
      encoder:
        type: hugging_face
        model_name: sentence-transformers/all-MiniLM-L6-v2
      source_column: product_name
training:
  models:
    - name: relevance
      policy_type: elsa
```

```bash
shaped create-engine --file agent_context_engine.yaml
```
Then query from your agent — one REST call, always reflecting the latest indexed state:
```python
# agent_with_shaped.py
import os
import requests
from openai import OpenAI

SHAPED_API_KEY = os.environ['SHAPED_API_KEY']
openai_client = OpenAI()

def retrieve_context(query: str, user_id: str, top_k: int = 5) -> list[dict]:
    response = requests.post(
        "https://api.shaped.ai/v2/engines/agent_context_engine/query",
        headers={
            "x-api-key": SHAPED_API_KEY,
            "Content-Type": "application/json"
        },
        json={
            "query": """
                SELECT
                    product_id,
                    product_name,
                    status,
                    inventory_count,
                    price,
                    updated_at
                FROM text_search(
                    mode='vector',
                    text_embedding_ref='product_semantic_index',
                    input_text_query=$query,
                    limit=20
                )
                WHERE status != 'discontinued'
                ORDER BY score(
                    expression='retrieval.score',
                    input_user_id=$user_id
                )
                LIMIT $top_k
            """,
            "parameters": {
                "query": query,
                "user_id": user_id,
                "top_k": top_k
            }
        }
    )
    return response.json().get("items", [])

def agent_response(user_question: str, user_id: str) -> str:
    context_items = retrieve_context(user_question, user_id)
    context_str = "\n".join([
        f"- {item['product_name']}: "
        f"{item['inventory_count']} in stock, "
        f"${item['price']}, status: {item['status']}"
        for item in context_items
    ])
    messages = [
        {
            "role": "system",
            "content": (
                "You are a helpful shopping assistant. "
                "Use only the context below. If it isn't here, say so.\n\n"
                f"Live product context (updated in real-time):\n{context_str}"
            )
        },
        {"role": "user", "content": user_question}
    ]
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return response.choices[0].message.content
```
Mixing Real-Time and Batch
Not everything needs to stream. Your inventory events need 30-second freshness. Your product catalog descriptions and user preference history can batch-sync every 15 minutes — and that’s fine.
Shaped lets you mix both in the same engine. Use real-time connectors for state that changes the answer (inventory, order status, active promotions). Use batch connectors for reference data that doesn’t (product metadata, historical preferences, user profiles):
```yaml
# hybrid_engine.yaml
name: agent_context_engine
data:
  item_table:
    name: product_events   # Real-time: Kafka/Kinesis/Segment, 30s latency
    type: table
  user_table:
    name: users            # Batch: PostgreSQL, 15min latency, fine for profiles
    type: table
  interaction_table:
    name: user_sessions    # Real-time: Segment/RudderStack, 30s latency
    type: table
```
The rule: if the event changes the correct answer to a user’s question, it needs to be real-time. If it doesn’t, batch is cheaper and simpler.
Comparison: Traditional vs Shaped
| Component | Traditional Stack | Shaped |
|---|---|---|
| Kafka ingestion | Custom consumer service (Python, 24/7) | schema_type: KAFKA in YAML |
| Kinesis ingestion | Custom polling service (boto3, 24/7) | schema_type: KINESIS in YAML |
| Segment ingestion | N/A — would need custom Kinesis consumer | schema_type: SEGMENT, configure destination |
| RudderStack ingestion | N/A — would need custom Kinesis consumer | schema_type: RUDDERSTACK, configure destination |
| No stream infra | Build Kafka/Kinesis from scratch | Direct API push via Table Insert endpoint |
| Embedding | OpenAI API calls per event + batching logic | Automatic via encoder in engine config |
| Vector DB | Pinecone / Weaviate cluster to operate | Unified in Shaped engine |
| Index freshness | Depends on consumer throughput + API rate limits | Within 30 seconds, guaranteed |
| Agent query | Embed query → vector DB search (two round trips) | Single REST call to Shaped |
| Infrastructure cost | ~$1,500–3,000/month | ~$300–500/month |
| Code to maintain | ~800 lines (consumer + embedder + retrieval) | ~50 lines (YAML + query) |
| Time to production | 6–10 weeks | 1–2 weeks |
FAQ
Q: What real-time connectors does Shaped support? A: Kafka, Kinesis, Segment, RudderStack, Amplitude, PostHog, and Snowplow all have native connectors. If your platform isn’t listed, the Custom connector lets you push events directly via the Table Insert API — no stream infrastructure needed.
Q: How does Shaped achieve 30-second indexing latency? A: Real-time connectors run a continuous consumer (for Kafka) or managed stream (for Kinesis-based connectors) that processes events as they arrive. The 30-second window covers ingestion, embedding generation, and index update. Batch connectors, by contrast, sync every 15 minutes.
Q: I already use Segment to track user events. Do I need to build anything new? A: No. Create a Shaped table with schema_type: SEGMENT, retrieve the Kinesis Stream ARN and IAM Role ARN from the table details, and add Amazon Kinesis as a destination in your Segment dashboard. Your existing event tracking becomes your agent’s live retrieval context.
Q: Do I need a Kafka cluster or AWS account to get real-time indexing? A: No. If you don’t have Kafka or Kinesis, use the Custom connector and the Table Insert API to push events directly to Shaped from any service. This is the simplest path — no stream infrastructure at all.
Q: Does Shaped manage Kafka consumer groups and offset tracking? A: Yes. When you create a table with schema_type: KAFKA, Shaped provisions its own consumer group (format shaped-*) and handles offset management, rebalancing, and failure recovery automatically.
Q: Can I apply filters or transformations to the stream before it hits the index? A: Real-time connectors stream raw events into Shaped tables. Transformations happen in SQL Views on top of those tables, which the engine then trains on. You cannot attach query logic directly to the real-time stream — filtering and transformation must happen in views or at query time via ShapedQL.
Q: When should I use real-time vs batch connectors? A: Use real-time connectors for anything that changes the correct answer to a user’s question: inventory levels, order status, pricing, active promotions, breaking news. Use batch connectors for things that don’t change minute-to-minute: product descriptions, user preference history, reference data. You can use both in the same engine.
Q: How does this compare to nightly batch indexing? A: Nightly batch means your agent is working from data that’s up to 24 hours old. With real-time connectors, the worst-case lag is 30 seconds. For time-sensitive contexts — stock availability, delivery status, live promotions — that difference is the gap between an agent that helps and one that actively misleads users.
Conclusion: The Data Layer Is the Agent Layer
Most teams debug agent quality at the model level — better prompts, smarter chunking, larger context windows. What they miss is that the retrieval layer is fetching yesterday’s facts and presenting them as today’s truth.
The fix isn’t a better model. It’s connecting your agent’s retrieval index to the event streams that describe the world as it is right now.
Whatever your event infrastructure looks like — Kafka, Kinesis, Segment, RudderStack, or none of the above — Shaped has a connector for it. One YAML declaration connects your live event stream to a retrieval engine that updates within 30 seconds. Your agent stops reasoning from a snapshot and starts reasoning from a stream.
That’s not an incremental improvement. It’s the difference between an agent that knows what’s in stock and one that sends your customers to an empty shelf.
Related Resources
- Shaped Connector Types: Real-Time vs Batch
- Kafka Connector Reference
- Kinesis Connector Reference
- Segment Connector Reference
- RudderStack Connector Reference
- Custom Connector / Table Insert API
- ShapedQL Query Reference
Ready to wire your event stream into a real-time retrieval engine? Sign up for Shaped with $300 in free credits.