Resolve Statement Issues in Confluent Cloud for Apache Flink¶
Inefficient Flink SQL queries in Confluent Cloud for Apache Flink® can cause performance issues that impact your data processing pipeline. These inefficiencies can be identified early through warnings when you submit your query, or they become apparent later when your statement enters a DEGRADED state during execution.
This page explains how to identify and resolve query inefficiencies, providing a comprehensive approach to troubleshooting statement performance problems.
Statement enters DEGRADED state¶
When a Flink statement is unable to make consistent progress, it may enter a DEGRADED state. This typically occurs due to performance bottlenecks or resource constraints.
You may see the following error message:
Your |af| statement has entered a Degraded state because it is unable to make consistent progress. This can be caused by inefficient query logic or insufficient compute resources. Please review your statement for performance bottlenecks. If the issue persists, consider scaling your compute pool or contacting Confluent support for assistance.
To resolve a DEGRADED state, follow these steps:
- Check for Statement Advisor warnings: Review and resolve any warnings that were returned during query submission. If you’re unsure whether warnings were shown, run your query with the EXPLAIN statement to see if warnings are generated.
- Profile your query: Use the Query Profiler to identify performance bottlenecks and data flow issues in your statement.
- Review compute resources: Check if your compute pool has reached its
maximum CFU limit. If so, consider:
- Increasing the maximum CFU limit for your compute pool
- Moving the statement to a dedicated compute pool with more CFU capacity
- Optimizing your query to reduce resource consumption
- Optimize query logic: Based on the warnings and profiling results, implement the specific optimizations described in the following warning sections.
Primary key differs from derived upsert key¶
[Warning] The primary key "<pk_column>" does not match the upsert key "<upsert_key_column>" that is derived from the query. If the primary key and upsert key don't match, the system needs to add a state-intensive operation for correction, which can result in a DEGRADED statement and higher CFU consumption. If possible, revisit the table declaration with the primary key or change your query. For more information, see https://cnfl.io/primary_vs_upsert_key.
This warning occurs when you insert data into a table where the table’s
defined PRIMARY KEY
doesn’t align with the key columns derived from the
INSERT INTO ... SELECT
or CREATE TABLE ... AS SELECT
query’s grouping
or source. When the keys mismatch, Flink must introduce an expensive internal
operator (UpsertMaterialize
) to ensure correctness, which consumes more state
and resources.
The following example illustrates a query that triggers this warning:
-- Create a table to store customer total orders
CREATE TABLE customer_orders (
total_orders INT PRIMARY KEY NOT ENFORCED, -- Primary Key is total_orders
customer_name STRING
);
-- Insert aggregated order counts per customer
INSERT INTO customer_orders
SELECT
SUM(order_count), customer_name -- Upsert key derived from GROUP BY is customer_name
FROM ( VALUES
('Bob', 2), -- Bob placed 2 orders
('Alice', 1), -- Alice placed 1 order
('Bob', 2) -- Bob placed 2 more orders
) AS OrderData(customer_name, order_count)
GROUP BY customer_name;
To resolve this warning:
- Align Primary Key
- Modify the
PRIMARY KEY
definition in yourCREATE TABLE
statement to match the columns used to uniquely identify rows in yourINSERT
query (often theGROUP BY
columns). In the example above, changing the primary key tocustomer_name
resolves the warning. - Modify Query
- Adjust your
INSERT INTO ... SELECT
query so the selected columns or grouping aligns with the existing primary key definition. This might involve changing theGROUP BY
clause or the columns being selected. - Check for warnings
- If you’re unsure whether your query produces this warning, run it with the EXPLAIN statement to see if warnings are generated.
High state operator without state TTL¶
[Warning] Your query includes one or more highly state-intensive operators but does not set a time-to-live (TTL) value, which means that the system potentially needs to store an infinite amount of state. This can result in a DEGRADED statement and higher CFU consumption. If possible, change your query to use a different operator, or set a time-to-live (TTL) value. For more information, see https://cnfl.io/high_state_intensive_operators.
Certain SQL operations, like joins on unbounded streams or aggregations without windowing, require Flink to maintain internal state. If this state isn’t configured to expire (using a Time-To-Live or TTL setting), it can grow indefinitely, leading to excessive memory usage, performance degradation, and higher costs.
The following example illustrates a query that triggers this warning:
-- Joining two unbounded streams without TTL
SELECT c.*, o.*
FROM `examples`.`marketplace`.`clicks` c
INNER JOIN `examples`.`marketplace`.`orders` o
ON c.user_id = o.customer_id;
To resolve this warning:
- Set State TTL
- Configure a state time-to-live (TTL) for the table(s) involved in the stateful operation. This ensures that state older than the specified duration is automatically cleared. This can done for the full statement via SET ‘sql.state-ttl’ option or for individual tables via State TTL Hints.
- Use Windowed Operations
- If applicable, rewrite your query to use windowed operations, like windowed joins or windowed aggregations, instead of unbounded operations. Windows limit the amount of state required inherently.
- Refactor Query
- Analyze if the stateful operation is necessary or if the query logic can be changed to avoid large state requirements.
- Check for warnings
- If you’re unsure whether your query produces this warning, run it with the EXPLAIN statement to see if warnings are generated.
Missing window_start
or window_end
in GROUP BY for window aggregation¶
[Warning] Your query contains only "window_end" in the GROUP BY clause, with no corresponding "window_start". This means that the query is considered a regular aggregation query and not a windowed aggregation, which can result in unexpected, continuously updating output and higher CFU consumption. if you want a windowed aggregation in your query, ensure that you include both "window_start" and "window_end" in the GROUP BY clause. For more information, see https://cnfl.io/regular_vs_window_aggregation.
A similar warning appears if only window_start
is included without
window_end
.
When performing windowed aggregations, using functions like TUMBLE
,
HOP
, CUMULATE
, SESSION
, you typically group by the window
boundaries (window_start
and window_end
) along with any other grouping
keys. If you include only one of the window boundary columns. either
window_start
or window_end
, in the GROUP BY
clause, Flink interprets
this as a regular, non-windowed aggregation. This leads to continuously
updating results for each input row rather than a single result per window,
which is usually not the intended behavior and can consume more resources.
The following example illustrates a query that triggers this warning:
-- Incorrect GROUP BY for TUMBLE window
SELECT window_end, SUM(price) as `sum`
FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES)
)
GROUP BY window_end; -- Missing window_start
To resolve this warning should it occur in a query:
- Include both window boundaries
- When performing windowed aggregations, ensure that your
GROUP BY
clause includes bothwindow_start
andwindow_end
. - Check for warnings
- If you’re unsure whether your query produces this warning, run it with the EXPLAIN statement to see if warnings are generated.
The following example shows the revised query that resolves this warning:
-- Correct GROUP BY for TUMBLE window
SELECT window_start, window_end, SUM(price) as `sum`
FROM TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end; -- Includes both window boundaries
Session window without a PARTITION BY key¶
[Warning] Your query uses a SESSION window without a PARTITION BY clause. This results in all data being processed by a single, non-parallel task, which can create a significant bottleneck, leading to poor performance and high resource consumption. To improve performance and enable parallel execution, specify a PARTITION BY key in your SESSION window. For more information, see https://cnfl.io/session_without_partioning.
When using a SESSION window, data is grouped into sessions based on periods of activity, which are separated by a specified gap of inactivity. If you don’t include a PARTITION BY clause, all data will be sent to a single, non-parallel task to correctly identify these sessions. This creates a significant performance bottleneck and prevents the query from scaling.
The following example shows a query that triggers this warning:
-- This query uses a SESSION window without a PARTITION BY key
SELECT *
FROM SESSION(
TABLE `examples`.`marketplace`.`orders`,
DESCRIPTOR($rowtime),
INTERVAL '5' MINUTES
);
To resolve this warning:
- Add a PARTITION BY key
- Modify your SESSION window definition to include a PARTITION BY clause. This partitions the data by the specified key(s), allowing the sessionization to be performed independently and in parallel for each partition. This is important for performance and scalability.
- Check for warnings
- If you’re unsure whether your query produces this warning, run it with the EXPLAIN statement to see if warnings are generated.
The following example shows the revised query that resolves the warning:
-- Corrected query with PARTITION BY to enable parallel execution
SELECT *
FROM SESSION(
TABLE `examples`.`marketplace`.`orders` PARTITION BY customer_id,
DESCRIPTOR($rowtime),
INTERVAL '5' MINUTES
);