Skip to content

Commit 5ddb28e

Browse files
authored
Enhanced .close() in WebsocketGatewaClientTransport
2 parents bbcd45a + 0893cf1 commit 5ddb28e

File tree

1 file changed

+5
-17
lines changed

1 file changed

+5
-17
lines changed

services-gateway/src/main/java/io/scalecube/services/gateway/client/websocket/WebsocketGatewayClientTransport.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.netty.channel.ChannelOption;
55
import io.netty.handler.codec.http.HttpHeaderNames;
66
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
7+
import io.netty.resolver.DefaultAddressResolverGroup;
78
import io.scalecube.services.Address;
89
import io.scalecube.services.ServiceReference;
910
import io.scalecube.services.api.ServiceMessage;
@@ -26,7 +27,6 @@
2627
import reactor.netty.Connection;
2728
import reactor.netty.http.client.HttpClient;
2829
import reactor.netty.resources.ConnectionProvider;
29-
import reactor.netty.resources.LoopResources;
3030

3131
public final class WebsocketGatewayClientTransport implements ClientChannel, ClientTransport {
3232

@@ -39,10 +39,8 @@ public final class WebsocketGatewayClientTransport implements ClientChannel, Cli
3939
private static final int CONNECT_TIMEOUT_MILLIS = (int) Duration.ofSeconds(5).toMillis();
4040

4141
private final GatewayClientCodec clientCodec;
42-
private final LoopResources loopResources;
4342
private final Duration keepAliveInterval;
4443
private final Function<HttpClient, HttpClient> operator;
45-
private final boolean ownsLoopResources;
4644

4745
private final AtomicLong sidCounter = new AtomicLong();
4846
private final AtomicReference<WebsocketGatewayClientSession> clientSessionReference =
@@ -52,11 +50,6 @@ private WebsocketGatewayClientTransport(Builder builder) {
5250
this.clientCodec = builder.clientCodec;
5351
this.keepAliveInterval = builder.keepAliveInterval;
5452
this.operator = builder.operator;
55-
this.loopResources =
56-
builder.loopResources == null
57-
? LoopResources.create("websocket-gateway-client", 1, true)
58-
: builder.loopResources;
59-
this.ownsLoopResources = builder.loopResources == null;
6053
}
6154

6255
@Override
@@ -70,7 +63,7 @@ public ClientChannel create(ServiceReference serviceReference) {
7063
final HttpClient httpClient =
7164
operator.apply(
7265
HttpClient.create(ConnectionProvider.newConnection())
73-
.runOn(loopResources)
66+
.resolver(DefaultAddressResolverGroup.INSTANCE)
7467
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MILLIS)
7568
.option(ChannelOption.TCP_NODELAY, true)
7669
.headers(headers -> headers.set(HttpHeaderNames.CONTENT_TYPE, CONTENT_TYPE)));
@@ -192,15 +185,15 @@ private static Throwable getRootCause(Throwable throwable) {
192185

193186
@Override
194187
public void close() {
195-
if (ownsLoopResources) {
196-
loopResources.dispose();
188+
final var session = clientSessionReference.get();
189+
if (session != null) {
190+
session.close().doOnError(ex -> {}).subscribe();
197191
}
198192
}
199193

200194
public static class Builder {
201195

202196
private GatewayClientCodec clientCodec = CLIENT_CODEC;
203-
private LoopResources loopResources;
204197
private Duration keepAliveInterval = Duration.ZERO;
205198
private Function<HttpClient, HttpClient> operator = client -> client;
206199

@@ -211,11 +204,6 @@ public Builder clientCodec(GatewayClientCodec clientCodec) {
211204
return this;
212205
}
213206

214-
public Builder loopResources(LoopResources loopResources) {
215-
this.loopResources = loopResources;
216-
return this;
217-
}
218-
219207
public Builder httpClient(UnaryOperator<HttpClient> operator) {
220208
this.operator = this.operator.andThen(operator);
221209
return this;

0 commit comments

Comments
 (0)