Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
<surefire.redirectTestOutputToFile>true</surefire.redirectTestOutputToFile>
<slf4j.version>1.7.30</slf4j.version>
<logback.version>1.2.11</logback.version>
<guava.version>13.0.1</guava.version>
<guava.version>32.0.0-jre</guava.version>
<gson.version>2.2.4</gson.version>
<findbugs.jsr305.version>2.0.1</findbugs.jsr305.version>
<netty.version>4.1.75.Final</netty.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceController;
import org.apache.twill.common.Threads;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -93,9 +92,10 @@ public Future<? extends ServiceController> terminate(long gracefulTimeout, TimeU
}

terminationTimeoutMillis.compareAndSet(-1L, timeout);
stop();
stopAsync();
return Futures.transform(terminationFuture,
(Function<State, ServiceController>) input -> AbstractExecutionServiceController.this);
(Function<State, ServiceController>) input -> AbstractExecutionServiceController.this,
MoreExecutors.directExecutor());
}

@Nullable
Expand Down Expand Up @@ -129,29 +129,25 @@ public void terminated(State from) {
}, executor);
}

@Override
public void awaitTerminated() throws ExecutionException {
Uninterruptibles.getUninterruptibly(terminationFuture);
}

@Override
public void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException, ExecutionException {
Uninterruptibles.getUninterruptibly(terminationFuture, timeout, timeoutUnit);
}

public final void addListener(Listener listener, Executor executor) {
listenerExecutors.addListener(new ListenerExecutor(listener, executor));
}

@Override
public final ListenableFuture<State> start() {
public final Service startAsync() {
serviceDelegate.addListener(listenerExecutors, Threads.SAME_THREAD_EXECUTOR);
return serviceDelegate.start();
serviceDelegate.startAsync();
return this;
}

@Override
public final State startAndWait() {
return Futures.getUnchecked(start());
public final void awaitRunning() {
serviceDelegate.awaitRunning();
}

@Override
public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
serviceDelegate.awaitRunning(timeout, unit);
}

@Override
Expand All @@ -165,13 +161,24 @@ public final State state() {
}

@Override
public final State stopAndWait() {
return Futures.getUnchecked(stop());
public final Service stopAsync() {
serviceDelegate.stopAsync();
return this;
}

@Override
public final void awaitTerminated() {
serviceDelegate.awaitTerminated();
}

@Override
public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
serviceDelegate.awaitTerminated(timeout, unit);
}

@Override
public final ListenableFuture<State> stop() {
return serviceDelegate.stop();
public final Throwable failureCause() {
return serviceDelegate.failureCause();
}

protected Executor executor(final State state) {
Expand Down Expand Up @@ -213,15 +220,15 @@ protected void shutDown() throws Exception {
}

@Override
protected Executor executor(State state) {
return AbstractExecutionServiceController.this.executor(state);
protected Executor executor() {
return AbstractExecutionServiceController.this.executor(state());
}
}

/**
* Inner class for dispatching listener call back to a list of listeners.
*/
private static final class ListenerExecutors implements Listener {
private static final class ListenerExecutors extends Listener {

private interface Callback {
void call(Listener listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.twill.internal;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
Expand All @@ -27,6 +26,7 @@
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.twill.api.Command;
Expand Down Expand Up @@ -54,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -102,7 +103,7 @@ public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, b
@Override
protected synchronized void doStartUp() {
if (kafkaClient != null && !logHandlers.isEmpty()) {
kafkaClient.startAndWait();
kafkaClient.startAsync().awaitRunning();
logCancellable = kafkaClient.getConsumer().prepare()
.addFromBeginning(Constants.LOG_TOPIC, 0)
.consume(new LogMessageCallback(logHandlers));
Expand All @@ -119,7 +120,7 @@ protected synchronized void doShutDown() {
}
if (kafkaClient != null) {
// Safe to call stop no matter what state the KafkaClientService is in.
kafkaClient.stopAndWait();
kafkaClient.stopAsync().awaitTerminated();
}
}

Expand All @@ -133,7 +134,7 @@ public final synchronized void addLogHandler(LogHandler handler) {

logHandlers.add(handler);
if (logHandlers.size() == 1) {
kafkaClient.startAndWait();
kafkaClient.startAsync().awaitRunning();
logCancellable = kafkaClient.getConsumer().prepare()
.addFromBeginning(Constants.LOG_TOPIC, 0)
.consume(new LogMessageCallback(logHandlers));
Expand Down Expand Up @@ -198,7 +199,7 @@ public ListenableFuture<String> restartInstances(final String runnable, Set<Inte
public String apply(Set<String> input) {
return runnable;
}
});
}, MoreExecutors.directExecutor());
}

@Override
Expand Down Expand Up @@ -280,7 +281,7 @@ public long onReceived(Iterator<FetchedMessage> messages) {
long nextOffset = -1L;
while (messages.hasNext()) {
FetchedMessage message = messages.next();
String json = Charsets.UTF_8.decode(message.getPayload()).toString();
String json = StandardCharsets.UTF_8.decode(message.getPayload()).toString();
try {
LogEntry entry = GSON.fromJson(json, LogEntry.class);
if (entry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.twill.internal;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -46,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -144,7 +144,7 @@ protected Gson getLiveNodeGson() {
@Override
public ListenableFuture<String> onReceived(String messageId, Message message) {
LOG.info("Message received: {}", message);
return Futures.immediateCheckedFuture(messageId);
return Futures.immediateFuture(messageId);
}

@Override
Expand All @@ -164,10 +164,10 @@ protected final void startUp() throws Exception {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.Expired) {
LOG.warn("ZK Session expired for service {} with runId {}.", getServiceName(), runId.getId());
LOG.warn("ZK Session expired for service {} with runId {}.", serviceName(), runId.getId());
expired = true;
} else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
LOG.info("Reconnected after expiration for service {} with runId {}", getServiceName(), runId.getId());
LOG.info("Reconnected after expiration for service {} with runId {}", serviceName(), runId.getId());
expired = false;
logIfFailed(createLiveNode());
}
Expand Down Expand Up @@ -204,7 +204,7 @@ protected final void shutDown() throws Exception {
} finally {
// Given at most 5 seconds to cleanup ZK nodes
removeLiveNode().get(5, TimeUnit.SECONDS);
LOG.info("Service {} with runId {} shutdown completed", getServiceName(), runId.getId());
LOG.info("Service {} with runId {} shutdown completed", serviceName(), runId.getId());
}
}

Expand Down Expand Up @@ -277,6 +277,7 @@ public void onFailure(Throwable t) {
LOG.error("Failed to watch messages.", t);
}
}, Threads.SAME_THREAD_EXECUTOR);

}

private void processMessage(final String path, final String messageId) {
Expand All @@ -292,7 +293,8 @@ public void onSuccess(NodeData result) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Message received from {}: {}", path, new String(MessageCodec.encode(message), Charsets.UTF_8));
LOG.debug("Message received from {}: {}",
path, new String(MessageCodec.encode(message), StandardCharsets.UTF_8));
}

// Handle the stop message
Expand Down Expand Up @@ -328,18 +330,19 @@ private boolean handleStopMessage(Message message, final Runnable messageRemover
terminationTimeoutMillis.compareAndSet(-1L, timeoutMillis);

// Stop this service.
Futures.addCallback(stop(), new FutureCallback<State>() {
addListener(new ServiceListenerAdapter() {
@Override
public void onSuccess(State result) {
public void terminated(State from) {
messageRemover.run();
}

@Override
public void onFailure(Throwable t) {
LOG.error("Stop service failed upon STOP command", t);
public void failed(State from, Throwable failure) {
LOG.error("Stop service failed upon STOP command", failure);
messageRemover.run();
}
}, Threads.SAME_THREAD_EXECUTOR);
stopAsync();
return true;
}

Expand Down Expand Up @@ -391,7 +394,7 @@ public void onSuccess(T result) {

@Override
public void onFailure(Throwable t) {
LOG.error("Operation failed for service {} with runId {}", getServiceName(), runId, t);
LOG.error("Operation failed for service {} with runId {}", serviceName(), runId, t);
}
}, Threads.SAME_THREAD_EXECUTOR);
}
Expand All @@ -410,6 +413,6 @@ private byte[] serializeLiveNode() {
if (liveNodeData != null) {
content.add("data", getLiveNodeGson().toJsonTree(liveNodeData));
}
return GSON.toJson(content).getBytes(Charsets.UTF_8);
return GSON.toJson(content).getBytes(StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ protected synchronized void forceShutDown() {
// In force shutdown, don't send message.
stopMessageFuture = Futures.immediateFuture(State.TERMINATED);
}
stop();
stopAsync();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.UncheckedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -53,9 +52,9 @@ protected void startUp() throws Exception {

for (Service service : services) {
try {
service.startAndWait();
} catch (UncheckedExecutionException e) {
failureCause = e.getCause();
service.startAsync().awaitRunning();
} catch (IllegalStateException e) {
failureCause = e.getCause() != null ? e.getCause() : e;
break;
}
}
Expand Down Expand Up @@ -88,12 +87,12 @@ private void stopAll() throws Exception {
Service service = itor.next();
try {
if (service.isRunning() || service.state() == State.STARTING) {
service.stopAndWait();
service.stopAsync().awaitTerminated();
}
} catch (UncheckedExecutionException e) {
} catch (IllegalStateException e) {
// Just catch as we want all services stopped
if (failureCause == null) {
failureCause = e.getCause();
failureCause = e.getCause() != null ? e.getCause() : e;
} else {
// Log for sub-sequence service shutdown error, as only the first failure cause will be thrown.
LOG.warn("Failed to stop service {}", service, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ElectionRegistry(ZKClient zkClient) {
*/
public Cancellable register(String name, ElectionHandler handler) {
LeaderElection election = new LeaderElection(zkClient, name, handler);
election.start();
election.startAsync();
registry.put(name, election);
return new CancellableElection(name, election);
}
Expand All @@ -56,7 +56,7 @@ public Cancellable register(String name, ElectionHandler handler) {
*/
public void shutdown() {
for (LeaderElection election : registry.values()) {
election.stop();
election.stopAsync();
}
}

Expand All @@ -71,7 +71,7 @@ public CancellableElection(String name, LeaderElection election) {

@Override
public void cancel() {
election.stop();
election.stopAsync();
registry.remove(name, election);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Wrapper for {@link Service.Listener} to have callback executed on a given {@link Executor}.
* Also make sure each method is called at most once.
*/
final class ListenerExecutor implements Service.Listener {
final class ListenerExecutor extends Service.Listener {

private static final Logger LOG = LoggerFactory.getLogger(ListenerExecutor.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* An adapter for implementing {@link Service.Listener} with all method default to no-op.
*/
public abstract class ServiceListenerAdapter implements Service.Listener {
public abstract class ServiceListenerAdapter extends Service.Listener {
@Override
public void starting() {
// No-op
Expand Down
Loading