Skip to content

Commit 7feb218

Browse files
committed
Added default system schedulers
1 parent 8897f92 commit 7feb218

File tree

3 files changed

+125
-18
lines changed

3 files changed

+125
-18
lines changed

services-api/src/main/java/io/scalecube/services/Reflect.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ public static Scheduler executeOnScheduler(Method method, Map<String, Scheduler>
396396
+ declaringClass.getName()
397397
+ "."
398398
+ method.getName()
399-
+ ": scheduler (name="
399+
+ ": scheduler(name="
400400
+ name
401401
+ ") cannot be found");
402402
}
@@ -425,7 +425,7 @@ public static Scheduler executeOnScheduler(Method method, Map<String, Scheduler>
425425
+ declaringClass.getName()
426426
+ "."
427427
+ method.getName()
428-
+ ": scheduler (name="
428+
+ ": scheduler(name="
429429
+ name
430430
+ ") cannot be found");
431431
}

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,10 @@ private Context conclude() {
757757
serviceRegistry = new ServiceRegistryImpl();
758758
}
759759

760+
schedulers.put("parallel", Schedulers.parallel());
761+
schedulers.put("single", Schedulers.single());
762+
schedulers.put("boundedElastic", Schedulers.boundedElastic());
763+
schedulers.put("immediate", Schedulers.immediate());
760764
schedulerSuppliers.forEach((s, supplier) -> schedulers.put(s, supplier.get()));
761765

762766
return this;

services/src/test/java/io/scalecube/services/ExecuteOnTest.java

Lines changed: 119 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,70 +52,70 @@ void afterEach() {
5252

5353
@Test
5454
void testExecuteOnClass() {
55-
final var executeOnClass = new HelloServiceV1();
55+
final var serviceV1 = new HelloServiceV1();
5656
try (final var microservices =
5757
Microservices.start(
5858
new Context()
5959
.scheduler(SCHEDULER1_NAME, () -> schedulers.get(SCHEDULER1_NAME))
6060
.scheduler(SCHEDULER2_NAME, () -> schedulers.get(SCHEDULER2_NAME))
61-
.services(executeOnClass))) {
61+
.services(serviceV1))) {
6262

6363
final var api = microservices.call().api(HelloService.class);
6464

6565
api.hello().block();
66-
assertEquals(SCHEDULER1_NAME, executeOnClass.threadName.get(), "threadName");
66+
assertEquals(SCHEDULER1_NAME, serviceV1.threadName.get(), "threadName");
6767

6868
api.hola().block();
69-
assertEquals(SCHEDULER1_NAME, executeOnClass.threadName.get(), "threadName");
69+
assertEquals(SCHEDULER1_NAME, serviceV1.threadName.get(), "threadName");
7070

7171
api.arigato().block();
72-
assertEquals(SCHEDULER1_NAME, executeOnClass.threadName.get(), "threadName");
72+
assertEquals(SCHEDULER1_NAME, serviceV1.threadName.get(), "threadName");
7373
}
7474
}
7575

7676
@Test
7777
void testExecuteOnMethod() {
78-
final var executeOnClass = new HelloServiceV2();
78+
final var serviceV2 = new HelloServiceV2();
7979
try (final var microservices =
8080
Microservices.start(
8181
new Context()
8282
.scheduler(SCHEDULER1_NAME, () -> schedulers.get(SCHEDULER1_NAME))
8383
.scheduler(SCHEDULER2_NAME, () -> schedulers.get(SCHEDULER2_NAME))
84-
.services(executeOnClass))) {
84+
.services(serviceV2))) {
8585

8686
final var api = microservices.call().api(HelloService.class);
8787

8888
api.hello().block();
89-
assertEquals(SCHEDULER1_NAME, executeOnClass.threadName.get(), "threadName");
89+
assertEquals(SCHEDULER1_NAME, serviceV2.threadName.get(), "threadName");
9090

9191
api.hola().block();
92-
assertEquals(SCHEDULER2_NAME, executeOnClass.threadName.get(), "threadName");
92+
assertEquals(SCHEDULER2_NAME, serviceV2.threadName.get(), "threadName");
9393

9494
api.arigato().block();
95-
assertEquals("main", executeOnClass.threadName.get(), "threadName");
95+
assertEquals("main", serviceV2.threadName.get(), "threadName");
9696
}
9797
}
9898

9999
@Test
100100
void testExecuteOnMixedDefinition() {
101-
final var executeOnClass = new HelloServiceV3();
101+
final var serviceV3 = new HelloServiceV3();
102102
try (final var microservices =
103103
Microservices.start(
104104
new Context()
105105
.scheduler(SCHEDULER1_NAME, () -> schedulers.get(SCHEDULER1_NAME))
106106
.scheduler(SCHEDULER2_NAME, () -> schedulers.get(SCHEDULER2_NAME))
107-
.services(executeOnClass))) {
107+
.services(serviceV3))) {
108108

109109
final var api = microservices.call().api(HelloService.class);
110110

111111
api.hello().block();
112-
assertEquals(SCHEDULER1_NAME, executeOnClass.threadName.get(), "threadName");
112+
assertEquals(SCHEDULER1_NAME, serviceV3.threadName.get(), "threadName");
113113

114114
api.hola().block();
115-
assertEquals(SCHEDULER1_NAME, executeOnClass.threadName.get(), "threadName");
115+
assertEquals(SCHEDULER1_NAME, serviceV3.threadName.get(), "threadName");
116116

117117
api.arigato().block();
118-
assertEquals(SCHEDULER2_NAME, executeOnClass.threadName.get(), "threadName");
118+
assertEquals(SCHEDULER2_NAME, serviceV3.threadName.get(), "threadName");
119119
}
120120
}
121121

@@ -153,6 +153,39 @@ void testExecuteOnSchedulerMustBeDisposed() {
153153
assertTrue(s3.isDisposed(), "s3.isDisposed");
154154
}
155155

156+
@Test
157+
void testDefaultSchedulersOnMethods() {
158+
final var service = new DefaultSchedulerOnServiceMethodImpl();
159+
try (final var microservices = Microservices.start(new Context().services(service))) {
160+
161+
final var api = microservices.call().api(DefaultSchedulerOnServiceMethod.class);
162+
163+
api.parallel().block();
164+
assertEquals("parallel-1", service.threadName.get(), "threadName");
165+
166+
api.single().block();
167+
assertEquals("single-1", service.threadName.get(), "threadName");
168+
169+
api.boundedElastic().block();
170+
assertEquals("boundedElastic-1", service.threadName.get(), "threadName");
171+
172+
api.immediate().block();
173+
assertEquals("main", service.threadName.get(), "threadName");
174+
}
175+
}
176+
177+
@Test
178+
void testDefaultSchedulersOnService() {
179+
final var service = new DefaultSchedulerOnServiceImpl();
180+
try (final var microservices = Microservices.start(new Context().services(service))) {
181+
182+
final var api = microservices.call().api(DefaultSchedulerOnService.class);
183+
184+
api.hello().block();
185+
assertEquals("single-1", service.threadName.get(), "threadName");
186+
}
187+
}
188+
156189
@Service("v1/greeting")
157190
public interface HelloService {
158191

@@ -166,6 +199,29 @@ public interface HelloService {
166199
Mono<String> arigato();
167200
}
168201

202+
@Service("v1/defaultSchedulerOnServiceMethod")
203+
public interface DefaultSchedulerOnServiceMethod {
204+
205+
@ServiceMethod
206+
Mono<String> parallel();
207+
208+
@ServiceMethod
209+
Mono<String> single();
210+
211+
@ServiceMethod
212+
Mono<String> boundedElastic();
213+
214+
@ServiceMethod
215+
Mono<String> immediate();
216+
}
217+
218+
@Service("v1/defaultSchedulerOnService")
219+
public interface DefaultSchedulerOnService {
220+
221+
@ServiceMethod
222+
Mono<String> hello();
223+
}
224+
169225
// All methods must be executed in scheduler@1
170226
@ExecuteOn(SCHEDULER1_NAME)
171227
public static class HelloServiceV1 implements HelloService {
@@ -246,7 +302,7 @@ public Mono<String> arigato() {
246302
}
247303
}
248304

249-
// All methods must be executed in scheduler@3@that-was-not-declared
305+
// This service will not be registered due to scheduler@3@that-was-not-declared
250306
@ExecuteOn(SCHEDULER3_NAME)
251307
public static class HelloServiceV4 implements HelloService {
252308

@@ -270,4 +326,51 @@ public Mono<String> arigato() {
270326
return Mono.just("Arigato | " + System.currentTimeMillis());
271327
}
272328
}
329+
330+
// Service that executes method calls in the system default schedulers
331+
public static class DefaultSchedulerOnServiceMethodImpl
332+
implements DefaultSchedulerOnServiceMethod {
333+
334+
final AtomicReference<String> threadName = new AtomicReference<>();
335+
336+
@ExecuteOn("parallel")
337+
@Override
338+
public Mono<String> parallel() {
339+
threadName.set(Thread.currentThread().getName());
340+
return Mono.just("parallel | " + System.currentTimeMillis());
341+
}
342+
343+
@ExecuteOn("single")
344+
@Override
345+
public Mono<String> single() {
346+
threadName.set(Thread.currentThread().getName());
347+
return Mono.just("single | " + System.currentTimeMillis());
348+
}
349+
350+
@ExecuteOn("boundedElastic")
351+
@Override
352+
public Mono<String> boundedElastic() {
353+
threadName.set(Thread.currentThread().getName());
354+
return Mono.just("boundedElastic | " + System.currentTimeMillis());
355+
}
356+
357+
@ExecuteOn("immediate")
358+
@Override
359+
public Mono<String> immediate() {
360+
threadName.set(Thread.currentThread().getName());
361+
return Mono.just("immediate | " + System.currentTimeMillis());
362+
}
363+
}
364+
365+
@ExecuteOn("single")
366+
public static class DefaultSchedulerOnServiceImpl implements DefaultSchedulerOnService {
367+
368+
final AtomicReference<String> threadName = new AtomicReference<>();
369+
370+
@Override
371+
public Mono<String> hello() {
372+
threadName.set(Thread.currentThread().getName());
373+
return Mono.just("hello | " + System.currentTimeMillis());
374+
}
375+
}
273376
}

0 commit comments

Comments
 (0)