Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;


public class ConditionalRetry {
Expand All @@ -41,32 +40,18 @@ public ConditionalRetry(Counter counter, String name) {
public ConditionalRetry(Counter counter, String name, final int max) {
this.counter = counter;
this.name = name;
this.retryLogic =
new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> attempts) {
return attempts
.zipWith(Observable.range(1, max), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable t1, Integer integer) {
return integer;
}
})
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
if (errorRef.get() != null)
return Observable.error(errorRef.get());
if (ConditionalRetry.this.counter != null)
ConditionalRetry.this.counter.increment();
long delay = 2 * (integer > 10 ? 10 : integer);
logger.info(": retrying " + ConditionalRetry.this.name +
" after sleeping for " + delay + " secs");
return Observable.timer(delay, TimeUnit.SECONDS);
}
});
}
};
this.retryLogic = attempts -> attempts
.zipWith(Observable.range(1, max), (t1, integer) -> integer)
.flatMap(integer -> {
if (errorRef.get() != null)
return Observable.error(errorRef.get());
if (ConditionalRetry.this.counter != null)
ConditionalRetry.this.counter.increment();
long delay = 2 * (integer > 10 ? 10 : integer);
logger.info(": retrying " + ConditionalRetry.this.name +
" after sleeping for " + delay + " secs");
return Observable.timer(delay, TimeUnit.SECONDS);
});
}

public void setErrorRef(Throwable error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;


/* package */ class HttpUtility {
Expand All @@ -49,74 +47,54 @@
static Observable<String> getGetResponse(String host, int port, String uri) {
return new CompositeHttpClientBuilder<ByteBuf, ByteBuf>()
.appendPipelineConfigurator(
new PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>() {
@Override
public void configureNewPipeline(ChannelPipeline pipeline) {
pipeline.addLast("introspecting-handler", new ChannelDuplexHandler() {
private String uri = "<undefined>";
(PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>) pipeline -> {
pipeline.addLast("introspecting-handler", new ChannelDuplexHandler() {
private String uri = "<undefined>";

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
uri = request.getUri();
logger.info("Sending request on channel id: " + ctx.channel().toString() +
", request URI: " + uri);
}
super.write(ctx, msg, promise);
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
uri = request.uri();
logger.info("Sending request on channel id: " + ctx.channel().toString() +
", request URI: " + uri);
}
super.write(ctx, msg, promise);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
logger.info("Received response on channel id: " + ctx.channel().toString() +
", request URI: " + uri);
}
super.channelRead(ctx, msg);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
logger.info("Received response on channel id: " + ctx.channel().toString() +
", request URI: " + uri);
}
});

try {
int maxContentLength = 10 * 1024 * 1024; // Ten megabytes
pipeline.replace(HttpObjectAggregator.class, "http-object-aggregator",
new HttpObjectAggregator(maxContentLength));
} catch (NoSuchElementException ex) {
logger.error("HttpObjectAggregator did not exist in this pipeline. Error: {}",
ex.getMessage(), ex);
} catch (IllegalArgumentException ex) {
logger.error("ChannelHandler named http-object-aggregator already existed in this" +
" pipeline. Error: {}", ex.getMessage(), ex);
}
catch (Throwable t) {
logger.error("Unknown error adding HttpObjectAggregator to Master Client " +
"Pipeline. Error: {}", t.getMessage(), t);
super.channelRead(ctx, msg);
}
}
});

try {
int maxContentLength = 10 * 1024 * 1024; // Ten megabytes
pipeline.replace(HttpObjectAggregator.class, "http-object-aggregator",
new HttpObjectAggregator(maxContentLength));
} catch (NoSuchElementException ex) {
logger.error("HttpObjectAggregator did not exist in this pipeline. Error: {}",
ex.getMessage(), ex);
} catch (IllegalArgumentException ex) {
logger.error("ChannelHandler named http-object-aggregator already existed in this" +
" pipeline. Error: {}", ex.getMessage(), ex);
} catch (Throwable t) {
logger.error("Unknown error adding HttpObjectAggregator to Master Client " +
"Pipeline. Error: {}", t.getMessage(), t);
}
})
.build()
.submit(new RxClient.ServerInfo(host, port),
HttpClientRequest.createGet(uri),
new HttpClient.HttpClientConfig.Builder().setFollowRedirect(true).followRedirect(MAX_REDIRECTS).build())
.flatMap(new Func1<HttpClientResponse<ByteBuf>, Observable<ByteBuf>>() {
@Override
public Observable<ByteBuf> call(HttpClientResponse<ByteBuf> response) {
return response.getContent();
}
})
.map(new Func1<ByteBuf, String>() {
@Override
public String call(ByteBuf o) {
return o.toString(Charset.defaultCharset());
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
logger.warn("Error: " + throwable.getMessage(), throwable);
}
})
.flatMap(response -> response.getContent())
.map(o -> o.toString(Charset.defaultCharset()))
.doOnError(throwable -> logger.warn("Error: " + throwable.getMessage(), throwable))
.timeout(GET_TIMEOUT_SECS, TimeUnit.SECONDS);
}
}
Loading