4
4
import io .netty .channel .ChannelOption ;
5
5
import io .netty .handler .codec .http .HttpHeaderNames ;
6
6
import io .netty .handler .codec .http .websocketx .PingWebSocketFrame ;
7
+ import io .netty .resolver .DefaultAddressResolverGroup ;
7
8
import io .scalecube .services .Address ;
8
9
import io .scalecube .services .ServiceReference ;
9
10
import io .scalecube .services .api .ServiceMessage ;
26
27
import reactor .netty .Connection ;
27
28
import reactor .netty .http .client .HttpClient ;
28
29
import reactor .netty .resources .ConnectionProvider ;
29
- import reactor .netty .resources .LoopResources ;
30
30
31
31
public final class WebsocketGatewayClientTransport implements ClientChannel , ClientTransport {
32
32
@@ -39,10 +39,8 @@ public final class WebsocketGatewayClientTransport implements ClientChannel, Cli
39
39
private static final int CONNECT_TIMEOUT_MILLIS = (int ) Duration .ofSeconds (5 ).toMillis ();
40
40
41
41
private final GatewayClientCodec clientCodec ;
42
- private final LoopResources loopResources ;
43
42
private final Duration keepAliveInterval ;
44
43
private final Function <HttpClient , HttpClient > operator ;
45
- private final boolean ownsLoopResources ;
46
44
47
45
private final AtomicLong sidCounter = new AtomicLong ();
48
46
private final AtomicReference <WebsocketGatewayClientSession > clientSessionReference =
@@ -52,11 +50,6 @@ private WebsocketGatewayClientTransport(Builder builder) {
52
50
this .clientCodec = builder .clientCodec ;
53
51
this .keepAliveInterval = builder .keepAliveInterval ;
54
52
this .operator = builder .operator ;
55
- this .loopResources =
56
- builder .loopResources == null
57
- ? LoopResources .create ("websocket-gateway-client" , 1 , true )
58
- : builder .loopResources ;
59
- this .ownsLoopResources = builder .loopResources == null ;
60
53
}
61
54
62
55
@ Override
@@ -70,7 +63,7 @@ public ClientChannel create(ServiceReference serviceReference) {
70
63
final HttpClient httpClient =
71
64
operator .apply (
72
65
HttpClient .create (ConnectionProvider .newConnection ())
73
- .runOn ( loopResources )
66
+ .resolver ( DefaultAddressResolverGroup . INSTANCE )
74
67
.option (ChannelOption .CONNECT_TIMEOUT_MILLIS , CONNECT_TIMEOUT_MILLIS )
75
68
.option (ChannelOption .TCP_NODELAY , true )
76
69
.headers (headers -> headers .set (HttpHeaderNames .CONTENT_TYPE , CONTENT_TYPE )));
@@ -192,15 +185,15 @@ private static Throwable getRootCause(Throwable throwable) {
192
185
193
186
@ Override
194
187
public void close () {
195
- if (ownsLoopResources ) {
196
- loopResources .dispose ();
188
+ final var session = clientSessionReference .get ();
189
+ if (session != null ) {
190
+ session .close ().doOnError (ex -> {}).subscribe ();
197
191
}
198
192
}
199
193
200
194
public static class Builder {
201
195
202
196
private GatewayClientCodec clientCodec = CLIENT_CODEC ;
203
- private LoopResources loopResources ;
204
197
private Duration keepAliveInterval = Duration .ZERO ;
205
198
private Function <HttpClient , HttpClient > operator = client -> client ;
206
199
@@ -211,11 +204,6 @@ public Builder clientCodec(GatewayClientCodec clientCodec) {
211
204
return this ;
212
205
}
213
206
214
- public Builder loopResources (LoopResources loopResources ) {
215
- this .loopResources = loopResources ;
216
- return this ;
217
- }
218
-
219
207
public Builder httpClient (UnaryOperator <HttpClient > operator ) {
220
208
this .operator = this .operator .andThen (operator );
221
209
return this ;
0 commit comments