diff --git a/pom.xml b/pom.xml
index d44cbc29..e292c88b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@
true
1.7.30
1.2.11
- 13.0.1
+ 32.0.0-jre
2.2.4
2.0.1
4.1.75.Final
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
index a00ec6ee..9b850fa9 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
@@ -23,14 +23,13 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceController;
import org.apache.twill.common.Threads;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -93,9 +92,10 @@ public Future extends ServiceController> terminate(long gracefulTimeout, TimeU
}
terminationTimeoutMillis.compareAndSet(-1L, timeout);
- stop();
+ stopAsync();
return Futures.transform(terminationFuture,
- (Function) input -> AbstractExecutionServiceController.this);
+ (Function) input -> AbstractExecutionServiceController.this,
+ MoreExecutors.directExecutor());
}
@Nullable
@@ -129,29 +129,25 @@ public void terminated(State from) {
}, executor);
}
- @Override
- public void awaitTerminated() throws ExecutionException {
- Uninterruptibles.getUninterruptibly(terminationFuture);
- }
-
- @Override
- public void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException, ExecutionException {
- Uninterruptibles.getUninterruptibly(terminationFuture, timeout, timeoutUnit);
- }
-
public final void addListener(Listener listener, Executor executor) {
listenerExecutors.addListener(new ListenerExecutor(listener, executor));
}
@Override
- public final ListenableFuture start() {
+ public final Service startAsync() {
serviceDelegate.addListener(listenerExecutors, Threads.SAME_THREAD_EXECUTOR);
- return serviceDelegate.start();
+ serviceDelegate.startAsync();
+ return this;
}
@Override
- public final State startAndWait() {
- return Futures.getUnchecked(start());
+ public final void awaitRunning() {
+ serviceDelegate.awaitRunning();
+ }
+
+ @Override
+ public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
+ serviceDelegate.awaitRunning(timeout, unit);
}
@Override
@@ -165,13 +161,24 @@ public final State state() {
}
@Override
- public final State stopAndWait() {
- return Futures.getUnchecked(stop());
+ public final Service stopAsync() {
+ serviceDelegate.stopAsync();
+ return this;
+ }
+
+ @Override
+ public final void awaitTerminated() {
+ serviceDelegate.awaitTerminated();
+ }
+
+ @Override
+ public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
+ serviceDelegate.awaitTerminated(timeout, unit);
}
@Override
- public final ListenableFuture stop() {
- return serviceDelegate.stop();
+ public final Throwable failureCause() {
+ return serviceDelegate.failureCause();
}
protected Executor executor(final State state) {
@@ -213,15 +220,15 @@ protected void shutDown() throws Exception {
}
@Override
- protected Executor executor(State state) {
- return AbstractExecutionServiceController.this.executor(state);
+ protected Executor executor() {
+ return AbstractExecutionServiceController.this.executor(state());
}
}
/**
* Inner class for dispatching listener call back to a list of listeners.
*/
- private static final class ListenerExecutors implements Listener {
+ private static final class ListenerExecutors extends Listener {
private interface Callback {
void call(Listener listener);
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 0ff2fc8c..74136260 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal;
-import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
@@ -27,6 +26,7 @@
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.twill.api.Command;
@@ -54,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
@@ -102,7 +103,7 @@ public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, b
@Override
protected synchronized void doStartUp() {
if (kafkaClient != null && !logHandlers.isEmpty()) {
- kafkaClient.startAndWait();
+ kafkaClient.startAsync().awaitRunning();
logCancellable = kafkaClient.getConsumer().prepare()
.addFromBeginning(Constants.LOG_TOPIC, 0)
.consume(new LogMessageCallback(logHandlers));
@@ -119,7 +120,7 @@ protected synchronized void doShutDown() {
}
if (kafkaClient != null) {
// Safe to call stop no matter what state the KafkaClientService is in.
- kafkaClient.stopAndWait();
+ kafkaClient.stopAsync().awaitTerminated();
}
}
@@ -133,7 +134,7 @@ public final synchronized void addLogHandler(LogHandler handler) {
logHandlers.add(handler);
if (logHandlers.size() == 1) {
- kafkaClient.startAndWait();
+ kafkaClient.startAsync().awaitRunning();
logCancellable = kafkaClient.getConsumer().prepare()
.addFromBeginning(Constants.LOG_TOPIC, 0)
.consume(new LogMessageCallback(logHandlers));
@@ -198,7 +199,7 @@ public ListenableFuture restartInstances(final String runnable, Set input) {
return runnable;
}
- });
+ }, MoreExecutors.directExecutor());
}
@Override
@@ -280,7 +281,7 @@ public long onReceived(Iterator messages) {
long nextOffset = -1L;
while (messages.hasNext()) {
FetchedMessage message = messages.next();
- String json = Charsets.UTF_8.decode(message.getPayload()).toString();
+ String json = StandardCharsets.UTF_8.decode(message.getPayload()).toString();
try {
LogEntry entry = GSON.fromJson(json, LogEntry.class);
if (entry != null) {
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 31adabce..f53900fd 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal;
-import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.FutureCallback;
@@ -46,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -144,7 +144,7 @@ protected Gson getLiveNodeGson() {
@Override
public ListenableFuture onReceived(String messageId, Message message) {
LOG.info("Message received: {}", message);
- return Futures.immediateCheckedFuture(messageId);
+ return Futures.immediateFuture(messageId);
}
@Override
@@ -164,10 +164,10 @@ protected final void startUp() throws Exception {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.Expired) {
- LOG.warn("ZK Session expired for service {} with runId {}.", getServiceName(), runId.getId());
+ LOG.warn("ZK Session expired for service {} with runId {}.", serviceName(), runId.getId());
expired = true;
} else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
- LOG.info("Reconnected after expiration for service {} with runId {}", getServiceName(), runId.getId());
+ LOG.info("Reconnected after expiration for service {} with runId {}", serviceName(), runId.getId());
expired = false;
logIfFailed(createLiveNode());
}
@@ -204,7 +204,7 @@ protected final void shutDown() throws Exception {
} finally {
// Given at most 5 seconds to cleanup ZK nodes
removeLiveNode().get(5, TimeUnit.SECONDS);
- LOG.info("Service {} with runId {} shutdown completed", getServiceName(), runId.getId());
+ LOG.info("Service {} with runId {} shutdown completed", serviceName(), runId.getId());
}
}
@@ -277,6 +277,7 @@ public void onFailure(Throwable t) {
LOG.error("Failed to watch messages.", t);
}
}, Threads.SAME_THREAD_EXECUTOR);
+
}
private void processMessage(final String path, final String messageId) {
@@ -292,7 +293,8 @@ public void onSuccess(NodeData result) {
return;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Message received from {}: {}", path, new String(MessageCodec.encode(message), Charsets.UTF_8));
+ LOG.debug("Message received from {}: {}",
+ path, new String(MessageCodec.encode(message), StandardCharsets.UTF_8));
}
// Handle the stop message
@@ -328,18 +330,19 @@ private boolean handleStopMessage(Message message, final Runnable messageRemover
terminationTimeoutMillis.compareAndSet(-1L, timeoutMillis);
// Stop this service.
- Futures.addCallback(stop(), new FutureCallback() {
+ addListener(new ServiceListenerAdapter() {
@Override
- public void onSuccess(State result) {
+ public void terminated(State from) {
messageRemover.run();
}
@Override
- public void onFailure(Throwable t) {
- LOG.error("Stop service failed upon STOP command", t);
+ public void failed(State from, Throwable failure) {
+ LOG.error("Stop service failed upon STOP command", failure);
messageRemover.run();
}
}, Threads.SAME_THREAD_EXECUTOR);
+ stopAsync();
return true;
}
@@ -391,7 +394,7 @@ public void onSuccess(T result) {
@Override
public void onFailure(Throwable t) {
- LOG.error("Operation failed for service {} with runId {}", getServiceName(), runId, t);
+ LOG.error("Operation failed for service {} with runId {}", serviceName(), runId, t);
}
}, Threads.SAME_THREAD_EXECUTOR);
}
@@ -410,6 +413,6 @@ private byte[] serializeLiveNode() {
if (liveNodeData != null) {
content.add("data", getLiveNodeGson().toJsonTree(liveNodeData));
}
- return GSON.toJson(content).getBytes(Charsets.UTF_8);
+ return GSON.toJson(content).getBytes(StandardCharsets.UTF_8);
}
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
index 214fbb86..74066e56 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -154,7 +154,7 @@ protected synchronized void forceShutDown() {
// In force shutdown, don't send message.
stopMessageFuture = Futures.immediateFuture(State.TERMINATED);
}
- stop();
+ stopAsync();
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java b/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java
index 38659f68..603e2db7 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java
@@ -22,7 +22,6 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.UncheckedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,9 +52,9 @@ protected void startUp() throws Exception {
for (Service service : services) {
try {
- service.startAndWait();
- } catch (UncheckedExecutionException e) {
- failureCause = e.getCause();
+ service.startAsync().awaitRunning();
+ } catch (IllegalStateException e) {
+ failureCause = e.getCause() != null ? e.getCause() : e;
break;
}
}
@@ -88,12 +87,12 @@ private void stopAll() throws Exception {
Service service = itor.next();
try {
if (service.isRunning() || service.state() == State.STARTING) {
- service.stopAndWait();
+ service.stopAsync().awaitTerminated();
}
- } catch (UncheckedExecutionException e) {
+ } catch (IllegalStateException e) {
// Just catch as we want all services stopped
if (failureCause == null) {
- failureCause = e.getCause();
+ failureCause = e.getCause() != null ? e.getCause() : e;
} else {
// Log for sub-sequence service shutdown error, as only the first failure cause will be thrown.
LOG.warn("Failed to stop service {}", service, e);
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java b/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java
index 9153fe62..03dc6bbc 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java
@@ -46,7 +46,7 @@ public ElectionRegistry(ZKClient zkClient) {
*/
public Cancellable register(String name, ElectionHandler handler) {
LeaderElection election = new LeaderElection(zkClient, name, handler);
- election.start();
+ election.startAsync();
registry.put(name, election);
return new CancellableElection(name, election);
}
@@ -56,7 +56,7 @@ public Cancellable register(String name, ElectionHandler handler) {
*/
public void shutdown() {
for (LeaderElection election : registry.values()) {
- election.stop();
+ election.stopAsync();
}
}
@@ -71,7 +71,7 @@ public CancellableElection(String name, LeaderElection election) {
@Override
public void cancel() {
- election.stop();
+ election.stopAsync();
registry.remove(name, election);
}
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java b/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
index 9d3e1560..0e93016c 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
@@ -29,7 +29,7 @@
* Wrapper for {@link Service.Listener} to have callback executed on a given {@link Executor}.
* Also make sure each method is called at most once.
*/
-final class ListenerExecutor implements Service.Listener {
+final class ListenerExecutor extends Service.Listener {
private static final Logger LOG = LoggerFactory.getLogger(ListenerExecutor.class);
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java
index 4a34abf1..9d8c3142 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java
@@ -22,7 +22,7 @@
/**
* An adapter for implementing {@link Service.Listener} with all method default to no-op.
*/
-public abstract class ServiceListenerAdapter implements Service.Listener {
+public abstract class ServiceListenerAdapter extends Service.Listener {
@Override
public void starting() {
// No-op
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Services.java b/twill-core/src/main/java/org/apache/twill/internal/Services.java
index e431be98..1f0847a1 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Services.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Services.java
@@ -17,14 +17,12 @@
*/
package org.apache.twill.internal;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.twill.common.Threads;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -38,27 +36,25 @@ public final class Services {
*
* @param firstService First service to start.
* @param moreServices The rest services to start.
- * @return A {@link ListenableFuture} that will be completed when all services are started, with the
- * result carries the completed {@link ListenableFuture} of each corresponding service in the
- * same order as they are passed to this method.
+ * @return A {@link ListenableFuture} that will be completed when all services are started.
*/
- public static ListenableFuture>> chainStart(Service firstService,
- Service...moreServices) {
+ public static ListenableFuture chainStart(Service firstService,
+ Service...moreServices) {
return doChain(true, firstService, moreServices);
}
/**
* Stops a list of {@link Service} one by one. It behaves the same as
* {@link #chainStart(com.google.common.util.concurrent.Service, com.google.common.util.concurrent.Service...)}
- * except {@link com.google.common.util.concurrent.Service#stop()} is called instead of start.
+ * except {@link com.google.common.util.concurrent.Service#stopAsync()} is called instead of startAsync.
*
* @param firstService First service to stop.
* @param moreServices The rest services to stop.
* @return A {@link ListenableFuture} that will be completed when all services are stopped.
* @see #chainStart(com.google.common.util.concurrent.Service, com.google.common.util.concurrent.Service...)
*/
- public static ListenableFuture>> chainStop(Service firstService,
- Service...moreServices) {
+ public static ListenableFuture chainStop(Service firstService,
+ Service...moreServices) {
return doChain(false, firstService, moreServices);
}
@@ -97,43 +93,56 @@ public void failed(Service.State from, Throwable failure) {
/**
* Performs the actual logic of chain Service start/stop.
*/
- private static ListenableFuture>> doChain(boolean doStart,
- Service firstService,
- Service...moreServices) {
- SettableFuture>> resultFuture = SettableFuture.create();
- List> result = Lists.newArrayListWithCapacity(moreServices.length + 1);
-
- ListenableFuture future = doStart ? firstService.start() : firstService.stop();
- future.addListener(createChainListener(future, moreServices, new AtomicInteger(0), result, resultFuture, doStart),
- Threads.SAME_THREAD_EXECUTOR);
+ private static ListenableFuture doChain(boolean doStart,
+ Service firstService,
+ Service...moreServices) {
+ final SettableFuture resultFuture = SettableFuture.create();
+ final AtomicInteger idx = new AtomicInteger(0);
+ startOrStop(doStart, firstService, moreServices, idx, resultFuture);
return resultFuture;
}
/**
- * Returns a {@link Runnable} that can be used as a {@link ListenableFuture} listener to trigger
- * further service action or completing the result future. Used by
- * {@link #doChain(boolean, com.google.common.util.concurrent.Service, com.google.common.util.concurrent.Service...)}
+ * Starts or stops a service and adds a listener to chain the next service when the operation completes.
*/
- private static Runnable createChainListener(final ListenableFuture future, final Service[] services,
- final AtomicInteger idx,
- final List> result,
- final SettableFuture>> resultFuture,
- final boolean doStart) {
- return new Runnable() {
+ private static void startOrStop(final boolean doStart, Service service, final Service[] moreServices,
+ final AtomicInteger idx,
+ final SettableFuture resultFuture) {
+ service.addListener(new ServiceListenerAdapter() {
+ @Override
+ public void running() {
+ if (doStart) {
+ onServiceOperationComplete(Service.State.RUNNING);
+ }
+ }
@Override
- public void run() {
- result.add(future);
+ public void terminated(Service.State from) {
+ if (!doStart) {
+ onServiceOperationComplete(Service.State.TERMINATED);
+ }
+ }
+
+ @Override
+ public void failed(Service.State from, Throwable failure) {
+ resultFuture.setException(failure);
+ }
+
+ private void onServiceOperationComplete(Service.State state) {
int nextIdx = idx.getAndIncrement();
- if (nextIdx == services.length) {
- resultFuture.set(result);
+ if (nextIdx >= moreServices.length) {
+ resultFuture.set(state);
return;
}
- ListenableFuture actionFuture = doStart ? services[nextIdx].start() : services[nextIdx].stop();
- actionFuture.addListener(createChainListener(actionFuture, services, idx, result, resultFuture, doStart),
- Threads.SAME_THREAD_EXECUTOR);
+ startOrStop(doStart, moreServices[nextIdx], moreServices, idx, resultFuture);
}
- };
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ if (doStart) {
+ service.startAsync();
+ } else {
+ service.stopAsync();
+ }
}
private Services() {
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 9bcbdf69..c2bd979a 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal;
-import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
@@ -40,6 +39,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -167,7 +167,7 @@ public TwillContainerController start(RunId runId, int instanceId, Class> main
TwillContainerControllerImpl controller =
new TwillContainerControllerImpl(zkClient, runId, runtimeSpec.getName(), instanceId, processController);
- controller.start();
+ controller.startAsync();
return controller;
}
@@ -237,12 +237,12 @@ protected void instanceNodeUpdated(NodeData nodeData) {
}
try {
Gson gson = new Gson();
- JsonElement json = gson.fromJson(new String(nodeData.getData(), Charsets.UTF_8), JsonElement.class);
+ JsonElement json = gson.fromJson(new String(nodeData.getData(), StandardCharsets.UTF_8), JsonElement.class);
if (json.isJsonObject()) {
JsonElement data = json.getAsJsonObject().get("data");
if (data != null) {
this.liveData = gson.fromJson(data, ContainerLiveNodeData.class);
- LOG.info("Container LiveNodeData updated: " + new String(nodeData.getData(), Charsets.UTF_8));
+ LOG.info("Container LiveNodeData updated: " + new String(nodeData.getData(), StandardCharsets.UTF_8));
}
}
} catch (Throwable t) {
@@ -292,9 +292,8 @@ public int getInstanceId() {
}
private void killAndWait(long maxWaitSecs) {
- Stopwatch watch = new Stopwatch();
- watch.start();
- while (watch.elapsedTime(TimeUnit.SECONDS) < maxWaitSecs) {
+ Stopwatch watch = Stopwatch.createStarted();
+ while (watch.elapsed(TimeUnit.SECONDS) < maxWaitSecs) {
// Kill the application
try {
kill();
@@ -312,7 +311,7 @@ private void killAndWait(long maxWaitSecs) {
// Timeout reached, runnable has not stopped
LOG.error("Failed to kill runnable {}, instance {} after {} seconds", runnable, instanceId,
- watch.elapsedTime(TimeUnit.SECONDS));
+ watch.elapsed(TimeUnit.SECONDS));
// TODO: should we throw exception here?
}
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java b/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
index cfcc7e7f..8fe9d1b0 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
@@ -20,6 +20,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.state.MessageCodec;
@@ -79,14 +80,14 @@ public void onSuccess(String result) {
public void onFailure(Throwable t) {
completion.setException(t);
}
- });
+ }, MoreExecutors.directExecutor());
}
@Override
public void onFailure(Throwable t) {
completion.setException(t);
}
- });
+ }, MoreExecutors.directExecutor());
}
private ZKMessages() {
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
index 341d3ee9..34021c82 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
@@ -18,8 +18,6 @@
package org.apache.twill.internal.json;
import com.google.common.collect.ImmutableMultimap;
-import com.google.common.io.InputSupplier;
-import com.google.common.io.OutputSupplier;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -39,6 +37,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
/**
* Gson codec for {@link Arguments}.
@@ -48,16 +47,24 @@ public final class ArgumentsCodec implements JsonSerializer, JsonDese
private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Arguments.class, new ArgumentsCodec())
.create();
- public static void encode(Arguments arguments, OutputSupplier extends Writer> writerSupplier) throws IOException {
- try (Writer writer = writerSupplier.getOutput()) {
+ public static void encode(Arguments arguments, Callable extends Writer> writerSupplier) throws IOException {
+ try (Writer writer = writerSupplier.call()) {
GSON.toJson(arguments, writer);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
}
}
- public static Arguments decode(InputSupplier extends Reader> readerSupplier) throws IOException {
- try (Reader reader = readerSupplier.getInput()) {
+ public static Arguments decode(Callable extends Reader> readerSupplier) throws IOException {
+ try (Reader reader = readerSupplier.call()) {
return GSON.fromJson(reader, Arguments.class);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
}
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
index 4df9081e..3a779b27 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal.json;
-import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.gson.Gson;
@@ -41,6 +40,7 @@
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.io.Reader;
import java.io.Writer;
import java.lang.reflect.ParameterizedType;
@@ -85,7 +85,7 @@ public void toJson(TwillRuntimeSpecification spec, Writer writer) {
}
public void toJson(TwillRuntimeSpecification spec, File file) throws IOException {
- try (Writer writer = Files.newWriter(file, Charsets.UTF_8)) {
+ try (Writer writer = Files.newWriter(file, StandardCharsets.UTF_8)) {
toJson(spec, writer);
}
}
@@ -99,7 +99,7 @@ public TwillRuntimeSpecification fromJson(Reader reader) {
}
public TwillRuntimeSpecification fromJson(File file) throws IOException {
- try (Reader reader = Files.newReader(file, Charsets.UTF_8)) {
+ try (Reader reader = Files.newReader(file, StandardCharsets.UTF_8)) {
return fromJson(reader);
}
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
index 832faa82..3ebe9dac 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal.kafka.client;
-import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -34,6 +33,7 @@
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -162,7 +162,7 @@ public void changed(BrokerService brokerService) {
String newBrokerList = brokerService.getBrokerList();
// If there is no change, whether it is empty or not, just return
- if (Objects.equal(brokerList, newBrokerList)) {
+ if (Objects.equals(brokerList, newBrokerList)) {
return;
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
index de42b9bc..ff81892d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal.kafka.client;
-import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -51,6 +50,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -307,7 +307,7 @@ private T decodeNodeData(NodeData nodeData, Class type) {
if (data == null) {
return null;
}
- return GSON.fromJson(new String(data, Charsets.UTF_8), type);
+ return GSON.fromJson(new String(data, StandardCharsets.UTF_8), type);
}
/**
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java
index a0d93ec0..a712e3a1 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java
@@ -98,7 +98,7 @@ protected void startUp() throws Exception {
scheduler.scheduleAtFixedRate(this, PUBLISHER_CLEANUP_SECONDS, PUBLISHER_CLEANUP_SECONDS, TimeUnit.SECONDS);
// Start broker service to get auto-updated brokers information.
- brokerService.startAndWait();
+ brokerService.startAsync().awaitRunning();
}
@Override
@@ -110,7 +110,7 @@ protected void shutDown() throws Exception {
}
consumer.stop();
- brokerService.stopAndWait();
+ brokerService.stopAsync().awaitTerminated();
LOG.info("KafkaClientService stopped");
}
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
index 493c4cae..02fd3c5b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -20,7 +20,6 @@
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
-import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
@@ -47,6 +46,7 @@
import org.apache.twill.zookeeper.ZKClients;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
@@ -154,13 +154,9 @@ public void start() {
kafkaClient = new ZKKafkaClientService(zkClientService);
Futures.addCallback(Services.chainStart(zkClientService, kafkaClient),
- new FutureCallback>>() {
+ new FutureCallback() {
@Override
- public void onSuccess(List> result) {
- for (ListenableFuture future : result) {
- Preconditions.checkState(Futures.getUnchecked(future) == Service.State.RUNNING,
- "Service is not running.");
- }
+ public void onSuccess(Service.State result) {
addInfo("Kafka client started: " + zkConnectStr);
scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
}
@@ -209,7 +205,7 @@ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutExcept
List logs = Lists.newArrayListWithExpectedSize(bufferedSize.get());
for (String json : Iterables.consumingIterable(buffer)) {
- logs.add(Charsets.UTF_8.encode(json));
+ logs.add(StandardCharsets.UTF_8.encode(json));
}
long backOffTime = timeoutUnit.toNanos(timeout) / 10;
@@ -218,8 +214,7 @@ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutExcept
}
try {
- Stopwatch stopwatch = new Stopwatch();
- stopwatch.start();
+ Stopwatch stopwatch = Stopwatch.createStarted();
long publishTimeout = timeout;
do {
@@ -230,7 +225,7 @@ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutExcept
} catch (ExecutionException e) {
addError("Failed to publish logs to Kafka.", e);
TimeUnit.NANOSECONDS.sleep(backOffTime);
- publishTimeout -= stopwatch.elapsedTime(timeoutUnit);
+ publishTimeout -= stopwatch.elapsed(timeoutUnit);
stopwatch.reset();
stopwatch.start();
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
index 79a53d2e..e2686cc9 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal.state;
-import com.google.common.base.Charsets;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -31,6 +30,7 @@
import org.apache.twill.api.Command;
import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
@@ -54,7 +54,7 @@ public static Message decode(byte[] bytes) {
if (bytes == null) {
return null;
}
- String content = new String(bytes, Charsets.UTF_8);
+ String content = new String(bytes, StandardCharsets.UTF_8);
return GSON.fromJson(content, Message.class);
}
@@ -64,7 +64,7 @@ public static Message decode(byte[] bytes) {
* @return byte array representing the encoded message.
*/
public static byte[] encode(Message message) {
- return GSON.toJson(message, Message.class).getBytes(Charsets.UTF_8);
+ return GSON.toJson(message, Message.class).getBytes(StandardCharsets.UTF_8);
}
/**
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java b/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
index 0465d467..111dab33 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
@@ -17,9 +17,11 @@
*/
package org.apache.twill.internal.state;
-import com.google.common.base.Objects;
+import com.google.common.base.MoreObjects;
import org.apache.twill.api.Command;
+import java.util.Objects;
+
/**
* Implementation of {@code Message} interface to pass information about {@code Command} to execute.
*/
@@ -59,7 +61,7 @@ public Command getCommand() {
@Override
public String toString() {
- return Objects.toStringHelper(Message.class)
+ return MoreObjects.toStringHelper(Message.class)
.add("type", type)
.add("scope", scope)
.add("runnable", runnableName)
@@ -69,7 +71,7 @@ public String toString() {
@Override
public int hashCode() {
- return Objects.hashCode(type, scope, runnableName, command);
+ return Objects.hash(type, scope, runnableName, command);
}
@Override
@@ -83,7 +85,7 @@ public boolean equals(Object obj) {
Message other = (Message) obj;
return type == other.getType()
&& scope == other.getScope()
- && Objects.equal(runnableName, other.getRunnableName())
- && Objects.equal(command, other.getCommand());
+ && Objects.equals(runnableName, other.getRunnableName())
+ && Objects.equals(command, other.getCommand());
}
}
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java b/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java
index e659ab74..9d47e45f 100644
--- a/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java
@@ -17,7 +17,7 @@
*/
package org.apache.twill.kafka.client;
-import com.google.common.base.Objects;
+import com.google.common.base.MoreObjects;
/**
* Represents broker information. This class should be serializable with Gson.
@@ -57,7 +57,7 @@ public int hashCode() {
@Override
public String toString() {
- return Objects.toStringHelper(BrokerInfo.class)
+ return MoreObjects.toStringHelper(BrokerInfo.class)
.add("host", host)
.add("port", port)
.toString();
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java b/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java
index 87040bee..eb538078 100644
--- a/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java
@@ -17,7 +17,7 @@
*/
package org.apache.twill.kafka.client;
-import com.google.common.base.Objects;
+import com.google.common.base.MoreObjects;
/**
* Represents a combination of topic and partition.
@@ -62,7 +62,7 @@ public int hashCode() {
@Override
public String toString() {
- return Objects.toStringHelper(this)
+ return MoreObjects.toStringHelper(this)
.add("topic", topic)
.add("partition", partition)
.toString();
diff --git a/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java b/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java
index 34f866c9..ec75fbe1 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java
@@ -49,7 +49,7 @@ public void testOrder() throws InterruptedException, ExecutionException, Timeout
}
Service service = new CompositeService(services);
- service.start().get(5, TimeUnit.SECONDS);
+ service.startAsync().awaitRunning(5, TimeUnit.SECONDS);
// There should be 10 permits after all 10 services started
Assert.assertTrue(semaphore.tryAcquire(10, 5, TimeUnit.SECONDS));
@@ -59,7 +59,7 @@ public void testOrder() throws InterruptedException, ExecutionException, Timeout
// Release 10 permits for the stop sequence to start
semaphore.release(10);
- service.stop().get(5, TimeUnit.SECONDS);
+ service.stopAsync().awaitTerminated(5, TimeUnit.SECONDS);
// There should be no permit left after all 10 services stopped
Assert.assertFalse(semaphore.tryAcquire(10));
@@ -80,9 +80,9 @@ public void testErrorStart() throws InterruptedException {
Service service = new CompositeService(services);
try {
- service.start().get();
+ service.startAsync().awaitRunning();
Assert.fail();
- } catch (ExecutionException e) {
+ } catch (IllegalStateException e) {
// Expected
}
diff --git a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
index 1a5b7785..3c9ecfc1 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
@@ -50,17 +50,17 @@ public class ControllerTest {
@Test
public void testController() throws ExecutionException, InterruptedException, TimeoutException {
InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
- zkServer.startAndWait();
+ zkServer.startAsync().awaitRunning();
LOG.info("ZKServer: " + zkServer.getConnectionStr());
try {
RunId runId = RunIds.generate();
ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
- zkClientService.startAndWait();
+ zkClientService.startAsync().awaitRunning();
Service service = createService(zkClientService, runId);
- service.startAndWait();
+ service.startAsync().awaitRunning();
TwillController controller = getController(zkClientService, "testController", runId);
controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS);
@@ -76,10 +76,10 @@ public void terminated(Service.State from) {
Assert.assertTrue(service.state() == Service.State.TERMINATED || terminateLatch.await(2, TimeUnit.SECONDS));
- zkClientService.stopAndWait();
+ zkClientService.stopAsync().awaitTerminated();
} finally {
- zkServer.stopAndWait();
+ zkServer.stopAsync().awaitTerminated();
}
}
@@ -87,13 +87,13 @@ public void terminated(Service.State from) {
@Test
public void testControllerBefore() throws InterruptedException, ExecutionException, TimeoutException {
InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
- zkServer.startAndWait();
+ zkServer.startAsync().awaitRunning();
LOG.info("ZKServer: " + zkServer.getConnectionStr());
try {
RunId runId = RunIds.generate();
ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
- zkClientService.startAndWait();
+ zkClientService.startAsync().awaitRunning();
final CountDownLatch runLatch = new CountDownLatch(1);
TwillController controller = getController(zkClientService, "testControllerBefore", runId);
@@ -105,7 +105,7 @@ public void run() {
}, Threads.SAME_THREAD_EXECUTOR);
Service service = createService(zkClientService, runId);
- service.start();
+ service.startAsync();
Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
@@ -116,11 +116,11 @@ public void run() {
// Expected
}
- service.stop();
+ service.stopAsync();
controller.awaitTerminated(120, TimeUnit.SECONDS);
} finally {
- zkServer.stopAndWait();
+ zkServer.stopAsync().awaitTerminated();
}
}
@@ -128,16 +128,16 @@ public void run() {
@Test
public void testControllerListener() throws InterruptedException {
InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
- zkServer.startAndWait();
+ zkServer.startAsync().awaitRunning();
LOG.info("ZKServer: " + zkServer.getConnectionStr());
try {
RunId runId = RunIds.generate();
ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
- zkClientService.startAndWait();
+ zkClientService.startAsync().awaitRunning();
Service service = createService(zkClientService, runId);
- service.startAndWait();
+ service.startAsync().awaitRunning();
final CountDownLatch runLatch = new CountDownLatch(1);
TwillController controller = getController(zkClientService, "testControllerListener", runId);
@@ -150,11 +150,11 @@ public void run() {
Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
- service.stopAndWait();
+ service.stopAsync().awaitTerminated();
- zkClientService.stopAndWait();
+ zkClientService.stopAsync().awaitTerminated();
} finally {
- zkServer.stopAndWait();
+ zkServer.stopAsync().awaitTerminated();
}
}
@@ -212,7 +212,7 @@ public ResourceReport getResourceReport() {
return null;
}
};
- controller.startAndWait();
+ controller.startAsync().awaitRunning();
return controller;
}
}
diff --git a/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java b/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java
index 058fb439..0c0657cd 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java
@@ -19,7 +19,6 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import org.junit.Assert;
@@ -45,8 +44,8 @@ public void testChain() throws ExecutionException, InterruptedException {
Service s2 = new DummyService("s2", transiting);
Service s3 = new DummyService("s3", transiting);
- Futures.allAsList(Services.chainStart(s1, s2, s3).get()).get();
- Futures.allAsList(Services.chainStop(s3, s2, s1).get()).get();
+ Services.chainStart(s1, s2, s3).get();
+ Services.chainStop(s3, s2, s1).get();
}
@Test
@@ -54,8 +53,8 @@ public void testCompletion() throws ExecutionException, InterruptedException {
Service service = new DummyService("s1", new AtomicBoolean());
ListenableFuture completion = Services.getCompletionFuture(service);
- service.start();
- service.stop();
+ service.startAsync();
+ service.stopAsync();
completion.get();
@@ -63,9 +62,9 @@ public void testCompletion() throws ExecutionException, InterruptedException {
service = new DummyService("s2", transiting);
completion = Services.getCompletionFuture(service);
- service.startAndWait();
+ service.startAsync().awaitRunning();
transiting.set(true);
- service.stop();
+ service.stopAsync();
try {
completion.get();
diff --git a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
index 4609ebd9..7b542984 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
@@ -132,7 +132,9 @@ private void unjar(File jarFile, File targetDir) throws IOException {
target.mkdirs();
} else {
target.getParentFile().mkdirs();
- ByteStreams.copy(jarInput, Files.newOutputStreamSupplier(target));
+ try (java.io.OutputStream os = new java.io.FileOutputStream(target)) {
+ ByteStreams.copy(jarInput, os);
+ }
}
jarEntry = jarInput.getNextJarEntry();
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index e7e7e7ca..fce001e5 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.kafka.client;
-import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Futures;
@@ -40,6 +39,7 @@
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
@@ -65,11 +65,11 @@ public class KafkaTest {
@BeforeClass
public static void init() throws Exception {
zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
- zkServer.startAndWait();
+ zkServer.startAsync().awaitRunning();
// Extract the kafka.tgz and start the kafka server
kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr()));
- kafkaServer.startAndWait();
+ kafkaServer.startAsync().awaitRunning();
zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
@@ -80,8 +80,8 @@ public static void init() throws Exception {
@AfterClass
public static void finish() throws Exception {
Services.chainStop(kafkaClient, zkClientService).get();
- kafkaServer.stopAndWait();
- zkServer.stopAndWait();
+ kafkaServer.stopAsync().awaitTerminated();
+ zkServer.stopAsync().awaitTerminated();
}
@Test
@@ -91,15 +91,15 @@ public void testKafkaClientReconnect() throws Exception {
EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkaServerConfig);
ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build();
- zkClient.startAndWait();
+ zkClient.startAsync().awaitRunning();
try {
zkClient.create("/", null, CreateMode.PERSISTENT).get();
ZKKafkaClientService kafkaClient = new ZKKafkaClientService(zkClient);
- kafkaClient.startAndWait();
+ kafkaClient.startAsync().awaitRunning();
try {
- server.startAndWait();
+ server.startAsync().awaitRunning();
try {
// Publish a messages
createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start();
@@ -114,7 +114,7 @@ public long onReceived(Iterator messages) {
while (messages.hasNext()) {
FetchedMessage message = messages.next();
nextOffset = message.getNextOffset();
- queue.offer(Charsets.UTF_8.decode(message.getPayload()).toString());
+ queue.offer(StandardCharsets.UTF_8.decode(message.getPayload()).toString());
}
return nextOffset;
}
@@ -128,12 +128,12 @@ public void finished() {
Assert.assertEquals("0 First message", queue.poll(60, TimeUnit.SECONDS));
// Shutdown the server
- server.stopAndWait();
+ server.stopAsync().awaitTerminated();
// Start the server again.
// Needs to create a new instance with the same config since guava service cannot be restarted
server = new EmbeddedKafkaServer(kafkaServerConfig);
- server.startAndWait();
+ server.startAsync().awaitRunning();
// Wait a little while to make sure changes is reflected in broker service
TimeUnit.SECONDS.sleep(3);
@@ -146,13 +146,13 @@ public void finished() {
cancel.cancel();
} finally {
- kafkaClient.stopAndWait();
+ kafkaClient.stopAsync().awaitTerminated();
}
} finally {
- server.stopAndWait();
+ server.stopAsync().awaitTerminated();
}
} finally {
- zkClient.stopAndWait();
+ zkClient.stopAsync().awaitTerminated();
}
}
@@ -180,7 +180,7 @@ public long onReceived(Iterator messages) {
while (messages.hasNext()) {
FetchedMessage message = messages.next();
nextOffset = message.getNextOffset();
- LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+ LOG.info(StandardCharsets.UTF_8.decode(message.getPayload()).toString());
latch.countDown();
}
return nextOffset;
@@ -222,7 +222,7 @@ public long onReceived(Iterator messages) {
FetchedMessage message = messages.next();
nextOffset = message.getNextOffset() + 1;
offsetQueue.offer(message.getOffset());
- LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+ LOG.info(StandardCharsets.UTF_8.decode(message.getPayload()).toString());
return nextOffset;
}
return nextOffset;
@@ -247,17 +247,17 @@ public void testBrokerChange() throws Exception {
// Create a new namespace in ZK for Kafka server for this test case
String connectionStr = zkServer.getConnectionStr() + "/broker_change";
ZKClientService zkClient = ZKClientService.Builder.of(connectionStr).build();
- zkClient.startAndWait();
+ zkClient.startAsync().awaitRunning();
zkClient.create("/", null, CreateMode.PERSISTENT).get();
// Start a new kafka server
File logDir = TMP_FOLDER.newFolder();
EmbeddedKafkaServer server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir));
- server.startAndWait();
+ server.startAsync().awaitRunning();
// Start a Kafka client
KafkaClientService kafkaClient = new ZKKafkaClientService(zkClient);
- kafkaClient.startAndWait();
+ kafkaClient.startAsync().awaitRunning();
// Attach a consumer
final BlockingQueue consumedMessages = Queues.newLinkedBlockingQueue();
@@ -269,7 +269,7 @@ public long onReceived(Iterator messages) {
while (messages.hasNext()) {
FetchedMessage message = messages.next();
nextOffset = message.getNextOffset();
- consumedMessages.add(Charsets.UTF_8.decode(message.getPayload()).toString());
+ consumedMessages.add(StandardCharsets.UTF_8.decode(message.getPayload()).toString());
}
return nextOffset;
}
@@ -282,26 +282,26 @@ public void finished() {
// Get a publisher and publish a message
KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.FIRE_AND_FORGET, Compression.NONE);
- publisher.prepare("test").add(Charsets.UTF_8.encode("Message 0"), 0).send().get();
+ publisher.prepare("test").add(StandardCharsets.UTF_8.encode("Message 0"), 0).send().get();
// Should receive one message
Assert.assertEquals("Message 0", consumedMessages.poll(5, TimeUnit.SECONDS));
// Now shutdown and restart the server on different port
- server.stopAndWait();
+ server.stopAsync().awaitTerminated();
server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir));
- server.startAndWait();
+ server.startAsync().awaitRunning();
// Wait a little while to make sure changes is reflected in broker service
TimeUnit.SECONDS.sleep(3);
// Now publish again with the same publisher. It should succeed and the consumer should receive the message.
- publisher.prepare("test").add(Charsets.UTF_8.encode("Message 1"), 0).send().get();
+ publisher.prepare("test").add(StandardCharsets.UTF_8.encode("Message 1"), 0).send().get();
Assert.assertEquals("Message 1", consumedMessages.poll(5, TimeUnit.SECONDS));
- kafkaClient.stopAndWait();
- zkClient.stopAndWait();
- server.stopAndWait();
+ kafkaClient.stopAsync().awaitTerminated();
+ zkClient.stopAsync().awaitTerminated();
+ server.stopAsync().awaitTerminated();
}
private Thread createPublishThread(final KafkaClient kafkaClient, final String topic,
@@ -315,7 +315,7 @@ private Thread createPublishThread(final KafkaClient kafkaClient, final String t
KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, compression);
KafkaPublisher.Preparer preparer = publisher.prepare(topic);
for (int i = 0; i < count; i++) {
- preparer.add(Charsets.UTF_8.encode((base + i) + " " + message), 0);
+ preparer.add(StandardCharsets.UTF_8.encode((base + i) + " " + message), 0);
}
Futures.getUnchecked(preparer.send());
});
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
index b020dc7a..f2d03fc8 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.discovery;
-import com.google.common.base.Charsets;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -30,6 +29,7 @@
import com.google.gson.JsonSerializer;
import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
import java.net.InetSocketAddress;
/**
@@ -47,7 +47,7 @@ final class DiscoverableAdapter {
* @return array of bytes representing an instance of discoverable
*/
static byte[] encode(Discoverable discoverable) {
- return GSON.toJson(discoverable, Discoverable.class).getBytes(Charsets.UTF_8);
+ return GSON.toJson(discoverable, Discoverable.class).getBytes(StandardCharsets.UTF_8);
}
/**
@@ -59,7 +59,7 @@ static Discoverable decode(byte[] encoded) {
if (encoded == null) {
return null;
}
- return GSON.fromJson(new String(encoded, Charsets.UTF_8), Discoverable.class);
+ return GSON.fromJson(new String(encoded, StandardCharsets.UTF_8), Discoverable.class);
}
private DiscoverableAdapter() {
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
index dcf39351..5fd0bf07 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -47,20 +47,20 @@ public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase {
@BeforeClass
public static void beforeClass() {
zkServer = InMemoryZKServer.builder().setTickTime(100000).build();
- zkServer.startAndWait();
+ zkServer.startAsync().awaitRunning();
zkClient = ZKClientServices.delegate(
ZKClients.retryOnFailure(
ZKClients.reWatchOnExpire(
ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
- zkClient.startAndWait();
+ zkClient.startAsync().awaitRunning();
}
@AfterClass
public static void afterClass() {
- zkClient.stopAndWait();
- zkServer.stopAndWait();
+ zkClient.stopAsync().awaitTerminated();
+ zkServer.stopAsync().awaitTerminated();
}
@Test (timeout = 30000)
@@ -87,7 +87,7 @@ public void testDoubleRegister() throws Exception {
ZKClients.reWatchOnExpire(
ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
- zkClient2.startAndWait();
+ zkClient2.startAsync().awaitRunning();
try (ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2)) {
cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
@@ -98,7 +98,7 @@ public void testDoubleRegister() throws Exception {
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
- zkClient2.stopAndWait();
+ zkClient2.stopAsync().awaitTerminated();
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
@@ -109,7 +109,7 @@ public void run() {
cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
cancellable.cancel();
} finally {
- zkClient2.stopAndWait();
+ zkClient2.stopAsync().awaitTerminated();
}
} finally {
closeServices(entry);
diff --git a/twill-ext/src/main/java/org/apache/twill/ext/BundledJarRunner.java b/twill-ext/src/main/java/org/apache/twill/ext/BundledJarRunner.java
index 4b8641e4..040770f9 100644
--- a/twill-ext/src/main/java/org/apache/twill/ext/BundledJarRunner.java
+++ b/twill-ext/src/main/java/org/apache/twill/ext/BundledJarRunner.java
@@ -17,14 +17,13 @@
*/
package org.apache.twill.ext;
-import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.nio.file.Files;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -35,6 +34,7 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
+import java.util.Objects;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
@@ -85,7 +85,7 @@ public void load(ClassLoader parentClassLoader) throws IOException, ClassNotFoun
Preconditions.checkNotNull(libFolder);
File inputJarFile = this.jarFile;
- File outputJarDir = Files.createTempDir();
+ File outputJarDir = Files.createTempDirectory("bundled-jar").toFile();
LOG.debug("Unpacking jar to " + outputJarDir.getAbsolutePath());
JarFile jarFile = new JarFile(inputJarFile);
@@ -220,15 +220,15 @@ public boolean equals(Object o) {
}
Arguments arguments = (Arguments) o;
- return Objects.equal(jarFileName, arguments.jarFileName)
- && Objects.equal(libFolder, arguments.libFolder)
+ return Objects.equals(jarFileName, arguments.jarFileName)
+ && Objects.equals(libFolder, arguments.libFolder)
&& Arrays.deepEquals(mainArgs, arguments.mainArgs)
- && Objects.equal(mainClassName, arguments.mainClassName);
+ && Objects.equals(mainClassName, arguments.mainClassName);
}
@Override
public int hashCode() {
- return Objects.hashCode(jarFileName, mainClassName, mainArgs, libFolder);
+ return Objects.hash(jarFileName, mainClassName, mainArgs, libFolder);
}
public String[] toArray() {
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index 42bff62d..da5bd24a 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -86,12 +86,12 @@ protected void startUp() throws Exception {
trackerAddr.getPort(),
trackerUrl.toString());
maxCapability = response.getMaximumResourceCapability();
- nmClient.startAndWait();
+ nmClient.startAsync().awaitRunning();
}
@Override
protected void shutDown() throws Exception {
- nmClient.stopAndWait();
+ nmClient.stopAsync().awaitTerminated();
amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
amrmClient.stop();
}
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index ca0bc080..06e43a88 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -25,7 +25,6 @@
import ch.qos.logback.core.spi.FilterReply;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import org.apache.hadoop.conf.Configuration;
@@ -81,7 +80,7 @@ protected final void doMain(final Service mainService,
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- mainService.stopAndWait();
+ mainService.stopAsync().awaitTerminated();
}
});
@@ -93,7 +92,7 @@ public void run() {
try {
// Starts the service
LOG.info("Starting service {}.", mainService);
- Futures.allAsList(Services.chainStart(requiredServices, mainService).get()).get();
+ Services.chainStart(requiredServices, mainService).get();
LOG.info("Service {} started.", mainService);
} catch (Throwable t) {
LOG.error("Exception when starting service {}.", mainService, t);
@@ -110,7 +109,7 @@ public void run() {
throw Throwables.propagate(t);
}
} finally {
- requiredServices.stopAndWait();
+ requiredServices.stopAsync().awaitTerminated();
ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
if (loggerFactory instanceof LoggerContext) {
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
index 38cb32cf..c182e899 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
@@ -17,9 +17,10 @@
*/
package org.apache.twill.internal.appmaster;
-import com.google.common.base.Objects;
import org.apache.hadoop.yarn.api.records.Resource;
+import java.util.Objects;
+
/**
* This class defines how the containers should be allocated.
*/
@@ -93,9 +94,9 @@ public boolean equals(Object obj) {
}
AllocationSpecification other = (AllocationSpecification) obj;
return (instanceId == other.instanceId) &&
- Objects.equal(resource, other.resource) &&
- Objects.equal(type, other.type) &&
- Objects.equal(runnableName, other.runnableName);
+ Objects.equals(resource, other.resource) &&
+ Objects.equals(type, other.type) &&
+ Objects.equals(runnableName, other.runnableName);
}
@Override
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 036be811..01e45dd2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -169,7 +169,7 @@ protected void startUp() throws Exception {
// no left over content from previous AM attempt.
LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
ZKOperations.createDeleteIfExists(zkClient, kafkaZKPath, null, CreateMode.PERSISTENT, true).get();
- kafkaServer.startAndWait();
+ kafkaServer.startAsync().awaitRunning();
}
@Override
@@ -183,7 +183,7 @@ protected void shutDown() throws Exception {
// Ignore
LOG.info("Kafka shutdown delay interrupted", e);
} finally {
- kafkaServer.stopAndWait();
+ kafkaServer.stopAsync().awaitTerminated();
}
}
@@ -230,13 +230,13 @@ private YarnAMClientService(YarnAMClient yarnAMClient, TrackerService trackerSer
@Override
protected void startUp() throws Exception {
trackerService.setHost(yarnAMClient.getHost());
- trackerService.startAndWait();
+ trackerService.startAsync().awaitRunning();
yarnAMClient.setTracker(trackerService.getBindAddress(), trackerService.getUrl());
try {
- yarnAMClient.startAndWait();
+ yarnAMClient.startAsync().awaitRunning();
} catch (Exception e) {
- trackerService.stopAndWait();
+ trackerService.stopAsync().awaitTerminated();
throw e;
}
}
@@ -244,9 +244,9 @@ protected void startUp() throws Exception {
@Override
protected void shutDown() throws Exception {
try {
- yarnAMClient.stopAndWait();
+ yarnAMClient.stopAsync().awaitTerminated();
} finally {
- trackerService.stopAndWait();
+ trackerService.stopAsync().awaitTerminated();
}
}
}
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 6649bf41..7ac349c3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -21,14 +21,15 @@
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
-import com.google.common.collect.DiscreteDomains;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
-import com.google.common.collect.Ranges;
+import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
@@ -636,7 +637,7 @@ private long checkProvisionTimeout(long nextTimeoutCheck) {
if (action.getTimeout() < 0) {
// Abort application
stopStatus = StopStatus.ABORTED;
- stop();
+ stopAsync();
} else {
return nextTimeoutCheck + action.getTimeout();
}
@@ -1023,7 +1024,7 @@ public void run() {
int runningCount = runningContainers.count(runnableName);
Set instancesToRemove = instanceIds == null ? null : ImmutableSet.copyOf(instanceIds);
if (instancesToRemove == null) {
- instancesToRemove = Ranges.closedOpen(0, runningCount).asSet(DiscreteDomains.integers());
+ instancesToRemove = ContiguousSet.create(Range.closedOpen(0, runningCount), DiscreteDomain.integers());
}
LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName);
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
index e48dcbb6..b226e3f2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
@@ -17,7 +17,7 @@
*/
package org.apache.twill.internal.appmaster;
-import com.google.common.base.Objects;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.Maps;
import org.apache.twill.common.Cancellable;
import org.apache.twill.internal.EnvKeys;
@@ -50,7 +50,7 @@ public RunnableProcessLauncher(YarnContainerInfo containerInfo, YarnNMClient nmC
@Override
public String toString() {
- return Objects.toStringHelper(this)
+ return MoreObjects.toStringHelper(this)
.add("container", containerInfo)
.toString();
}
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index f8ed27ec..29d1e997 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal.appmaster;
-import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
@@ -33,6 +32,7 @@
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -65,6 +65,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
@@ -581,7 +582,7 @@ public void onFailure(Throwable t) {
}
}
}
- });
+ }, MoreExecutors.directExecutor());
}
/**
@@ -735,10 +736,10 @@ private Location saveLogLevels() {
try {
Gson gson = new GsonBuilder().serializeNulls().create();
String jsonStr = gson.toJson(logLevels);
- String fileName = Hashing.md5().hashString(jsonStr) + "." + Constants.Files.LOG_LEVELS;
+ String fileName = Hashing.md5().hashString(jsonStr, StandardCharsets.UTF_8) + "." + Constants.Files.LOG_LEVELS;
Location location = applicationLocation.append(fileName);
if (!location.exists()) {
- try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) {
+ try (Writer writer = new OutputStreamWriter(location.getOutputStream(), StandardCharsets.UTF_8)) {
writer.write(jsonStr);
}
}
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index e6d86a55..9ddfb4ef 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal.container;
-import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.google.common.reflect.TypeToken;
@@ -52,6 +51,7 @@
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
+import java.nio.charset.StandardCharsets;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -179,7 +179,7 @@ private static ClassLoader getClassLoader() {
private static Map> loadLogLevels() throws IOException {
File file = new File(Constants.Files.LOG_LEVELS);
if (file.exists()) {
- try (Reader reader = Files.newReader(file, Charsets.UTF_8)) {
+ try (Reader reader = Files.newReader(file, StandardCharsets.UTF_8)) {
Gson gson = new GsonBuilder().serializeNulls().create();
return gson.fromJson(reader, new TypeToken