-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathDemoResource.java
More file actions
141 lines (113 loc) · 4.64 KB
/
DemoResource.java
File metadata and controls
141 lines (113 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.assaabloy.reactive.resource;
import com.assaabloy.reactive.service.one.OneCommand;
import com.assaabloy.reactive.service.three.Three;
import com.assaabloy.reactive.service.three.ThreeCommand;
import com.assaabloy.reactive.service.three.ThreeService;
import com.assaabloy.reactive.service.two.Two;
import com.assaabloy.reactive.service.two.TwoCommand;
import com.assaabloy.reactive.service.two.TwoService;
import com.assaabloy.reactive.service.one.One;
import com.assaabloy.reactive.service.one.OneService;
import rx.schedulers.Schedulers;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Path("/demo")
@Produces(MediaType.APPLICATION_JSON)
public class DemoResource {
private final OneService oneService;
private final TwoService twoService;
private final ThreeService threeService;
public DemoResource(final OneService oneService, final TwoService twoService,
final ThreeService threeService) {
this.oneService = oneService;
this.twoService = twoService;
this.threeService = threeService;
}
//
// Simple naive blocking method
//
@GET
@Path("/blocking")
public Response getDemoBlocking() {
One one = oneService.getOne();
Two two = twoService.getTwo();
return Response.ok(new Demo(one, two))
.build();
}
//
// Using futures and an executor service works, when the usage is simple
//
private final ExecutorService executor = Executors.newCachedThreadPool();
@GET
@Path("/futures")
public Response getDemoFuture() throws Exception {
Future<One> slow = executor.submit(oneService.getOneCallable());
Future<Two> fast = executor.submit(twoService.getTwoCallable());
return Response.ok(new Demo(slow.get(), fast.get()))
.build();
}
//
// Using futures and an executor service starts blocking when more complicated call graphs are used
//
@GET
@Path("/futures-complex")
public Response getDemoFutureComplex() throws Exception {
Future<One> slow = executor.submit(oneService.getOneCallable());
Future<Three> complex = executor.submit(threeService.getThreeCallable(slow.get()));
Future<Two> fast = executor.submit(twoService.getTwoCallable());
return Response.ok(new Demo(complex.get(), fast.get()))
.build();
}
//
// Using CompletableFutures can reduce blocking, but is lacking in operators and difficult to read
//
@GET
@Path("/completable-futures")
public void getDemoCompletableFutures(@Suspended AsyncResponse ar) throws Exception {
final CompletableFuture<Two> threeFuture =
CompletableFuture.supplyAsync(twoService::getTwo, executor);
CompletableFuture.supplyAsync(oneService::getOne, executor)
.thenApplyAsync(threeService::getThree, executor)
.thenCombine(threeFuture, Demo::new)
.whenComplete((demo, throwable) -> {
if (throwable != null) {
ar.resume(throwable);
}
ar.resume(demo);
})
.get();
}
//
// Fully asynchronous RxJava method.
//
@GET
@Path("/async")
public void getDemoAsync(@Suspended final AsyncResponse asyncResponse) {
oneService.observeOne()
.subscribeOn(Schedulers.io())
.flatMap(threeService::observeThree)
.subscribeOn(Schedulers.io())
.zipWith(twoService.observeTwo(), Demo::new)
.subscribe(asyncResponse::resume, asyncResponse::resume);
}
// Asynchronous RxJava method, wrapped with Hystrix
@GET
@Path("/async-hystrix")
public void getDemoAsyncHystrix(@Suspended final AsyncResponse asyncResponse) {
new OneCommand(oneService).toObservable()
.subscribeOn(Schedulers.io())
.flatMap(one -> new ThreeCommand(threeService, one).toObservable())
.subscribeOn(Schedulers.io())
.zipWith(new TwoCommand(twoService).toObservable(), Demo::new)
.subscribe(asyncResponse::resume, asyncResponse::resume);
}
}