Create and Run Streaming Agents with Confluent Cloud
Confluent Intelligence provides a high-level way to build AI agents that reason over streaming data and take actions by invoking tools. This guide explains how to use the declarative agent syntax to create streaming agent workflows.
Confluent Intelligence enables agents to handle complex, multi-step tasks that require both reasoning and external tool usage.
- Agent definition
A declarative specification of an AI agent that includes:
Model reference for reasoning
System prompt defining the agent’s role
Tools the agent can use
Configuration options
- Tool resources
Encapsulated capabilities that agents can invoke:
Function-based tools (Flink UDFs)
MCP-based tools (external services)
Custom tool implementations
Create an agent
The following example shows the general steps for creating a simple customer support agent that can answer questions and look up account information. All steps use Flink SQL syntax.
Step 1: Define tools
Create the tools your agent will use. A tool is a first-class resource in Confluent Cloud for Apache Flink®, and you create it by using the CREATE TOOL statement. This example creates a function-based tool for account lookup and an MCP-based tool for external services.
-- Create a function for account lookup.
CREATE FUNCTION lookup_account
USING JAR 'account-lookup.jar'
COMMENT 'Function to look up customer account information';
-- Create a tool based on the function.
CREATE TOOL account_lookup_tool
USING FUNCTION lookup_account
WITH (
'type' = 'function',
'description' = 'Look up customer account information by ID'
);
-- Create a connection to an external MCP server.
CREATE CONNECTION support_mcp
WITH (
'type' = 'mcp_server',
'endpoint' = 'https://support-api.example.com',
'api-key' = '<your-api-key>'
);
-- Create an MCP tool based on the connection.
CREATE TOOL support_api_tool
USING CONNECTION support_mcp
WITH (
'type' = 'mcp',
'allowed_tools' = 'create_ticket,update_ticket,search_kb',
'request_timeout' = '30'
);
Step 2: Create the agent
Define the agent with its model, prompt, and tools. This example creates a model that uses the OpenAI API and an agent that uses the model and the tools you created in the previous step.
-- Create a model that uses the OpenAI API.
CREATE MODEL support_llm
INPUT (message STRING)
OUTPUT (response STRING)
WITH (
'provider' = 'openai',
'model' = 'gpt-4o',
'openai.connection' = 'your-openai-connection'
);
-- Create an agent that uses the model and tools.
CREATE AGENT customer_support_agent
USING MODEL support_llm
USING PROMPT 'You are a helpful customer support agent. Use the available tools to look up account information and create support tickets when needed.'
USING TOOLS account_lookup_tool, support_api_tool
WITH (
'max_iterations' = '5'
);
Step 3: Execute the agent
Run the agent on streaming data. This example creates an input table for customer queries and runs the agent on each incoming row.
-- Create an input table for customer queries.
CREATE TABLE customer_queries (
query_id STRING,
customer_id STRING,
message STRING,
`timestamp` TIMESTAMP(3)
);
-- Execute the agent on streaming data.
SELECT query_id, customer_id, message, agent_output FROM customer_queries,
LATERAL TABLE(AI_RUN_AGENT(`customer_support_agent`, `message`, `query_id`));
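To keep the agent's responses for downstream consumers rather than only returning them to the client, the same query can feed a table. The following is a minimal sketch, not a definitive pattern: the table name agent_responses is hypothetical, and it assumes that CREATE TABLE AS SELECT is supported for this query in your environment.

```sql
-- Hypothetical output table; the name agent_responses is an assumption.
-- Column types are derived from the SELECT, so agent_output keeps
-- whatever type AI_RUN_AGENT returns.
CREATE TABLE agent_responses AS
SELECT query_id, customer_id, message, agent_output
FROM customer_queries,
LATERAL TABLE(AI_RUN_AGENT(`customer_support_agent`, `message`, `query_id`));
```

Deriving the schema from the query avoids guessing the type of agent_output up front.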
Configure a Streaming Agent
Streaming Agents support various configuration options to customize their behavior.
- Options
max_iterations: Maximum loop iterations (default: 10)
max_consecutive_failures: Maximum consecutive failures (optional, default: 3)
This example shows how to use advanced options.
CREATE AGENT advanced_agent
USING MODEL my_model
USING PROMPT 'You are an advanced AI agent...'
USING TOOLS tool1, tool2
WITH (
'max_iterations' = '8',
'max_consecutive_failures' = '5'
);
Tool types
Streaming Agents support function-based and MCP-based tools.
Function-based tools
A function-based tool wraps a Flink SQL user-defined function (UDF) and executes locally in the Flink runtime.
Function-based tools have these benefits:
Low-latency execution
No external dependencies
Full control over implementation
The following example shows how to create a function-based tool based on a UDF.
-- Create a UDF.
CREATE FUNCTION calculate_discount
USING JAR 'pricing.jar'
COMMENT 'Calculate discount based on customer tier';
-- Create a tool based on the UDF.
CREATE TOOL discount_calculator
USING FUNCTION calculate_discount
WITH (
'type' = 'function',
'description' = 'Calculate customer discount percentage'
);
MCP-based tools
An MCP-based tool connects to external services through a Model Context Protocol (MCP) server.
MCP-based tools have these benefits:
Access to external APIs
Standardized tool interface
Easy integration with existing services
The following example shows how to create an MCP tool based on a connection resource.
-- Create a connection to an external MCP server.
CREATE CONNECTION external_api
WITH (
'type' = 'mcp_server',
'endpoint' = 'https://api.example.com',
'api-key' = '<your-api-key>'
);
-- Create an MCP tool based on the connection.
CREATE TOOL external_service
USING CONNECTION external_api
WITH (
'type' = 'mcp',
'allowed_tools' = 'get_weather,get_stock_price',
'request_timeout' = '30'
);
Best practices
Follow these best practices for designing and implementing agents and tools.
- Agent design
Keep prompts clear and specific.
Limit the number of tools per agent.
Use descriptive tool names and descriptions.
Set appropriate max_iterations based on task complexity.
- Tool design
Make tools idempotent when possible.
Provide clear error messages.
Use appropriate timeouts.
Handle edge cases gracefully.
- Performance
Use function-based tools for high-frequency operations.
Cache frequently accessed data.
Monitor agent execution times.
Use appropriate partitioning for large datasets.
- Security
Store API keys in secure connections.
Validate tool inputs.
Implement proper access controls.
Monitor tool usage and costs.
Common patterns
The following examples show common patterns for building agents.
Multi-agent workflows
For complex tasks, chain multiple agents together. In the following example, the first agent classifies the request, and the second agent routes it to the appropriate specialist.
-- First agent: Classify the request
CREATE AGENT classifier
USING MODEL my_model
USING PROMPT 'Classify customer requests into categories'
WITH ('max_iterations' = '3');
-- Second agent: Route the request to the appropriate specialist.
CREATE AGENT router
USING MODEL my_model
USING PROMPT 'Route classified requests to appropriate specialists'
USING TOOLS routing_tool
WITH ('max_iterations' = '5');
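The two definitions above don't by themselves connect the agents. One way to chain them, sketched here under assumptions, is to write the classifier's output to an intermediate table and run the router over that table. The table name classified_requests and the aliases category and routing_result are hypothetical. The sketch also assumes that AI_RUN_AGENT exposes an agent_output column, as in the Step 3 query, and that its value can be passed as the next agent's input.

```sql
-- Stage 1: run the classifier and keep its output as the category.
-- classified_requests and category are hypothetical names.
CREATE TABLE classified_requests AS
SELECT query_id, customer_id, agent_output AS category
FROM customer_queries,
LATERAL TABLE(AI_RUN_AGENT(`classifier`, `message`, `query_id`));

-- Stage 2: feed the category to the router agent.
SELECT query_id, customer_id, category, agent_output AS routing_result
FROM classified_requests,
LATERAL TABLE(AI_RUN_AGENT(`router`, `category`, `query_id`));
```

Materializing the intermediate result makes each stage inspectable on its own, at the cost of an extra table.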
Error handling
Implement robust error handling in your agents. The following example creates an agent that handles errors gracefully.
CREATE AGENT robust_agent
USING MODEL my_model
USING PROMPT 'You are a robust agent that handles errors gracefully. If a tool fails, try alternative approaches or ask for clarification.'
USING TOOLS primary_tool, fallback_tool
WITH (
'max_iterations' = '10',
'max_consecutive_failures' = '5'
);
Troubleshooting
The following are common issues and possible solutions.
- Agent not responding
Check model configuration and API keys.
Verify tool connections are working.
Increase max_iterations if needed.
Check for infinite loops in logic.
- Tool calls failing
Verify tool definitions are correct.
Check connection configurations.
Ensure proper error handling in tools.
Monitor tool execution logs.
- Performance issues
Use function-based tools for frequent operations.
Optimize tool implementations.
Consider caching strategies.
Monitor resource usage.
Debugging tips
Use replay functionality to debug agent behavior.
Check agent execution logs for detailed information.
Test tools independently before using them in agents.
Use a smaller max_iterations value during development.
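One way to test a function-based tool on its own is to call its underlying UDF directly before wiring it into an agent. The following is a minimal sketch. It assumes that lookup_account accepts a single STRING customer ID, because this guide doesn't show the function's signature, and it uses a made-up ID.

```sql
-- Call the UDF behind account_lookup_tool directly.
-- The argument type and the ID 'C-1001' are assumptions for illustration.
SELECT lookup_account('C-1001') AS account_info;
```

If the direct call fails or returns unexpected data, the problem is in the function rather than in the agent's prompt or model.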