Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>2.6.0</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>2.16.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>3.12.1</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>3.13.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>1.0.0.Beta1</smallrye-stork.version>
<jakarta.activation.version>1.2.1</jakarta.activation.version>
<jakarta.annotation-api.version>1.3.5</jakarta.annotation-api.version>
Expand Down Expand Up @@ -135,7 +135,8 @@
<reactive-streams.version>1.0.3</reactive-streams.version>
<jboss-logging.version>3.4.2.Final</jboss-logging.version>
<mutiny.version>1.2.0</mutiny.version>
<kafka2.version>2.8.1</kafka2.version>
<kafka3.version>3.0.0</kafka3.version>
<snappy.version>1.1.8.1</snappy.version>
<zookeeper.version>3.5.7</zookeeper.version>
<!-- Scala is used by Kafka so we need to choose a compatible version -->
<scala.version>2.12.13</scala.version>
Expand Down Expand Up @@ -3821,7 +3822,12 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka2.version}</version>
<version>${kafka3.version}</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>${snappy.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
Expand All @@ -3836,17 +3842,17 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka2.version}</version>
<version>${kafka3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka2.version}</version>
<version>${kafka3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka2.version}</version>
<version>${kafka3.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
Expand Down
5 changes: 5 additions & 0 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ This will add the following to your `pom.xml`:
</dependency>
----

[NOTE]
====
The extension includes `kafka-clients` version 3.0.0 as a transitive dependency and is compatible with Kafka brokers version 2.x.
====

== Configuring Smallrye Kafka Connector

Because Smallrye Reactive Messaging framework supports different messaging backends like Apache Kafka, AMQP, Apache Camel, JMS, MQTT, etc., it employs a generic vocabulary:
Expand Down
5 changes: 5 additions & 0 deletions extensions/kafka-client/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health-spi</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,6 @@ private void handleSnappy(BuildProducer<ReflectiveClassBuildItem> reflectiveClas
} else { // otherwise the native lib of the platform this build runs on
String dir = OSInfo.getNativeLibFolderPathForCurrentOS();
String snappyNativeLibraryName = System.mapLibraryName("snappyjava");
if (snappyNativeLibraryName.toLowerCase().endsWith(".dylib")) {
snappyNativeLibraryName = snappyNativeLibraryName.replace(".dylib", ".jnilib");
}
String path = root + dir + "/" + snappyNativeLibraryName;
nativeLibs.produce(new NativeImageResourceBuildItem(path));
}
Expand Down
5 changes: 5 additions & 0 deletions extensions/kafka-client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.graalvm.nativeimage</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,6 @@ public void loadSnappy() {
String snappyNativeLibraryName = System.mapLibraryName("snappyjava");
String snappyNativeLibraryPath = "/org/xerial/snappy/native/" + OSInfo.getNativeLibFolderPathForCurrentOS();
boolean hasNativeLib = hasResource(snappyNativeLibraryPath + "/" + snappyNativeLibraryName);
if (!hasNativeLib) {
if (OSInfo.getOSName().equals("Mac")) {
// Fix for openjdk7 for Mac
String altName = "libsnappyjava.jnilib";
if (hasResource(snappyNativeLibraryPath + "/" + altName)) {
snappyNativeLibraryName = altName;
hasNativeLib = true;
}
}
}

if (!hasNativeLib) {
String errorMessage = String.format("no native library is found for os.name=%s and os.arch=%s", OSInfo.getOSName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import javax.ws.rs.PathParam;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
Expand All @@ -27,7 +28,7 @@ public class KafkaStreamsEndpoint {
private ReadOnlyKeyValueStore<Integer, Long> getCountstore() {
while (true) {
try {
return streams.store("countstore", QueryableStoreTypes.keyValueStore());
return streams.store(StoreQueryParameters.fromNameAndType("countstore", QueryableStoreTypes.keyValueStore()));
} catch (InvalidStateStoreException e) {
// ignore, store not ready yet
}
Expand Down