-
Notifications
You must be signed in to change notification settings - Fork 3k
Description
Describe the bug
I'm trying to use REST APIs that return a Multi<Message>
that produces RestMediaType.APPLICATION_NDJSON
.
@GET
@Path("/pojo")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> readPojo() {
//
}
Quarkus works as intended if the Message is smaller than approximately 1000 bytes. However, if it is larger, org.jboss.resteasy.reactive.client.impl.MultiInvoker
feeds incomplete chunks into Jackson for deserialization, which of course fails.
MultiInvoker does not actually check for a newline delimiter in the provided chunk. If there is no newline, it defaults to using the size of the chunk.
Expected behavior
Quarkus should buffer read data until the delimiter is read, then parse the buffered content as a json object.
Actual behavior
Jackson fails to deserialize because the json content is not complete.
Caused by: jakarta.ws.rs.ProcessingException: com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input in VALUE_STRING
How to Reproduce?
Test based on MultiNdjsonTest :
public class LargeNDJsonTest {
@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest();
@TestHTTPResource
URI uri;
@Test
void readLargePojo() throws InterruptedException {
var client = createClient(uri);
var collected = new CopyOnWriteArrayList<Message>();
var completionLatch = new CountDownLatch(1);
client.readPojo(1000).onCompletion().invoke(completionLatch::countDown)
.subscribe().with(collected::add);
if (!completionLatch.await(5, TimeUnit.SECONDS)) {
fail("Streaming did not complete in time");
}
assertThat(collected.size(), org.hamcrest.Matchers.is(4));
}
private Client createClient(URI uri) {
return QuarkusRestClientBuilder.newBuilder().baseUri(uri).build(Client.class);
}
@Path("/stream")
public interface Client {
@GET
@Path("/pojo/{size}")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<Message> readPojo(int size);
}
@Path("/stream")
public static class StreamingResource {
@Inject
Vertx vertx;
@GET
@Path("/pojo/{size}")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> readPojo(int size) {
return Multi.createFrom().emitter(
em -> {
Random random = new Random();
byte[] bytes = new byte[size];
random.nextBytes(bytes);
em.emit(Message.of("one", java.util.Base64.getEncoder().encodeToString(bytes)));
em.emit(Message.of("two", java.util.Base64.getEncoder().encodeToString(bytes)));
em.emit(Message.of("three", java.util.Base64.getEncoder().encodeToString(bytes)));
vertx.setTimer(100, id -> {
em.emit(Message.of("four", java.util.Base64.getEncoder().encodeToString(bytes)));
em.complete();
});
});
}
}
public record Message(String name, String value) {
public static Message of(String name, String value) {
return new Message(name, value);
}
}
}
If client.readPojo(1000)
is changed to client.readPojo(25)
then the test passes.
Output of uname -a
or ver
Linux magnusg-laptop 6.11.0-21-generic #21-Ubuntu SMP PREEMPT_DYNAMIC Wed Feb 19 16:50:40 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Output of java -version
openjdk 23.0.2 2025-01-21
Quarkus version or git rev
3.21.1
Build tool (ie. output of mvnw --version
or gradlew --version
)
Gradle 8.12.1
Additional information
No response