Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions src/main/java/io/reactivesocket/internal/Responder.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void onNext(Frame requestFrame) {
} else if (requestFrame.getType() == FrameType.CANCEL) {
Subscription s;
synchronized (Responder.this) {
s = cancellationSubscriptions.get(requestFrame.getStreamId());
s = cancellationSubscriptions.get(streamId);
}
if (s != null) {
s.cancel();
Expand All @@ -268,7 +268,7 @@ public void onNext(Frame requestFrame) {
} else if (requestFrame.getType() == FrameType.REQUEST_N) {
SubscriptionArbiter inFlightSubscription;
synchronized (Responder.this) {
inFlightSubscription = inFlight.get(requestFrame.getStreamId());
inFlightSubscription = inFlight.get(streamId);
}
if (inFlightSubscription != null) {
long requestN = Frame.RequestN.requestN(requestFrame);
Expand Down Expand Up @@ -399,6 +399,7 @@ private Publisher<Frame> handleRequestResponse(
final RequestHandler requestHandler,
final Int2ObjectHashMap<Subscription> cancellationSubscriptions) {

final int streamId = requestFrame.getStreamId();
return child -> {
Subscription s = new Subscription() {

Expand All @@ -408,8 +409,6 @@ private Publisher<Frame> handleRequestResponse(
@Override
public void request(long n) {
if (n > 0 && started.compareAndSet(false, true)) {
final int streamId = requestFrame.getStreamId();

try {
Publisher<Payload> responsePublisher =
requestHandler.handleRequestResponse(requestFrame);
Expand Down Expand Up @@ -477,13 +476,13 @@ public void cancel() {

private void cleanup() {
synchronized(Responder.this) {
cancellationSubscriptions.remove(requestFrame.getStreamId());
cancellationSubscriptions.remove(streamId);
}
}

};
synchronized(Responder.this) {
cancellationSubscriptions.put(requestFrame.getStreamId(), s);
cancellationSubscriptions.put(streamId, s);
}
child.onSubscribe(s);
};
Expand Down Expand Up @@ -541,7 +540,7 @@ private Publisher<Frame> _handleRequestStream(
final Int2ObjectHashMap<Subscription> cancellationSubscriptions,
final Int2ObjectHashMap<SubscriptionArbiter> inFlight,
final boolean allowCompletion) {

final int streamId = requestFrame.getStreamId();
return child -> {
Subscription s = new Subscription() {

Expand All @@ -556,7 +555,6 @@ public void request(long n) {
}
if (started.compareAndSet(false, true)) {
arbiter.addTransportRequest(n);
final int streamId = requestFrame.getStreamId();

try {
Publisher<Payload> responses =
Expand Down Expand Up @@ -630,14 +628,14 @@ public void cancel() {

private void cleanup() {
synchronized(Responder.this) {
inFlight.remove(requestFrame.getStreamId());
cancellationSubscriptions.remove(requestFrame.getStreamId());
inFlight.remove(streamId);
cancellationSubscriptions.remove(streamId);
}
}

};
synchronized(Responder.this) {
cancellationSubscriptions.put(requestFrame.getStreamId(), s);
cancellationSubscriptions.put(streamId, s);
}
child.onSubscribe(s);

Expand Down Expand Up @@ -704,8 +702,9 @@ private Publisher<Frame> handleRequestChannel(Frame requestFrame,
Int2ObjectHashMap<SubscriptionArbiter> inFlight) {

UnicastSubject<Payload> channelSubject;
final int streamId = requestFrame.getStreamId();
synchronized(Responder.this) {
channelSubject = channels.get(requestFrame.getStreamId());
channelSubject = channels.get(streamId);
}
if (channelSubject == null) {
return child -> {
Expand All @@ -722,7 +721,6 @@ public void request(long n) {
}
if (started.compareAndSet(false, true)) {
arbiter.addTransportRequest(n);
final int streamId = requestFrame.getStreamId();

// first request on this channel
UnicastSubject<Payload> channelRequests =
Expand Down Expand Up @@ -816,14 +814,14 @@ public void cancel() {

private void cleanup() {
synchronized(Responder.this) {
inFlight.remove(requestFrame.getStreamId());
cancellationSubscriptions.remove(requestFrame.getStreamId());
inFlight.remove(streamId);
cancellationSubscriptions.remove(streamId);
}
}

};
synchronized(Responder.this) {
cancellationSubscriptions.put(requestFrame.getStreamId(), s);
cancellationSubscriptions.put(streamId, s);
}
child.onSubscribe(s);

Expand All @@ -848,7 +846,7 @@ private void cleanup() {
// handle time-gap issues like this?
// TODO validate with unit tests.
return PublisherUtils.errorFrame(
requestFrame.getStreamId(), new RuntimeException("Channel unavailable"));
streamId, new RuntimeException("Channel unavailable"));
}
}
}
Expand Down