Canceling a Subscription (in reactive transactions) leads to a state where transaction cleanup happens asynchronously and detached from completion signals.
Consider the following code:
TransactionalOperator transactionalOperator = …;
DatabaseClient databaseClient = …;
JdbcTemplate jdbcTemplate = …;
Flux<Integer> integerFlux = transactionalOperator.execute(status -> {
    return databaseClient
            .execute("INSERT INTO legoset (id) VALUES(42055)").fetch().rowsUpdated();
});
// next() takes the first element and cancels the upstream subscription
Mono<Integer> next = integerFlux.next();
next.as(StepVerifier::create).expectNext(1).verifyComplete();
// Queried through JDBC right after the completion signal
assertThat(jdbcTemplate.queryForMap("SELECT id, name, manual FROM legoset")).containsEntry("id", 42055);
Initially, the table is empty, and both DatabaseClient and JdbcTemplate are configured to point to the same database.
The assertion with queryForMap typically fails with EmptyResultDataAccessException. This is because integerFlux.next() emits completion as soon as the first element arrives and cancels the upstream subscription at the same time.
On cancellation, TransactionalOperator and its backing ReactiveTransactionManager implementations issue a commit to clean up the transaction. That commit happens asynchronously, and there is no way to await its completion.
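The ordering problem can be reproduced without Spring or Reactor. The following is a minimal plain-JDK sketch built on java.util.concurrent.Flow. The names TxPublisher, commitGate, commitDone and the next helper are hypothetical stand-ins invented for this illustration; they are not part of TransactionalOperator or any ReactiveTransactionManager. The sketch only models the sequence "element, cancel, completion, commit later".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelCleanupDemo {

    /** Hypothetical stand-in for a transactional publisher: emits one row count, commits on cancel. */
    static final class TxPublisher implements Flow.Publisher<Integer> {
        final AtomicBoolean committed = new AtomicBoolean(false);
        // Lets the demo decide when the simulated commit is allowed to finish.
        final CountDownLatch commitGate = new CountDownLatch(1);
        // Only the demo can observe this; the subscriber has no handle on it.
        final CompletableFuture<Void> commitDone = new CompletableFuture<>();

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    subscriber.onNext(1); // the INSERT reports one updated row
                }

                @Override
                public void cancel() {
                    // Cleanup starts on another thread; cancel() returns immediately.
                    CompletableFuture.runAsync(() -> {
                        try {
                            commitGate.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            commitDone.completeExceptionally(e);
                            return;
                        }
                        committed.set(true);
                        commitDone.complete(null);
                    });
                }
            });
        }
    }

    /** Mimics the shape of next(): take the first element, cancel upstream, complete right away. */
    static CompletableFuture<Integer> next(Flow.Publisher<Integer> source) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer item) {
                subscription.cancel();  // triggers the asynchronous commit
                result.complete(item);  // completion does not wait for it
            }

            @Override
            public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                result.complete(null);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        TxPublisher tx = new TxPublisher();
        Integer rows = next(tx).join();
        // The caller already holds the result, yet the commit has not happened.
        System.out.println("rows=" + rows + ", committed=" + tx.committed.get());
        tx.commitGate.countDown();
        tx.commitDone.join();
        System.out.println("committed=" + tx.committed.get());
    }
}
```

In the sketch the latch makes the race deterministic. In the real scenario the commit may or may not have finished by the time the JDBC query runs, which matches the assertion failing typically rather than always.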
I am not sure whether we can fix the problem at all or only mitigate it. One approach could be for TransactionalOperator.transactional(Mono) to cancel the innermost Publisher and hand out a Mono. This change would return a properly bounded Mono: cancellation would happen on the innermost Publisher, which prevents cancellation of the Publisher that is returned from TransactionalOperator.
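To make the idea concrete, here is a rough plain-JDK sketch of such a bounded result, again using java.util.concurrent.Flow rather than Reactor. InnerPublisher, the transactional helper and the commit supplier are hypothetical names chosen for this illustration, and the sketch is not how TransactionalOperator is implemented. It only shows the intended shape: the inner publisher is cancelled after its first element, and the outer result completes only once the commit has finished. Rollback on error is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class BoundedTransactionDemo {

    /** Hypothetical inner publisher: emits a single row count and records cancellation. */
    static final class InnerPublisher implements Flow.Publisher<Integer> {
        final AtomicBoolean cancelled = new AtomicBoolean(false);

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    subscriber.onNext(1);
                }

                @Override
                public void cancel() {
                    cancelled.set(true);
                }
            });
        }
    }

    /**
     * Hypothetical analogue of a bounded transactional(Mono): cancels only the
     * inner publisher and completes the returned future after the commit is done.
     */
    static CompletableFuture<Integer> transactional(
            Flow.Publisher<Integer> inner, Supplier<CompletableFuture<Void>> commit) {
        CompletableFuture<Integer> first = new CompletableFuture<>();
        inner.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer item) {
                subscription.cancel(); // cancellation stays on the innermost publisher
                first.complete(item);
            }

            @Override
            public void onError(Throwable t) {
                first.completeExceptionally(t); // no commit on error in this sketch
            }

            @Override
            public void onComplete() {
                first.complete(null);
            }
        });
        // The value is handed out only after the commit future has completed.
        return first.thenCompose(value -> commit.get().thenApply(ignored -> value));
    }

    public static void main(String[] args) {
        InnerPublisher inner = new InnerPublisher();
        AtomicBoolean committed = new AtomicBoolean(false);
        // Simulated asynchronous commit that takes about 50 ms.
        Supplier<CompletableFuture<Void>> commit = () -> CompletableFuture.runAsync(
                () -> committed.set(true),
                CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));

        Integer rows = transactional(inner, commit).join();
        System.out.println("rows=" + rows
                + ", innerCancelled=" + inner.cancelled.get()
                + ", committed=" + committed.get());
    }
}
```

The trade-off in this shape is that the caller can no longer cancel the outer result to abort the transaction early, which is part of the impact this ticket is meant to discuss.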
This ticket is an opportunity to discuss that effect and its potential impact on cancellation of Publishers that are enhanced for transactions.
/cc @smaldini @simonbasle