Create a User-Defined Function with Confluent Cloud for Apache Flink¶
A user-defined function (UDF) extends the capabilities of Confluent Cloud for Apache Flink® and enables you to implement custom logic beyond what is supported by SQL. For example, you can implement functions like encoding and decoding a string, performing geospatial calculations, encrypting and decrypting fields, or reusing an existing library or code from a third-party supplier.
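For illustration only, the following minimal sketch shows the shape of a scalar UDF class; the class name and logic are hypothetical and are not used in the steps below:

package com.example.my;

import org.apache.flink.table.functions.ScalarFunction;

/** Illustrative sketch only: reverses a string. */
public class ReverseString extends ScalarFunction {
    public String eval(String input) {
        // Return null for null input so the function is null-safe.
        if (input == null) {
            return null;
        }
        return new StringBuilder(input).reverse().toString();
    }
}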
Confluent Cloud for Apache Flink supports UDFs written in Java. Package your custom function and its dependencies into a JAR file and upload it as an artifact to Confluent Cloud. Register the function in a Flink database by using the CREATE FUNCTION statement, and invoke your UDF in Flink SQL or the Table API. Confluent Cloud provides the infrastructure to run your code.
For a list of cloud service providers and regions that support UDFs, see UDF regional availability.
The following steps show how to implement a simple user-defined scalar function, upload it to Confluent Cloud, and use it in a Flink SQL statement.
- Step 1: Build the uber jar
- Step 2: Upload the jar as a Flink artifact
- Step 3: Register the UDF
- Step 4: Use the UDF in a Flink SQL query
- Step 5: Implement UDF logging (optional)
- Step 6: Delete the UDF
After you build and run the scalar function, try building a table function.
For more code examples, see Flink UDF Java Examples.
Permanent and in-line UDFs¶
Starting with Confluent Table API plugin version 2.1-8, you can simplify the process of creating and managing UDFs.
- Permanent UDFs are registered automatically and can be used in any Flink SQL or Table API program. The Table API creates a temporary JAR file containing all transitive classes required to run the function, uploads it to Confluent Cloud, and registers the function using the previously uploaded artifact.
- In-line UDFs are defined and used in the same Table API program.
Note
Permanent and in-line UDFs are an Open Preview feature in Confluent Cloud.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent's sole discretion.
The following example shows how to create and call a permanent UDF and an in-line UDF.
For the full code listing, see Example_09_Functions.java in the flink-table-api-java-examples repository.
Implement a permanent and in-line UDF
package io.confluent.flink.examples.table;
import io.confluent.flink.plugin.ConfluentSettings;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import java.util.List;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.array;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;
/**
* A table program example showing how to use User-Defined Functions
* (UDFs) in the Flink Table API.
*
* <p>The Flink Table API simplifies the process of creating and managing UDFs.
*
* <ul>
* <li>It helps create a JAR file containing all required dependencies for a given UDF.
* <li>It uploads the JAR to the Confluent artifact API.
* <li>It creates SQL functions for the given artifacts.
* </ul>
*/
public class Example_09_Functions {
// Fill this with an environment you have write access to
static final String TARGET_CATALOG = "";
// Fill this with a Kafka cluster you have write access to
static final String TARGET_DATABASE = "";
// All logic is defined in a main() method. It can run in an IDE or in a CI/CD system.
public static void main(String[] args) {
// Set up connection properties to Confluent Cloud
EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");
// Initialize the session context to get started
TableEnvironment env = TableEnvironment.create(settings);
// Set default catalog and database
env.useCatalog(TARGET_CATALOG);
env.useDatabase(TARGET_DATABASE);
System.out.println("Registering a scalar function...");
// The Table API underneath creates a temporary JAR file containing all transitive classes
// required to run the function, uploads it to Confluent Cloud, and registers the function
// using the previously uploaded artifact.
env.createFunction("CustomTax", CustomTax.class, true);
// As of now, Scalar and Table functions are supported.
System.out.println("Registering a table function...");
env.createFunction("Explode", Explode.class, true);
// Once registered, the functions can be used in Table API and SQL queries.
System.out.println("Executing registered UDFs...");
env.fromValues(row("Apple", "USA", 2), row("Apple", "EU", 3))
.select(
$("f0").as("product"),
$("f1").as("location"),
$("f2").times(call("CustomTax", $("f1"))).as("tax"))
.execute()
.print();
env.fromValues(
row(1L, "Ann", array("Apples", "Bananas")),
row(2L, "Peter", array("Apples", "Pears")))
.joinLateral(call("Explode", $("f2")).as("fruit"))
.select($("f0").as("id"), $("f1").as("name"), $("fruit"))
.execute()
.print();
// Instead of registering functions permanently, you can embed UDFs directly into queries
// without registering them first. This will upload all the functions of the query as a
// single artifact to Confluent Cloud. Moreover, the functions' lifecycle will be bound to
// the lifecycle of the query.
System.out.println("Executing inline UDFs...");
env.fromValues(row("Apple", "USA", 2), row("Apple", "EU", 3))
.select(
$("f0").as("product"),
$("f1").as("location"),
$("f2").times(call(CustomTax.class, $("f1"))).as("tax"))
.execute()
.print();
env.fromValues(
row(1L, "Ann", array("Apples", "Bananas")),
row(2L, "Peter", array("Apples", "Pears")))
.joinLateral(call(Explode.class, $("f2")).as("fruit"))
.select($("f0").as("id"), $("f1").as("name"), $("fruit"))
.execute()
.print();
}
/** A scalar function that calculates a custom tax based on the provided location. */
public static class CustomTax extends ScalarFunction {
public int eval(String location) {
if (location.equals("USA")) {
return 10;
}
if (location.equals("EU")) {
return 5;
}
return 0;
}
}
/** A table function that explodes an array of strings into multiple rows. */
public static class Explode extends TableFunction<String> {
public void eval(List<String> arr) {
for (String i : arr) {
collect(i);
}
}
}
}
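After the program above runs, the permanently registered functions remain available in the selected catalog and database. As a quick check from the same TableEnvironment, you can list them; a minimal sketch:

// List the user-defined functions visible to this environment.
for (String functionName : env.listUserDefinedFunctions()) {
    System.out.println(functionName);
}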
Prerequisites¶
You need the following prerequisites to use Confluent Cloud for Apache Flink.
Access to Confluent Cloud.
The organization ID, environment ID, and compute pool ID for your organization.
The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.
The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:
confluent update --yes
If you used Homebrew to install the Confluent CLI, update the CLI by using the brew upgrade command, instead of confluent update. For more information, see Confluent CLI.
A provisioned Flink compute pool in Confluent Cloud.
Apache Maven software project management tool (see Installing Apache Maven)
Java 11 to Java 17
Sufficient permissions to upload and invoke UDFs in Confluent Cloud. For more information, see Flink RBAC.
If using the Table API only, Flink versions 1.18.x and 1.19.x of flink-table-api-java are supported.
Step 1: Build the uber jar¶
In this section, you compile a simple Java class named TShirtSizingIsSmaller into a JAR file. The project is based on the ScalarFunction class in the Flink Table API. The TShirtSizingIsSmaller.java class has an eval function that compares two T-shirt sizes and returns true if the first size is smaller than the second.
Copy the following project object model into a file named pom.xml.
Important
You can't use your own Flink-related JARs. Don't package Flink core dependencies into the JAR; keep them at provided scope, as in the following POM, or they may conflict with the Flink runtime.
Also, this example shows how to capture all dependencies greedily, possibly including more than needed. As an alternative, you can reduce the artifact size by listing only the dependencies your UDF needs, along with their transitive dependencies.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>example</groupId>
  <artifactId>udf_example</artifactId>
  <version>1.0</version>

  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>2.1.0</version>
      <scope>provided</scope>
    </dependency>
    <!-- Dependencies -->
  </dependencies>

  <build>
    <sourceDirectory>./example</sourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.6.0</version>
        <configuration>
          <artifactSet>
            <includes>
              <!-- Include all UDF dependencies and their transitive dependencies here. -->
              <!-- This example shows how to capture all of them greedily. -->
              <include>*:*</include>
            </includes>
          </artifactSet>
          <filters>
            <filter>
              <artifact>*</artifact>
              <excludes>
                <!-- Do not copy the signatures in the META-INF folder.
                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Create a directory named “example”.
mkdir example
In the example directory, create a file named TShirtSizingIsSmaller.java.

touch example/TShirtSizingIsSmaller.java
Copy the following code into TShirtSizingIsSmaller.java.

package com.example.my;

import org.apache.flink.table.functions.ScalarFunction;

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

/** TShirt sizing function for demo. */
public class TShirtSizingIsSmaller extends ScalarFunction {
    public static final String NAME = "IS_SMALLER";

    private static final List<Size> ORDERED_SIZES =
            Arrays.asList(
                    new Size("X-Small", "XS"),
                    new Size("Small", "S"),
                    new Size("Medium", "M"),
                    new Size("Large", "L"),
                    new Size("X-Large", "XL"),
                    new Size("XX-Large", "XXL"));

    public boolean eval(String shirt1, String shirt2) {
        int size1 = findSize(shirt1);
        int size2 = findSize(shirt2);
        // If either can't be found just say false rather than throw an error
        if (size1 == -1 || size2 == -1) {
            return false;
        }
        return size1 < size2;
    }

    private int findSize(String shirt) {
        return IntStream.range(0, ORDERED_SIZES.size())
                .filter(
                        i -> {
                            Size s = ORDERED_SIZES.get(i);
                            return s.name.equalsIgnoreCase(shirt)
                                    || s.abbreviation.equalsIgnoreCase(shirt);
                        })
                .findFirst()
                .orElse(-1);
    }

    private static class Size {
        private final String name;
        private final String abbreviation;

        public Size(String name, String abbreviation) {
            this.name = name;
            this.abbreviation = abbreviation;
        }
    }
}
Run the following command to build the jar file.
mvn clean package
Run the following command to check the contents of your jar.
jar -tf target/udf_example-1.0.jar | grep -i TShirtSizingIsSmaller
Your output should resemble:
com/example/my/TShirtSizingIsSmaller$Size.class
com/example/my/TShirtSizingIsSmaller.class
Step 2: Upload the jar as a Flink artifact¶
You can use the Confluent Cloud Console, the Confluent CLI, or the REST API to upload your UDF.
- Log in to Confluent Cloud and navigate to your Flink workspace.
- Navigate to the environment where you want to run the UDF.
- Click Flink, and in the Flink page, click Artifacts.
- Click Upload artifact to open the upload pane.
- In the Cloud provider dropdown, select AWS, and in the Region dropdown, select the cloud region.
- Click Upload your JAR file and navigate to the location of your JAR file, which in the current example is target/udf_example-1.0.jar.
- When your JAR file is uploaded, it appears in the Artifacts list. In the list, click the row for your UDF artifact to open the details pane.
Log in to Confluent Cloud.
confluent login --organization-id ${ORG_ID} --prompt
Run the following command to upload the jar to Confluent Cloud.
confluent flink artifact create udf_example \
  --artifact-file target/udf_example-1.0.jar \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION} \
  --environment ${ENV_ID}
Your output should resemble:
+--------------------+-------------+
| ID                 | cfa-ldxmro  |
| Name               | udf_example |
| Version            | ver-81vxm5  |
| Cloud              | aws         |
| Region             | us-east-1   |
| Environment        | env-z3q9rd  |
| Content Format     | JAR         |
| Description        |             |
| Documentation Link |             |
+--------------------+-------------+
Note the artifact ID and version of your UDF, which in this example are cfa-ldxmro and ver-81vxm5, because you use them later to register the UDF in Flink SQL and to manage it.

Run the following command to view all of the available UDFs.
confluent flink artifact list \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION}
Your output should resemble:
     ID     |    Name     | Cloud |  Region   | Environment
------------+-------------+-------+-----------+--------------
 cfa-ldxmro | udf_example | AWS   | us-east-1 | env-z3q9rd
Run the following command to view the details of your UDF. You can use the artifact ID from the previous step or the artifact name to specify your UDF.
# use the artifact ID
confluent flink artifact describe \
  cfa-ldxmro \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION}

# use the artifact name
confluent flink artifact describe \
  udf_example \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION}
Your output should resemble:
+--------------------+-------------+
| ID                 | cfa-ldxmro  |
| Name               | udf_example |
| Version            | ver-81vxm5  |
| Cloud              | aws         |
| Region             | us-east-1   |
| Environment        | env-z3q9rd  |
| Content Format     | JAR         |
| Description        |             |
| Documentation Link |             |
+--------------------+-------------+
Step 3: Register the UDF¶
UDFs are registered inside a Flink database, which means that you must specify the Confluent Cloud environment (Flink catalog) and Kafka cluster (Flink database) where you want to use the UDF.
You can use the Confluent Cloud Console, the Confluent CLI, the Confluent Terraform provider, or the REST API to register your UDF.
- In the Flink page, click Compute pools.
- In the tile for the compute pool where you want to run the UDF, click Open SQL workspace.
- In the Use catalog dropdown, select the environment where you want to run the UDF.
- In the Use database dropdown, select the Kafka cluster where you want to run the UDF.
Run the following command to start the Flink shell.
confluent flink shell --environment ${ENV_ID} --compute-pool ${COMPUTE_POOL_ID}
Run the following statements to specify the catalog and database.
-- Specify your catalog. This example uses the default.
USE CATALOG default;
Your output should resemble:
+---------------------+---------+
|         Key         |  Value  |
+---------------------+---------+
| sql.current-catalog | default |
+---------------------+---------+
Specify the database you want to use, for example, cluster_0.

-- Specify your database. This example uses cluster_0.
USE cluster_0;
Your output should resemble:
+----------------------+-----------+
|         Key          |   Value   |
+----------------------+-----------+
| sql.current-database | cluster_0 |
+----------------------+-----------+
In Cloud Console or the Confluent CLI, run the CREATE FUNCTION statement to register your UDF in the current catalog and database. Substitute your artifact ID for <artifact-id>.

CREATE FUNCTION is_smaller
  AS 'com.example.my.TShirtSizingIsSmaller'
  USING JAR 'confluent-artifact://<artifact-id>';
Your output should resemble:
Function 'is_smaller' created.
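If you prefer to register the function from a Java Table API program instead of the SQL shell, the same DDL can be submitted through TableEnvironment.executeSql. This is a hedged sketch that assumes your Table API session accepts this statement; env is a connected TableEnvironment, and <artifact-id> is your artifact ID:

// Assumption: the connected session accepts the CREATE FUNCTION ... USING JAR DDL.
env.executeSql(
    "CREATE FUNCTION is_smaller "
        + "AS 'com.example.my.TShirtSizingIsSmaller' "
        + "USING JAR 'confluent-artifact://<artifact-id>'");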
Step 4: Use the UDF in a Flink SQL query¶
Once it is registered, your UDF is available to use in queries.
Run the following statement to view the UDFs in the current database.
SHOW USER FUNCTIONS;
Your output should resemble:
+---------------+
| function name |
+---------------+
| is_smaller    |
+---------------+
Run the following statement to create a sizes table.

CREATE TABLE sizes (
  `size_1` STRING,
  `size_2` STRING
);
Run the following statement to populate the sizes table with values.

INSERT INTO sizes VALUES
  ('XL', 'L'),
  ('small', 'L'),
  ('M', 'L'),
  ('XXL', 'XL');
Run the following statement to view the rows in the sizes table.

SELECT * FROM sizes;
Your output should resemble:
size_1  size_2
XL      L
small   L
M       L
XXL     XL
Run the following statement to execute the is_smaller function on the data in the sizes table.

SELECT size_1, size_2, is_smaller(size_1, size_2) AS is_smaller FROM sizes;
Your output should resemble:
size_1  size_2  is_smaller
XL      L       FALSE
small   L       TRUE
M       L       TRUE
XXL     XL      FALSE
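The registered function can also be called from the Table API. A minimal sketch, assuming a connected TableEnvironment (env) with the same catalog and database selected, and the static imports $ and call from org.apache.flink.table.api.Expressions:

// Invoke the registered UDF by name on the sizes table created above.
env.from("sizes")
    .select(
        $("size_1"),
        $("size_2"),
        call("is_smaller", $("size_1"), $("size_2")).as("is_smaller"))
    .execute()
    .print();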
Step 5: Implement UDF logging (optional)¶
If you want to log UDF status messages to a Kafka topic, follow the steps in Enable UDF Logging.
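The configuration steps are covered in Enable UDF Logging. At the code level, logging typically amounts to writing log statements from your function. The following sketch assumes the SLF4J API is available on the UDF classpath; whether and where the messages are delivered depends on the logging setup described in that guide:

import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Illustrative only: a scalar function that logs each invocation. */
public class LoggingExample extends ScalarFunction {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingExample.class);

    public String eval(String input) {
        // Log a status message for each call; delivery depends on your UDF logging configuration.
        LOG.info("eval called with input: {}", input);
        return input;
    }
}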
Step 6: Delete the UDF¶
When you’re finished using the UDF, you can delete it from the current database.
You can use the Confluent Cloud Console, the Confluent CLI, the Confluent Terraform provider, or the REST API to delete your UDF.
Drop the function¶
Run the following statement to remove the is_smaller function from the current database.

DROP FUNCTION is_smaller;
Your output should resemble:
Function 'is_smaller' dropped.
Currently running statements are not affected and continue running.
Exit the Flink shell.
exit;
Delete the JAR artifact¶
- Navigate to the environment where your UDF is registered.
- Click Flink, and in the Flink page, click Artifacts.
- In the artifacts list, find the UDF you want to delete.
- In the Actions column, click the icon, and in the context menu, select Delete artifact.
- In the confirmation dialog, type “udf_example”, and click Confirm. The “Artifact deleted successfully” message appears.
Run the following command to delete the artifact from the environment.
confluent flink artifact delete \
  <artifact-id> \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION}
You receive a warning about breaking Flink statements that use the artifact. Type “y” when you’re prompted to proceed.
Your output should resemble:
Deleted Flink artifact "<artifact-id>".
Implement a user-defined table function¶
In the previous steps, you implemented a UDF with a simple scalar function. Confluent Cloud for Apache Flink also supports user-defined table functions (UDTFs), which take zero, one, or multiple scalar values as input arguments and return multiple rows as output, instead of a single value.
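As an illustration of the shape of a UDTF, the following minimal sketch (with a hypothetical class name and row schema) extends TableFunction and calls collect() once per output row; the declared row type can carry one or more columns:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/** Illustrative only: splits "key=value;key=value" input into one row per pair. */
@FunctionHint(output = @DataTypeHint("ROW<k STRING, v STRING>"))
public class ParsePairs extends TableFunction<Row> {
    public void eval(String input) {
        for (String pair : input.split(";")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                // Each collect() call emits one output row with two columns.
                collect(Row.of(kv[0], kv[1]));
            }
        }
    }
}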
The following steps show how to implement a simple UDTF, upload it to Confluent Cloud, and use it in a Flink SQL statement.
Step 1: Build the uber jar¶
In this section, you compile a simple Java class named SplitFunction into a JAR file, similar to the previous section. The class is based on the TableFunction class in the Flink Table API. The SplitFunction.java class has an eval function that uses the Java split method to break a string into words and emits each word as a separate row.
In the example directory, create a file named SplitFunction.java.

touch example/SplitFunction.java
Copy the following code into SplitFunction.java.

package com.example.my;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class SplitFunction extends TableFunction<Row> {
    public void eval(String str, String delimiter) {
        for (String s : str.split(delimiter)) {
            // use collect(...) to emit a row
            collect(Row.of(s));
        }
    }
}
Run the following command to build the jar file. You can use the POM file from the previous section.
mvn clean package
Run the following command to check the contents of your jar.
jar -tf target/udf_example-1.0.jar | grep -i SplitFunction
Your output should resemble:
com/example/my/SplitFunction.class
Step 2: Upload the UDTF jar as a Flink artifact¶
- Log in to Confluent Cloud and navigate to your Flink workspace.
- Navigate to the environment where you want to run the UDF.
- Click Flink, and in the Flink page, click Artifacts.
- Click Upload artifact to open the upload pane.
- In the Cloud provider dropdown, select AWS, and in the Region dropdown, select the cloud region.
- Click Upload your JAR file and navigate to the location of your JAR file, which in the current example is target/udf_example-1.0.jar.
- When your JAR file is uploaded, it appears in the Artifacts list. In the list, click the row for your UDF artifact to open the details pane.
Log in to Confluent Cloud.
confluent login --organization-id ${ORG_ID} --prompt
Run the following command to upload the jar to Confluent Cloud.
confluent flink artifact create udf_table_example \
  --artifact-file target/udf_example-1.0.jar \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION} \
  --environment ${ENV_ID}
Your output should resemble:
+--------------------+-------------------+
| ID                 | cfa-l5xp82        |
| Name               | udf_table_example |
| Version            | ver-0x37m2        |
| Cloud              | aws               |
| Region             | us-east-1         |
| Environment        | env-z3q9rd        |
| Content Format     | JAR               |
| Description        |                   |
| Documentation Link |                   |
+--------------------+-------------------+
Note the artifact ID and version of your UDTF, which in this example are cfa-l5xp82 and ver-0x37m2, because you use them later to register the UDTF in Flink SQL and to manage it.
Step 3: Register the UDTF¶
In the Flink shell or the Cloud Console, specify the catalog and database (environment and cluster) where you want to use the UDTF, as you did in the previous section.
Run the CREATE FUNCTION statement to register your UDTF in the current catalog and database. Substitute your artifact ID for <artifact-id>.

CREATE FUNCTION split_string
  AS 'com.example.my.SplitFunction'
  USING JAR 'confluent-artifact://<artifact-id>';
Your output should resemble:
Function 'split_string' created.
Step 4: Use the UDTF in a Flink SQL query¶
Once it is registered, your UDTF is available to use in queries.
Run the following statement to view the UDFs in the current database.
SHOW USER FUNCTIONS;
Your output should resemble:
+---------------+
| Function Name |
+---------------+
| split_string  |
+---------------+
Run the following statement to execute the split_string function.

SELECT * FROM (VALUES 'A;B', 'C;D;E;F') AS T(f), LATERAL TABLE(split_string(f, ';'));
Your output should resemble:
f        word
A;B      A
A;B      B
C;D;E;F  C
C;D;E;F  D
C;D;E;F  E
C;D;E;F  F
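The equivalent lateral join can also be written with the Table API. A minimal sketch, assuming a connected TableEnvironment (env) and the static imports $ and call from org.apache.flink.table.api.Expressions:

// Lateral join against the registered UDTF, mirroring the SQL query above.
env.fromValues("A;B", "C;D;E;F")
    .as("f")
    .joinLateral(call("split_string", $("f"), ";").as("word"))
    .select($("f"), $("word"))
    .execute()
    .print();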
When you’re done with the example UDTF, drop the function and delete the JAR artifact as you did in Step 6: Delete the UDF.