From 76e546cadd80442671714180ec4b13dc0e61789a Mon Sep 17 00:00:00 2001 From: vikas-cs07 Date: Wed, 18 Mar 2026 16:48:53 +0530 Subject: [PATCH] Upgrade Guava library from 13.0.1 to 32.0.0-jre Migrated all removed and deprecated Guava APIs across 74 files: - Objects.toStringHelper() -> MoreObjects.toStringHelper() - Objects.hashCode/equal -> java.util.Objects.hash/equals - new Stopwatch()/elapsedTime() -> Stopwatch.createStarted()/elapsed() - Charsets.UTF_8 -> StandardCharsets.UTF_8 - InputSupplier/OutputSupplier -> Callable - Guava Optional -> java.util.Optional - Futures.addCallback/transform 2-arg -> 3-arg with Executor - Futures.immediateCheckedFuture -> Futures.immediateFuture - Service.start/stop/startAndWait/stopAndWait -> startAsync/stopAsync/awaitRunning/awaitTerminated - ServiceListenerAdapter: implements -> extends Service.Listener - Ranges/DiscreteDomains -> Range/DiscreteDomain/ContiguousSet - Files.createTempDir() -> Files.createTempDirectory() - hasher.putString/hashString -> added Charset parameter - Files.newOutputStreamSupplier/newReaderSupplier -> Java NIO equivalents - getServiceName() -> serviceName() --- pom.xml | 2 +- .../AbstractExecutionServiceController.java | 57 +++++++------ .../internal/AbstractTwillController.java | 13 +-- .../twill/internal/AbstractTwillService.java | 27 +++--- .../internal/AbstractZKServiceController.java | 2 +- .../twill/internal/CompositeService.java | 13 ++- .../twill/internal/ElectionRegistry.java | 6 +- .../twill/internal/ListenerExecutor.java | 2 +- .../internal/ServiceListenerAdapter.java | 2 +- .../org/apache/twill/internal/Services.java | 81 ++++++++++-------- .../internal/TwillContainerLauncher.java | 15 ++-- .../org/apache/twill/internal/ZKMessages.java | 5 +- .../twill/internal/json/ArgumentsCodec.java | 19 +++-- .../TwillRuntimeSpecificationAdapter.java | 6 +- .../kafka/client/SimpleKafkaPublisher.java | 4 +- .../kafka/client/ZKBrokerService.java | 4 +- .../kafka/client/ZKKafkaClientService.java | 4 +- .../twill/internal/logging/KafkaAppender.java | 17 ++-- .../twill/internal/state/MessageCodec.java | 6 +- .../twill/internal/state/SimpleMessage.java | 12 +-- .../apache/twill/kafka/client/BrokerInfo.java | 4 +- .../twill/kafka/client/TopicPartition.java | 4 +- .../twill/internal/CompositeServiceTest.java | 8 +- .../apache/twill/internal/ControllerTest.java | 34 ++++---- .../apache/twill/internal/ServicesTest.java | 13 ++- .../utils/ApplicationBundlerTest.java | 4 +- .../apache/twill/kafka/client/KafkaTest.java | 56 ++++++------ .../twill/discovery/DiscoverableAdapter.java | 6 +- .../discovery/ZKDiscoveryServiceTest.java | 14 +-- .../apache/twill/ext/BundledJarRunner.java | 14 +-- .../internal/yarn/Hadoop21YarnAMClient.java | 4 +- .../apache/twill/internal/ServiceMain.java | 7 +- .../appmaster/AllocationSpecification.java | 9 +- .../appmaster/ApplicationMasterMain.java | 14 +-- .../appmaster/ApplicationMasterService.java | 9 +- .../appmaster/RunnableProcessLauncher.java | 4 +- .../internal/appmaster/RunningContainers.java | 9 +- .../container/TwillContainerMain.java | 7 +- .../ApplicationMasterLiveNodeDecoder.java | 4 +- .../twill/yarn/YarnTwillController.java | 8 +- .../apache/twill/yarn/YarnTwillPreparer.java | 22 ++--- .../twill/yarn/YarnTwillRunnerService.java | 14 +-- .../twill/filesystem/LocationTestBase.java | 6 +- .../org/apache/twill/yarn/BaseYarnTest.java | 4 +- .../apache/twill/yarn/DistributedShell.java | 5 +- .../apache/twill/yarn/EchoServerTestRun.java | 17 ++-- .../apache/twill/yarn/EnvironmentTestRun.java | 8 +- .../twill/yarn/EventHandlerTestRun.java | 4 +- .../twill/yarn/FailureRestartTestRun.java | 8 +- .../apache/twill/yarn/LocalFileTestRun.java | 11 +-- .../twill/yarn/LogLevelChangeTestRun.java | 5 +- .../apache/twill/yarn/LogLevelTestRun.java | 5 +- .../twill/yarn/ResourceReportTestRun.java | 15 ++-- .../twill/yarn/RestartRunnableTestRun.java | 10 +-- .../twill/yarn/SessionExpireTestRun.java | 5 +- .../org/apache/twill/yarn/SocketServer.java | 5 +- .../org/apache/twill/yarn/TwillTester.java | 4 +- .../internal/zookeeper/BasicNodeChildren.java | 4 +- .../internal/zookeeper/BasicNodeData.java | 4 +- .../zookeeper/DefaultZKClientService.java | 42 ++++++--- .../zookeeper/FailureRetryZKClient.java | 18 ++-- .../internal/zookeeper/InMemoryZKServer.java | 45 +++++++--- .../internal/zookeeper/LeaderElection.java | 16 ++-- .../internal/zookeeper/NamespaceZKClient.java | 2 +- .../zookeeper/ReentrantDistributedLock.java | 7 +- .../zookeeper/RewatchOnExpireWatcher.java | 7 +- .../zookeeper/RewatchOnExpireZKClient.java | 7 +- .../zookeeper/ForwardingZKClientService.java | 38 ++++++--- .../twill/zookeeper/ZKClientService.java | 4 +- .../apache/twill/zookeeper/ZKOperations.java | 6 +- .../zookeeper/LeaderElectionTest.java | 58 ++++++------- .../ReentrantDistributedLockTest.java | 34 ++++---- .../apache/twill/zookeeper/ZKClientTest.java | 85 ++++++++++--------- .../twill/zookeeper/ZKOperationsTest.java | 8 +- 74 files changed, 573 insertions(+), 489 deletions(-) diff --git a/pom.xml b/pom.xml index d44cbc29..e292c88b 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,7 @@ true 1.7.30 1.2.11 - 13.0.1 + 32.0.0-jre 2.2.4 2.0.1 4.1.75.Final diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java index a00ec6ee..9b850fa9 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java @@ -23,14 +23,13 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.SettableFuture; -import com.google.common.util.concurrent.Uninterruptibles; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.twill.api.RunId; import org.apache.twill.api.ServiceController; import org.apache.twill.common.Threads; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -93,9 +92,10 @@ public Future terminate(long gracefulTimeout, TimeU } terminationTimeoutMillis.compareAndSet(-1L, timeout); - stop(); + stopAsync(); return Futures.transform(terminationFuture, - (Function) input -> AbstractExecutionServiceController.this); + (Function) input -> AbstractExecutionServiceController.this, + MoreExecutors.directExecutor()); } @Nullable @@ -129,29 +129,25 @@ public void terminated(State from) { }, executor); } - @Override - public void awaitTerminated() throws ExecutionException { - Uninterruptibles.getUninterruptibly(terminationFuture); - } - - @Override - public void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException, ExecutionException { - Uninterruptibles.getUninterruptibly(terminationFuture, timeout, timeoutUnit); - } - public final void addListener(Listener listener, Executor executor) { listenerExecutors.addListener(new ListenerExecutor(listener, executor)); } @Override - public final ListenableFuture start() { + public final Service startAsync() { serviceDelegate.addListener(listenerExecutors, Threads.SAME_THREAD_EXECUTOR); - return serviceDelegate.start(); + serviceDelegate.startAsync(); + return this; } @Override - public final State startAndWait() { - return Futures.getUnchecked(start()); + public final void awaitRunning() { + serviceDelegate.awaitRunning(); + } + + @Override + public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + serviceDelegate.awaitRunning(timeout, unit); } @Override @@ -165,13 +161,24 @@ public final State state() { } @Override - public final State stopAndWait() { - return Futures.getUnchecked(stop()); + public final Service stopAsync() { + serviceDelegate.stopAsync(); + return this; + } + + @Override + public final void awaitTerminated() { + serviceDelegate.awaitTerminated(); + } + + @Override + public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + serviceDelegate.awaitTerminated(timeout, unit); } @Override - public final ListenableFuture stop() { - return serviceDelegate.stop(); + public final Throwable failureCause() { + return serviceDelegate.failureCause(); } protected Executor executor(final State state) { @@ -213,15 +220,15 @@ protected void shutDown() throws Exception { } @Override - protected Executor executor(State state) { - return AbstractExecutionServiceController.this.executor(state); + protected Executor executor() { + return AbstractExecutionServiceController.this.executor(state()); } } /** * Inner class for dispatching listener call back to a list of listeners. */ - private static final class ListenerExecutors implements Listener { + private static final class ListenerExecutors extends Listener { private interface Callback { void call(Listener listener); diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java index 0ff2fc8c..74136260 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal; -import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -27,6 +26,7 @@ import com.google.common.reflect.TypeToken; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.apache.twill.api.Command; @@ -54,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Iterator; import java.util.Map; @@ -102,7 +103,7 @@ public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, b @Override protected synchronized void doStartUp() { if (kafkaClient != null && !logHandlers.isEmpty()) { - kafkaClient.startAndWait(); + kafkaClient.startAsync().awaitRunning(); logCancellable = kafkaClient.getConsumer().prepare() .addFromBeginning(Constants.LOG_TOPIC, 0) .consume(new LogMessageCallback(logHandlers)); @@ -119,7 +120,7 @@ protected synchronized void doShutDown() { } if (kafkaClient != null) { // Safe to call stop no matter what state the KafkaClientService is in. - kafkaClient.stopAndWait(); + kafkaClient.stopAsync().awaitTerminated(); } } @@ -133,7 +134,7 @@ public final synchronized void addLogHandler(LogHandler handler) { logHandlers.add(handler); if (logHandlers.size() == 1) { - kafkaClient.startAndWait(); + kafkaClient.startAsync().awaitRunning(); logCancellable = kafkaClient.getConsumer().prepare() .addFromBeginning(Constants.LOG_TOPIC, 0) .consume(new LogMessageCallback(logHandlers)); @@ -198,7 +199,7 @@ public ListenableFuture restartInstances(final String runnable, Set input) { return runnable; } - }); + }, MoreExecutors.directExecutor()); } @Override @@ -280,7 +281,7 @@ public long onReceived(Iterator messages) { long nextOffset = -1L; while (messages.hasNext()) { FetchedMessage message = messages.next(); - String json = Charsets.UTF_8.decode(message.getPayload()).toString(); + String json = StandardCharsets.UTF_8.decode(message.getPayload()).toString(); try { LogEntry entry = GSON.fromJson(json, LogEntry.class); if (entry != null) { diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java index 31adabce..f53900fd 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal; -import com.google.common.base.Charsets; import com.google.common.collect.Lists; import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.google.common.util.concurrent.FutureCallback; @@ -46,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; @@ -144,7 +144,7 @@ protected Gson getLiveNodeGson() { @Override public ListenableFuture onReceived(String messageId, Message message) { LOG.info("Message received: {}", message); - return Futures.immediateCheckedFuture(messageId); + return Futures.immediateFuture(messageId); } @Override @@ -164,10 +164,10 @@ protected final void startUp() throws Exception { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.Expired) { - LOG.warn("ZK Session expired for service {} with runId {}.", getServiceName(), runId.getId()); + LOG.warn("ZK Session expired for service {} with runId {}.", serviceName(), runId.getId()); expired = true; } else if (event.getState() == Event.KeeperState.SyncConnected && expired) { - LOG.info("Reconnected after expiration for service {} with runId {}", getServiceName(), runId.getId()); + LOG.info("Reconnected after expiration for service {} with runId {}", serviceName(), runId.getId()); expired = false; logIfFailed(createLiveNode()); } @@ -204,7 +204,7 @@ protected final void shutDown() throws Exception { } finally { // Given at most 5 seconds to cleanup ZK nodes removeLiveNode().get(5, TimeUnit.SECONDS); - LOG.info("Service {} with runId {} shutdown completed", getServiceName(), runId.getId()); + LOG.info("Service {} with runId {} shutdown completed", serviceName(), runId.getId()); } } @@ -277,6 +277,7 @@ public void onFailure(Throwable t) { LOG.error("Failed to watch messages.", t); } }, Threads.SAME_THREAD_EXECUTOR); + } private void processMessage(final String path, final String messageId) { @@ -292,7 +293,8 @@ public void onSuccess(NodeData result) { return; } if (LOG.isDebugEnabled()) { - LOG.debug("Message received from {}: {}", path, new String(MessageCodec.encode(message), Charsets.UTF_8)); + LOG.debug("Message received from {}: {}", + path, new String(MessageCodec.encode(message), StandardCharsets.UTF_8)); } // Handle the stop message @@ -328,18 +330,19 @@ private boolean handleStopMessage(Message message, final Runnable messageRemover terminationTimeoutMillis.compareAndSet(-1L, timeoutMillis); // Stop this service. - Futures.addCallback(stop(), new FutureCallback() { + addListener(new ServiceListenerAdapter() { @Override - public void onSuccess(State result) { + public void terminated(State from) { messageRemover.run(); } @Override - public void onFailure(Throwable t) { - LOG.error("Stop service failed upon STOP command", t); + public void failed(State from, Throwable failure) { + LOG.error("Stop service failed upon STOP command", failure); messageRemover.run(); } }, Threads.SAME_THREAD_EXECUTOR); + stopAsync(); return true; } @@ -391,7 +394,7 @@ public void onSuccess(T result) { @Override public void onFailure(Throwable t) { - LOG.error("Operation failed for service {} with runId {}", getServiceName(), runId, t); + LOG.error("Operation failed for service {} with runId {}", serviceName(), runId, t); } }, Threads.SAME_THREAD_EXECUTOR); } @@ -410,6 +413,6 @@ private byte[] serializeLiveNode() { if (liveNodeData != null) { content.add("data", getLiveNodeGson().toJsonTree(liveNodeData)); } - return GSON.toJson(content).getBytes(Charsets.UTF_8); + return GSON.toJson(content).getBytes(StandardCharsets.UTF_8); } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java index 214fbb86..74066e56 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java @@ -154,7 +154,7 @@ protected synchronized void forceShutDown() { // In force shutdown, don't send message. stopMessageFuture = Futures.immediateFuture(State.TERMINATED); } - stop(); + stopAsync(); } diff --git a/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java b/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java index 38659f68..603e2db7 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java @@ -22,7 +22,6 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.Service; -import com.google.common.util.concurrent.UncheckedExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,9 +52,9 @@ protected void startUp() throws Exception { for (Service service : services) { try { - service.startAndWait(); - } catch (UncheckedExecutionException e) { - failureCause = e.getCause(); + service.startAsync().awaitRunning(); + } catch (IllegalStateException e) { + failureCause = e.getCause() != null ? e.getCause() : e; break; } } @@ -88,12 +87,12 @@ private void stopAll() throws Exception { Service service = itor.next(); try { if (service.isRunning() || service.state() == State.STARTING) { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } - } catch (UncheckedExecutionException e) { + } catch (IllegalStateException e) { // Just catch as we want all services stopped if (failureCause == null) { - failureCause = e.getCause(); + failureCause = e.getCause() != null ? e.getCause() : e; } else { // Log for sub-sequence service shutdown error, as only the first failure cause will be thrown. LOG.warn("Failed to stop service {}", service, e); diff --git a/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java b/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java index 9153fe62..03dc6bbc 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java @@ -46,7 +46,7 @@ public ElectionRegistry(ZKClient zkClient) { */ public Cancellable register(String name, ElectionHandler handler) { LeaderElection election = new LeaderElection(zkClient, name, handler); - election.start(); + election.startAsync(); registry.put(name, election); return new CancellableElection(name, election); } @@ -56,7 +56,7 @@ public Cancellable register(String name, ElectionHandler handler) { */ public void shutdown() { for (LeaderElection election : registry.values()) { - election.stop(); + election.stopAsync(); } } @@ -71,7 +71,7 @@ public CancellableElection(String name, LeaderElection election) { @Override public void cancel() { - election.stop(); + election.stopAsync(); registry.remove(name, election); } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java b/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java index 9d3e1560..0e93016c 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java @@ -29,7 +29,7 @@ * Wrapper for {@link Service.Listener} to have callback executed on a given {@link Executor}. * Also make sure each method is called at most once. */ -final class ListenerExecutor implements Service.Listener { +final class ListenerExecutor extends Service.Listener { private static final Logger LOG = LoggerFactory.getLogger(ListenerExecutor.class); diff --git a/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java index 4a34abf1..9d8c3142 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java @@ -22,7 +22,7 @@ /** * An adapter for implementing {@link Service.Listener} with all method default to no-op. */ -public abstract class ServiceListenerAdapter implements Service.Listener { +public abstract class ServiceListenerAdapter extends Service.Listener { @Override public void starting() { // No-op diff --git a/twill-core/src/main/java/org/apache/twill/internal/Services.java b/twill-core/src/main/java/org/apache/twill/internal/Services.java index e431be98..1f0847a1 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/Services.java +++ b/twill-core/src/main/java/org/apache/twill/internal/Services.java @@ -17,14 +17,12 @@ */ package org.apache.twill.internal; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.SettableFuture; import org.apache.twill.common.Threads; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** @@ -38,27 +36,25 @@ public final class Services { * * @param firstService First service to start. * @param moreServices The rest services to start. - * @return A {@link ListenableFuture} that will be completed when all services are started, with the - * result carries the completed {@link ListenableFuture} of each corresponding service in the - * same order as they are passed to this method. + * @return A {@link ListenableFuture} that will be completed when all services are started. */ - public static ListenableFuture>> chainStart(Service firstService, - Service...moreServices) { + public static ListenableFuture chainStart(Service firstService, + Service...moreServices) { return doChain(true, firstService, moreServices); } /** * Stops a list of {@link Service} one by one. It behaves the same as * {@link #chainStart(com.google.common.util.concurrent.Service, com.google.common.util.concurrent.Service...)} - * except {@link com.google.common.util.concurrent.Service#stop()} is called instead of start. + * except {@link com.google.common.util.concurrent.Service#stopAsync()} is called instead of startAsync. * * @param firstService First service to stop. * @param moreServices The rest services to stop. * @return A {@link ListenableFuture} that will be completed when all services are stopped. * @see #chainStart(com.google.common.util.concurrent.Service, com.google.common.util.concurrent.Service...) */ - public static ListenableFuture>> chainStop(Service firstService, - Service...moreServices) { + public static ListenableFuture chainStop(Service firstService, + Service...moreServices) { return doChain(false, firstService, moreServices); } @@ -97,43 +93,56 @@ public void failed(Service.State from, Throwable failure) { /** * Performs the actual logic of chain Service start/stop. */ - private static ListenableFuture>> doChain(boolean doStart, - Service firstService, - Service...moreServices) { - SettableFuture>> resultFuture = SettableFuture.create(); - List> result = Lists.newArrayListWithCapacity(moreServices.length + 1); - - ListenableFuture future = doStart ? firstService.start() : firstService.stop(); - future.addListener(createChainListener(future, moreServices, new AtomicInteger(0), result, resultFuture, doStart), - Threads.SAME_THREAD_EXECUTOR); + private static ListenableFuture doChain(boolean doStart, + Service firstService, + Service...moreServices) { + final SettableFuture resultFuture = SettableFuture.create(); + final AtomicInteger idx = new AtomicInteger(0); + startOrStop(doStart, firstService, moreServices, idx, resultFuture); return resultFuture; } /** - * Returns a {@link Runnable} that can be used as a {@link ListenableFuture} listener to trigger - * further service action or completing the result future. Used by - * {@link #doChain(boolean, com.google.common.util.concurrent.Service, com.google.common.util.concurrent.Service...)} + * Starts or stops a service and adds a listener to chain the next service when the operation completes. */ - private static Runnable createChainListener(final ListenableFuture future, final Service[] services, - final AtomicInteger idx, - final List> result, - final SettableFuture>> resultFuture, - final boolean doStart) { - return new Runnable() { + private static void startOrStop(final boolean doStart, Service service, final Service[] moreServices, + final AtomicInteger idx, + final SettableFuture resultFuture) { + service.addListener(new ServiceListenerAdapter() { + @Override + public void running() { + if (doStart) { + onServiceOperationComplete(Service.State.RUNNING); + } + } @Override - public void run() { - result.add(future); + public void terminated(Service.State from) { + if (!doStart) { + onServiceOperationComplete(Service.State.TERMINATED); + } + } + + @Override + public void failed(Service.State from, Throwable failure) { + resultFuture.setException(failure); + } + + private void onServiceOperationComplete(Service.State state) { int nextIdx = idx.getAndIncrement(); - if (nextIdx == services.length) { - resultFuture.set(result); + if (nextIdx >= moreServices.length) { + resultFuture.set(state); return; } - ListenableFuture actionFuture = doStart ? services[nextIdx].start() : services[nextIdx].stop(); - actionFuture.addListener(createChainListener(actionFuture, services, idx, result, resultFuture, doStart), - Threads.SAME_THREAD_EXECUTOR); + startOrStop(doStart, moreServices[nextIdx], moreServices, idx, resultFuture); } - }; + }, Threads.SAME_THREAD_EXECUTOR); + + if (doStart) { + service.startAsync(); + } else { + service.stopAsync(); + } } private Services() { diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java index 9bcbdf69..c2bd979a 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal; -import com.google.common.base.Charsets; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; @@ -40,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -167,7 +167,7 @@ public TwillContainerController start(RunId runId, int instanceId, Class main TwillContainerControllerImpl controller = new TwillContainerControllerImpl(zkClient, runId, runtimeSpec.getName(), instanceId, processController); - controller.start(); + controller.startAsync(); return controller; } @@ -237,12 +237,12 @@ protected void instanceNodeUpdated(NodeData nodeData) { } try { Gson gson = new Gson(); - JsonElement json = gson.fromJson(new String(nodeData.getData(), Charsets.UTF_8), JsonElement.class); + JsonElement json = gson.fromJson(new String(nodeData.getData(), StandardCharsets.UTF_8), JsonElement.class); if (json.isJsonObject()) { JsonElement data = json.getAsJsonObject().get("data"); if (data != null) { this.liveData = gson.fromJson(data, ContainerLiveNodeData.class); - LOG.info("Container LiveNodeData updated: " + new String(nodeData.getData(), Charsets.UTF_8)); + LOG.info("Container LiveNodeData updated: " + new String(nodeData.getData(), StandardCharsets.UTF_8)); } } } catch (Throwable t) { @@ -292,9 +292,8 @@ public int getInstanceId() { } private void killAndWait(long maxWaitSecs) { - Stopwatch watch = new Stopwatch(); - watch.start(); - while (watch.elapsedTime(TimeUnit.SECONDS) < maxWaitSecs) { + Stopwatch watch = Stopwatch.createStarted(); + while (watch.elapsed(TimeUnit.SECONDS) < maxWaitSecs) { // Kill the application try { kill(); @@ -312,7 +311,7 @@ private void killAndWait(long maxWaitSecs) { // Timeout reached, runnable has not stopped LOG.error("Failed to kill runnable {}, instance {} after {} seconds", runnable, instanceId, - watch.elapsedTime(TimeUnit.SECONDS)); + watch.elapsed(TimeUnit.SECONDS)); // TODO: should we throw exception here? } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java b/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java index cfcc7e7f..8fe9d1b0 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import org.apache.twill.internal.state.Message; import org.apache.twill.internal.state.MessageCodec; @@ -79,14 +80,14 @@ public void onSuccess(String result) { public void onFailure(Throwable t) { completion.setException(t); } - }); + }, MoreExecutors.directExecutor()); } @Override public void onFailure(Throwable t) { completion.setException(t); } - }); + }, MoreExecutors.directExecutor()); } private ZKMessages() { diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java index 341d3ee9..34021c82 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java +++ b/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java @@ -18,8 +18,6 @@ package org.apache.twill.internal.json; import com.google.common.collect.ImmutableMultimap; -import com.google.common.io.InputSupplier; -import com.google.common.io.OutputSupplier; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -39,6 +37,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; /** * Gson codec for {@link Arguments}. @@ -48,16 +47,24 @@ public final class ArgumentsCodec implements JsonSerializer, JsonDese private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Arguments.class, new ArgumentsCodec()) .create(); - public static void encode(Arguments arguments, OutputSupplier writerSupplier) throws IOException { - try (Writer writer = writerSupplier.getOutput()) { + public static void encode(Arguments arguments, Callable writerSupplier) throws IOException { + try (Writer writer = writerSupplier.call()) { GSON.toJson(arguments, writer); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); } } - public static Arguments decode(InputSupplier readerSupplier) throws IOException { - try (Reader reader = readerSupplier.getInput()) { + public static Arguments decode(Callable readerSupplier) throws IOException { + try (Reader reader = readerSupplier.call()) { return GSON.fromJson(reader, Arguments.class); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java index 4df9081e..3a779b27 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java +++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal.json; -import com.google.common.base.Charsets; import com.google.common.collect.Maps; import com.google.common.io.Files; import com.google.gson.Gson; @@ -41,6 +40,7 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.io.Reader; import java.io.Writer; import java.lang.reflect.ParameterizedType; @@ -85,7 +85,7 @@ public void toJson(TwillRuntimeSpecification spec, Writer writer) { } public void toJson(TwillRuntimeSpecification spec, File file) throws IOException { - try (Writer writer = Files.newWriter(file, Charsets.UTF_8)) { + try (Writer writer = Files.newWriter(file, StandardCharsets.UTF_8)) { toJson(spec, writer); } } @@ -99,7 +99,7 @@ public TwillRuntimeSpecification fromJson(Reader reader) { } public TwillRuntimeSpecification fromJson(File file) throws IOException { - try (Reader reader = Files.newReader(file, Charsets.UTF_8)) { + try (Reader reader = Files.newReader(file, StandardCharsets.UTF_8)) { return fromJson(reader); } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java index 832faa82..3ebe9dac 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal.kafka.client; -import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -34,6 +33,7 @@ import java.nio.ByteBuffer; import java.util.List; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -162,7 +162,7 @@ public void changed(BrokerService brokerService) { String newBrokerList = brokerService.getBrokerList(); // If there is no change, whether it is empty or not, just return - if (Objects.equal(brokerList, newBrokerList)) { + if (Objects.equals(brokerList, newBrokerList)) { return; } diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java index de42b9bc..ff81892d 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal.kafka.client; -import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -51,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.Set; @@ -307,7 +307,7 @@ private T decodeNodeData(NodeData nodeData, Class type) { if (data == null) { return null; } - return GSON.fromJson(new String(data, Charsets.UTF_8), type); + return GSON.fromJson(new String(data, StandardCharsets.UTF_8), type); } /** diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java index a0d93ec0..a712e3a1 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java @@ -98,7 +98,7 @@ protected void startUp() throws Exception { scheduler.scheduleAtFixedRate(this, PUBLISHER_CLEANUP_SECONDS, PUBLISHER_CLEANUP_SECONDS, TimeUnit.SECONDS); // Start broker service to get auto-updated brokers information. - brokerService.startAndWait(); + brokerService.startAsync().awaitRunning(); } @Override @@ -110,7 +110,7 @@ protected void shutDown() throws Exception { } consumer.stop(); - brokerService.stopAndWait(); + brokerService.stopAsync().awaitTerminated(); LOG.info("KafkaClientService stopped"); } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java index 493c4cae..02fd3c5b 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java +++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java @@ -20,7 +20,6 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.Appender; import ch.qos.logback.core.UnsynchronizedAppenderBase; -import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterables; @@ -47,6 +46,7 @@ import org.apache.twill.zookeeper.ZKClients; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.List; import java.util.Queue; @@ -154,13 +154,9 @@ public void start() { kafkaClient = new ZKKafkaClientService(zkClientService); Futures.addCallback(Services.chainStart(zkClientService, kafkaClient), - new FutureCallback>>() { + new FutureCallback() { @Override - public void onSuccess(List> result) { - for (ListenableFuture future : result) { - Preconditions.checkState(Futures.getUnchecked(future) == Service.State.RUNNING, - "Service is not running."); - } + public void onSuccess(Service.State result) { addInfo("Kafka client started: " + zkConnectStr); scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS); } @@ -209,7 +205,7 @@ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutExcept List logs = Lists.newArrayListWithExpectedSize(bufferedSize.get()); for (String json : Iterables.consumingIterable(buffer)) { - logs.add(Charsets.UTF_8.encode(json)); + logs.add(StandardCharsets.UTF_8.encode(json)); } long backOffTime = timeoutUnit.toNanos(timeout) / 10; @@ -218,8 +214,7 @@ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutExcept } try { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); + Stopwatch stopwatch = Stopwatch.createStarted(); long publishTimeout = timeout; do { @@ -230,7 +225,7 @@ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutExcept } catch (ExecutionException e) { addError("Failed to publish logs to Kafka.", e); TimeUnit.NANOSECONDS.sleep(backOffTime); - publishTimeout -= stopwatch.elapsedTime(timeoutUnit); + publishTimeout -= stopwatch.elapsed(timeoutUnit); stopwatch.reset(); stopwatch.start(); } diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java index 79a53d2e..e2686cc9 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java +++ b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal.state; -import com.google.common.base.Charsets; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -31,6 +30,7 @@ import org.apache.twill.api.Command; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; import java.util.Map; /** @@ -54,7 +54,7 @@ public static Message decode(byte[] bytes) { if (bytes == null) { return null; } - String content = new String(bytes, Charsets.UTF_8); + String content = new String(bytes, StandardCharsets.UTF_8); return GSON.fromJson(content, Message.class); } @@ -64,7 +64,7 @@ public static Message decode(byte[] bytes) { * @return byte array representing the encoded message. */ public static byte[] encode(Message message) { - return GSON.toJson(message, Message.class).getBytes(Charsets.UTF_8); + return GSON.toJson(message, Message.class).getBytes(StandardCharsets.UTF_8); } /** diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java b/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java index 0465d467..111dab33 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java +++ b/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java @@ -17,9 +17,11 @@ */ package org.apache.twill.internal.state; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; import org.apache.twill.api.Command; +import java.util.Objects; + /** * Implementation of {@code Message} interface to pass information about {@code Command} to execute. */ @@ -59,7 +61,7 @@ public Command getCommand() { @Override public String toString() { - return Objects.toStringHelper(Message.class) + return MoreObjects.toStringHelper(Message.class) .add("type", type) .add("scope", scope) .add("runnable", runnableName) @@ -69,7 +71,7 @@ public String toString() { @Override public int hashCode() { - return Objects.hashCode(type, scope, runnableName, command); + return Objects.hash(type, scope, runnableName, command); } @Override @@ -83,7 +85,7 @@ public boolean equals(Object obj) { Message other = (Message) obj; return type == other.getType() && scope == other.getScope() - && Objects.equal(runnableName, other.getRunnableName()) - && Objects.equal(command, other.getCommand()); + && Objects.equals(runnableName, other.getRunnableName()) + && Objects.equals(command, other.getCommand()); } } diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java b/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java index e659ab74..9d47e45f 100644 --- a/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java +++ b/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java @@ -17,7 +17,7 @@ */ package org.apache.twill.kafka.client; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; /** * Represents broker information. This class should be serializable with Gson. @@ -57,7 +57,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(BrokerInfo.class) + return MoreObjects.toStringHelper(BrokerInfo.class) .add("host", host) .add("port", port) .toString(); diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java b/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java index 87040bee..eb538078 100644 --- a/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java +++ b/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java @@ -17,7 +17,7 @@ */ package org.apache.twill.kafka.client; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; /** * Represents a combination of topic and partition. @@ -62,7 +62,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("topic", topic) .add("partition", partition) .toString(); diff --git a/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java b/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java index 34f866c9..ec75fbe1 100644 --- a/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java +++ b/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java @@ -49,7 +49,7 @@ public void testOrder() throws InterruptedException, ExecutionException, Timeout } Service service = new CompositeService(services); - service.start().get(5, TimeUnit.SECONDS); + service.startAsync().awaitRunning(5, TimeUnit.SECONDS); // There should be 10 permits after all 10 services started Assert.assertTrue(semaphore.tryAcquire(10, 5, TimeUnit.SECONDS)); @@ -59,7 +59,7 @@ public void testOrder() throws InterruptedException, ExecutionException, Timeout // Release 10 permits for the stop sequence to start semaphore.release(10); - service.stop().get(5, TimeUnit.SECONDS); + service.stopAsync().awaitTerminated(5, TimeUnit.SECONDS); // There should be no permit left after all 10 services stopped Assert.assertFalse(semaphore.tryAcquire(10)); @@ -80,9 +80,9 @@ public void testErrorStart() throws InterruptedException { Service service = new CompositeService(services); try { - service.start().get(); + service.startAsync().awaitRunning(); Assert.fail(); - } catch (ExecutionException e) { + } catch (IllegalStateException e) { // Expected } diff --git a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java index 1a5b7785..3c9ecfc1 100644 --- a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java +++ b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java @@ -50,17 +50,17 @@ public class ControllerTest { @Test public void testController() throws ExecutionException, InterruptedException, TimeoutException { InMemoryZKServer zkServer = InMemoryZKServer.builder().build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); LOG.info("ZKServer: " + zkServer.getConnectionStr()); try { RunId runId = RunIds.generate(); ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); Service service = createService(zkClientService, runId); - service.startAndWait(); + service.startAsync().awaitRunning(); TwillController controller = getController(zkClientService, "testController", runId); controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS); @@ -76,10 +76,10 @@ public void terminated(Service.State from) { Assert.assertTrue(service.state() == Service.State.TERMINATED || terminateLatch.await(2, TimeUnit.SECONDS)); - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -87,13 +87,13 @@ public void terminated(Service.State from) { @Test public void testControllerBefore() throws InterruptedException, ExecutionException, TimeoutException { InMemoryZKServer zkServer = InMemoryZKServer.builder().build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); LOG.info("ZKServer: " + zkServer.getConnectionStr()); try { RunId runId = RunIds.generate(); ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); final CountDownLatch runLatch = new CountDownLatch(1); TwillController controller = getController(zkClientService, "testControllerBefore", runId); @@ -105,7 +105,7 @@ public void run() { }, Threads.SAME_THREAD_EXECUTOR); Service service = createService(zkClientService, runId); - service.start(); + service.startAsync(); Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS)); @@ -116,11 +116,11 @@ public void run() { // Expected } - service.stop(); + service.stopAsync(); controller.awaitTerminated(120, TimeUnit.SECONDS); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -128,16 +128,16 @@ public void run() { @Test public void testControllerListener() throws InterruptedException { InMemoryZKServer zkServer = InMemoryZKServer.builder().build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); LOG.info("ZKServer: " + zkServer.getConnectionStr()); try { RunId runId = RunIds.generate(); ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); Service service = createService(zkClientService, runId); - service.startAndWait(); + service.startAsync().awaitRunning(); final CountDownLatch runLatch = new CountDownLatch(1); TwillController controller = getController(zkClientService, "testControllerListener", runId); @@ -150,11 +150,11 @@ public void run() { Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS)); - service.stopAndWait(); + service.stopAsync().awaitTerminated(); - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -212,7 +212,7 @@ public ResourceReport getResourceReport() { return null; } }; - controller.startAndWait(); + controller.startAsync().awaitRunning(); return controller; } } diff --git a/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java b/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java index 058fb439..0c0657cd 100644 --- a/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java +++ b/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java @@ -19,7 +19,6 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AbstractIdleService; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; import org.junit.Assert; @@ -45,8 +44,8 @@ public void testChain() throws ExecutionException, InterruptedException { Service s2 = new DummyService("s2", transiting); Service s3 = new DummyService("s3", transiting); - Futures.allAsList(Services.chainStart(s1, s2, s3).get()).get(); - Futures.allAsList(Services.chainStop(s3, s2, s1).get()).get(); + Services.chainStart(s1, s2, s3).get(); + Services.chainStop(s3, s2, s1).get(); } @Test @@ -54,8 +53,8 @@ public void testCompletion() throws ExecutionException, InterruptedException { Service service = new DummyService("s1", new AtomicBoolean()); ListenableFuture completion = Services.getCompletionFuture(service); - service.start(); - service.stop(); + service.startAsync(); + service.stopAsync(); completion.get(); @@ -63,9 +62,9 @@ public void testCompletion() throws ExecutionException, InterruptedException { service = new DummyService("s2", transiting); completion = Services.getCompletionFuture(service); - service.startAndWait(); + service.startAsync().awaitRunning(); transiting.set(true); - service.stop(); + service.stopAsync(); try { completion.get(); diff --git a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java index 4609ebd9..7b542984 100644 --- a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java +++ b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java @@ -132,7 +132,9 @@ private void unjar(File jarFile, File targetDir) throws IOException { target.mkdirs(); } else { target.getParentFile().mkdirs(); - ByteStreams.copy(jarInput, Files.newOutputStreamSupplier(target)); + try (java.io.OutputStream os = new java.io.FileOutputStream(target)) { + ByteStreams.copy(jarInput, os); + } } jarEntry = jarInput.getNextJarEntry(); diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java index e7e7e7ca..fce001e5 100644 --- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java +++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java @@ -17,7 +17,6 @@ */ package org.apache.twill.kafka.client; -import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.collect.Queues; import com.google.common.util.concurrent.Futures; @@ -40,6 +39,7 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Properties; import java.util.concurrent.BlockingQueue; @@ -65,11 +65,11 @@ public class KafkaTest { @BeforeClass public static void init() throws Exception { zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); // Extract the kafka.tgz and start the kafka server kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr())); - kafkaServer.startAndWait(); + kafkaServer.startAsync().awaitRunning(); zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); @@ -80,8 +80,8 @@ public static void init() throws Exception { @AfterClass public static void finish() throws Exception { Services.chainStop(kafkaClient, zkClientService).get(); - kafkaServer.stopAndWait(); - zkServer.stopAndWait(); + kafkaServer.stopAsync().awaitTerminated(); + zkServer.stopAsync().awaitTerminated(); } @Test @@ -91,15 +91,15 @@ public void testKafkaClientReconnect() throws Exception { EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkaServerConfig); ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try { zkClient.create("/", null, CreateMode.PERSISTENT).get(); ZKKafkaClientService kafkaClient = new ZKKafkaClientService(zkClient); - kafkaClient.startAndWait(); + kafkaClient.startAsync().awaitRunning(); try { - server.startAndWait(); + server.startAsync().awaitRunning(); try { // Publish a messages createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start(); @@ -114,7 +114,7 @@ public long onReceived(Iterator messages) { while (messages.hasNext()) { FetchedMessage message = messages.next(); nextOffset = message.getNextOffset(); - queue.offer(Charsets.UTF_8.decode(message.getPayload()).toString()); + queue.offer(StandardCharsets.UTF_8.decode(message.getPayload()).toString()); } return nextOffset; } @@ -128,12 +128,12 @@ public void finished() { Assert.assertEquals("0 First message", queue.poll(60, TimeUnit.SECONDS)); // Shutdown the server - server.stopAndWait(); + server.stopAsync().awaitTerminated(); // Start the server again. // Needs to create a new instance with the same config since guava service cannot be restarted server = new EmbeddedKafkaServer(kafkaServerConfig); - server.startAndWait(); + server.startAsync().awaitRunning(); // Wait a little while to make sure changes is reflected in broker service TimeUnit.SECONDS.sleep(3); @@ -146,13 +146,13 @@ public void finished() { cancel.cancel(); } finally { - kafkaClient.stopAndWait(); + kafkaClient.stopAsync().awaitTerminated(); } } finally { - server.stopAndWait(); + server.stopAsync().awaitTerminated(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -180,7 +180,7 @@ public long onReceived(Iterator messages) { while (messages.hasNext()) { FetchedMessage message = messages.next(); nextOffset = message.getNextOffset(); - LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString()); + LOG.info(StandardCharsets.UTF_8.decode(message.getPayload()).toString()); latch.countDown(); } return nextOffset; @@ -222,7 +222,7 @@ public long onReceived(Iterator messages) { FetchedMessage message = messages.next(); nextOffset = message.getNextOffset() + 1; offsetQueue.offer(message.getOffset()); - LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString()); + LOG.info(StandardCharsets.UTF_8.decode(message.getPayload()).toString()); return nextOffset; } return nextOffset; @@ -247,17 +247,17 @@ public void testBrokerChange() throws Exception { // Create a new namespace in ZK for Kafka server for this test case String connectionStr = zkServer.getConnectionStr() + "/broker_change"; ZKClientService zkClient = ZKClientService.Builder.of(connectionStr).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); zkClient.create("/", null, CreateMode.PERSISTENT).get(); // Start a new kafka server File logDir = TMP_FOLDER.newFolder(); EmbeddedKafkaServer server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir)); - server.startAndWait(); + server.startAsync().awaitRunning(); // Start a Kafka client KafkaClientService kafkaClient = new ZKKafkaClientService(zkClient); - kafkaClient.startAndWait(); + kafkaClient.startAsync().awaitRunning(); // Attach a consumer final BlockingQueue consumedMessages = Queues.newLinkedBlockingQueue(); @@ -269,7 +269,7 @@ public long onReceived(Iterator messages) { while (messages.hasNext()) { FetchedMessage message = messages.next(); nextOffset = message.getNextOffset(); - consumedMessages.add(Charsets.UTF_8.decode(message.getPayload()).toString()); + consumedMessages.add(StandardCharsets.UTF_8.decode(message.getPayload()).toString()); } return nextOffset; } @@ -282,26 +282,26 @@ public void finished() { // Get a publisher and publish a message KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.FIRE_AND_FORGET, Compression.NONE); - publisher.prepare("test").add(Charsets.UTF_8.encode("Message 0"), 0).send().get(); + publisher.prepare("test").add(StandardCharsets.UTF_8.encode("Message 0"), 0).send().get(); // Should receive one message Assert.assertEquals("Message 0", consumedMessages.poll(5, TimeUnit.SECONDS)); // Now shutdown and restart the server on different port - server.stopAndWait(); + server.stopAsync().awaitTerminated(); server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir)); - server.startAndWait(); + server.startAsync().awaitRunning(); // Wait a little while to make sure changes is reflected in broker service TimeUnit.SECONDS.sleep(3); // Now publish again with the same publisher. It should succeed and the consumer should receive the message. - publisher.prepare("test").add(Charsets.UTF_8.encode("Message 1"), 0).send().get(); + publisher.prepare("test").add(StandardCharsets.UTF_8.encode("Message 1"), 0).send().get(); Assert.assertEquals("Message 1", consumedMessages.poll(5, TimeUnit.SECONDS)); - kafkaClient.stopAndWait(); - zkClient.stopAndWait(); - server.stopAndWait(); + kafkaClient.stopAsync().awaitTerminated(); + zkClient.stopAsync().awaitTerminated(); + server.stopAsync().awaitTerminated(); } private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, @@ -315,7 +315,7 @@ private Thread createPublishThread(final KafkaClient kafkaClient, final String t KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, compression); KafkaPublisher.Preparer preparer = publisher.prepare(topic); for (int i = 0; i < count; i++) { - preparer.add(Charsets.UTF_8.encode((base + i) + " " + message), 0); + preparer.add(StandardCharsets.UTF_8.encode((base + i) + " " + message), 0); } Futures.getUnchecked(preparer.send()); }); diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java index b020dc7a..f2d03fc8 100644 --- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java +++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java @@ -17,7 +17,6 @@ */ package org.apache.twill.discovery; -import com.google.common.base.Charsets; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -30,6 +29,7 @@ import com.google.gson.JsonSerializer; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; import java.net.InetSocketAddress; /** @@ -47,7 +47,7 @@ final class DiscoverableAdapter { * @return array of bytes representing an instance of discoverable */ static byte[] encode(Discoverable discoverable) { - return GSON.toJson(discoverable, Discoverable.class).getBytes(Charsets.UTF_8); + return GSON.toJson(discoverable, Discoverable.class).getBytes(StandardCharsets.UTF_8); } /** @@ -59,7 +59,7 @@ static Discoverable decode(byte[] encoded) { if (encoded == null) { return null; } - return GSON.fromJson(new String(encoded, Charsets.UTF_8), Discoverable.class); + return GSON.fromJson(new String(encoded, StandardCharsets.UTF_8), Discoverable.class); } private DiscoverableAdapter() { diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java index dcf39351..5fd0bf07 100644 --- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java +++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java @@ -47,20 +47,20 @@ public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase { @BeforeClass public static void beforeClass() { zkServer = InMemoryZKServer.builder().setTickTime(100000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); zkClient = ZKClientServices.delegate( ZKClients.retryOnFailure( ZKClients.reWatchOnExpire( ZKClientService.Builder.of(zkServer.getConnectionStr()).build()), RetryStrategies.fixDelay(1, TimeUnit.SECONDS))); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); } @AfterClass public static void afterClass() { - zkClient.stopAndWait(); - zkServer.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); + zkServer.stopAsync().awaitTerminated(); } @Test (timeout = 30000) @@ -87,7 +87,7 @@ public void testDoubleRegister() throws Exception { ZKClients.reWatchOnExpire( ZKClientService.Builder.of(zkServer.getConnectionStr()).build()), RetryStrategies.fixDelay(1, TimeUnit.SECONDS))); - zkClient2.startAndWait(); + zkClient2.startAsync().awaitRunning(); try (ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2)) { cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321); @@ -98,7 +98,7 @@ public void testDoubleRegister() throws Exception { public void run() { try { TimeUnit.SECONDS.sleep(2); - zkClient2.stopAndWait(); + zkClient2.stopAsync().awaitTerminated(); } catch (InterruptedException e) { LOG.error(e.getMessage(), e); } @@ -109,7 +109,7 @@ public void run() { cancellable = register(discoveryService, "test_multi_client", "localhost", 54321); cancellable.cancel(); } finally { - zkClient2.stopAndWait(); + zkClient2.stopAsync().awaitTerminated(); } } finally { closeServices(entry); diff --git a/twill-ext/src/main/java/org/apache/twill/ext/BundledJarRunner.java b/twill-ext/src/main/java/org/apache/twill/ext/BundledJarRunner.java index 4b8641e4..040770f9 100644 --- a/twill-ext/src/main/java/org/apache/twill/ext/BundledJarRunner.java +++ b/twill-ext/src/main/java/org/apache/twill/ext/BundledJarRunner.java @@ -17,14 +17,13 @@ */ package org.apache.twill.ext; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; -import com.google.common.io.Files; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.nio.file.Files; import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; @@ -35,6 +34,7 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.Arrays; +import java.util.Objects; import java.util.Enumeration; import java.util.LinkedList; import java.util.List; @@ -85,7 +85,7 @@ public void load(ClassLoader parentClassLoader) throws IOException, ClassNotFoun Preconditions.checkNotNull(libFolder); File inputJarFile = this.jarFile; - File outputJarDir = Files.createTempDir(); + File outputJarDir = Files.createTempDirectory("bundled-jar").toFile(); LOG.debug("Unpacking jar to " + outputJarDir.getAbsolutePath()); JarFile jarFile = new JarFile(inputJarFile); @@ -220,15 +220,15 @@ public boolean equals(Object o) { } Arguments arguments = (Arguments) o; - return Objects.equal(jarFileName, arguments.jarFileName) - && Objects.equal(libFolder, arguments.libFolder) + return Objects.equals(jarFileName, arguments.jarFileName) + && Objects.equals(libFolder, arguments.libFolder) && Arrays.deepEquals(mainArgs, arguments.mainArgs) - && Objects.equal(mainClassName, arguments.mainClassName); + && Objects.equals(mainClassName, arguments.mainClassName); } @Override public int hashCode() { - return Objects.hashCode(jarFileName, mainClassName, mainArgs, libFolder); + return Objects.hash(jarFileName, mainClassName, mainArgs, libFolder); } public String[] toArray() { diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java index 42bff62d..da5bd24a 100644 --- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java +++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java @@ -86,12 +86,12 @@ protected void startUp() throws Exception { trackerAddr.getPort(), trackerUrl.toString()); maxCapability = response.getMaximumResourceCapability(); - nmClient.startAndWait(); + nmClient.startAsync().awaitRunning(); } @Override protected void shutDown() throws Exception { - nmClient.stopAndWait(); + nmClient.stopAsync().awaitTerminated(); amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString()); amrmClient.stop(); } diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java index ca0bc080..06e43a88 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java @@ -25,7 +25,6 @@ import ch.qos.logback.core.spi.FilterReply; import com.google.common.base.Throwables; import com.google.common.util.concurrent.AbstractIdleService; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; import org.apache.hadoop.conf.Configuration; @@ -81,7 +80,7 @@ protected final void doMain(final Service mainService, Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - mainService.stopAndWait(); + mainService.stopAsync().awaitTerminated(); } }); @@ -93,7 +92,7 @@ public void run() { try { // Starts the service LOG.info("Starting service {}.", mainService); - Futures.allAsList(Services.chainStart(requiredServices, mainService).get()).get(); + Services.chainStart(requiredServices, mainService).get(); LOG.info("Service {} started.", mainService); } catch (Throwable t) { LOG.error("Exception when starting service {}.", mainService, t); @@ -110,7 +109,7 @@ public void run() { throw Throwables.propagate(t); } } finally { - requiredServices.stopAndWait(); + requiredServices.stopAsync().awaitTerminated(); ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory(); if (loggerFactory instanceof LoggerContext) { diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java index 38cb32cf..c182e899 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java @@ -17,9 +17,10 @@ */ package org.apache.twill.internal.appmaster; -import com.google.common.base.Objects; import org.apache.hadoop.yarn.api.records.Resource; +import java.util.Objects; + /** * This class defines how the containers should be allocated. */ @@ -93,9 +94,9 @@ public boolean equals(Object obj) { } AllocationSpecification other = (AllocationSpecification) obj; return (instanceId == other.instanceId) && - Objects.equal(resource, other.resource) && - Objects.equal(type, other.type) && - Objects.equal(runnableName, other.runnableName); + Objects.equals(resource, other.resource) && + Objects.equals(type, other.type) && + Objects.equals(runnableName, other.runnableName); } @Override diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java index 036be811..01e45dd2 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java @@ -169,7 +169,7 @@ protected void startUp() throws Exception { // no left over content from previous AM attempt. LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath); ZKOperations.createDeleteIfExists(zkClient, kafkaZKPath, null, CreateMode.PERSISTENT, true).get(); - kafkaServer.startAndWait(); + kafkaServer.startAsync().awaitRunning(); } @Override @@ -183,7 +183,7 @@ protected void shutDown() throws Exception { // Ignore LOG.info("Kafka shutdown delay interrupted", e); } finally { - kafkaServer.stopAndWait(); + kafkaServer.stopAsync().awaitTerminated(); } } @@ -230,13 +230,13 @@ private YarnAMClientService(YarnAMClient yarnAMClient, TrackerService trackerSer @Override protected void startUp() throws Exception { trackerService.setHost(yarnAMClient.getHost()); - trackerService.startAndWait(); + trackerService.startAsync().awaitRunning(); yarnAMClient.setTracker(trackerService.getBindAddress(), trackerService.getUrl()); try { - yarnAMClient.startAndWait(); + yarnAMClient.startAsync().awaitRunning(); } catch (Exception e) { - trackerService.stopAndWait(); + trackerService.stopAsync().awaitTerminated(); throw e; } } @@ -244,9 +244,9 @@ protected void startUp() throws Exception { @Override protected void shutDown() throws Exception { try { - yarnAMClient.stopAndWait(); + yarnAMClient.stopAsync().awaitTerminated(); } finally { - trackerService.stopAndWait(); + trackerService.stopAsync().awaitTerminated(); } } } diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index 6649bf41..7ac349c3 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -21,14 +21,15 @@ import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Supplier; -import com.google.common.collect.DiscreteDomains; +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.DiscreteDomain; import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multiset; -import com.google.common.collect.Ranges; +import com.google.common.collect.Range; import com.google.common.collect.Sets; import com.google.common.reflect.TypeToken; import com.google.common.util.concurrent.Futures; @@ -636,7 +637,7 @@ private long checkProvisionTimeout(long nextTimeoutCheck) { if (action.getTimeout() < 0) { // Abort application stopStatus = StopStatus.ABORTED; - stop(); + stopAsync(); } else { return nextTimeoutCheck + action.getTimeout(); } @@ -1023,7 +1024,7 @@ public void run() { int runningCount = runningContainers.count(runnableName); Set instancesToRemove = instanceIds == null ? null : ImmutableSet.copyOf(instanceIds); if (instancesToRemove == null) { - instancesToRemove = Ranges.closedOpen(0, runningCount).asSet(DiscreteDomains.integers()); + instancesToRemove = ContiguousSet.create(Range.closedOpen(0, runningCount), DiscreteDomain.integers()); } LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName); diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java index e48dcbb6..b226e3f2 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java @@ -17,7 +17,7 @@ */ package org.apache.twill.internal.appmaster; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; import com.google.common.collect.Maps; import org.apache.twill.common.Cancellable; import org.apache.twill.internal.EnvKeys; @@ -50,7 +50,7 @@ public RunnableProcessLauncher(YarnContainerInfo containerInfo, YarnNMClient nmC @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("container", containerInfo) .toString(); } diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java index f8ed27ec..29d1e997 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal.appmaster; -import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.HashBasedTable; @@ -33,6 +32,7 @@ import com.google.common.hash.Hashing; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -65,6 +65,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.io.OutputStreamWriter; import java.io.Writer; import java.util.ArrayList; @@ -581,7 +582,7 @@ public void onFailure(Throwable t) { } } } - }); + }, MoreExecutors.directExecutor()); } /** @@ -735,10 +736,10 @@ private Location saveLogLevels() { try { Gson gson = new GsonBuilder().serializeNulls().create(); String jsonStr = gson.toJson(logLevels); - String fileName = Hashing.md5().hashString(jsonStr) + "." + Constants.Files.LOG_LEVELS; + String fileName = Hashing.md5().hashString(jsonStr, StandardCharsets.UTF_8) + "." + Constants.Files.LOG_LEVELS; Location location = applicationLocation.append(fileName); if (!location.exists()) { - try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) { + try (Writer writer = new OutputStreamWriter(location.getOutputStream(), StandardCharsets.UTF_8)) { writer.write(jsonStr); } } diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java index e6d86a55..9ddfb4ef 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal.container; -import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; import com.google.common.reflect.TypeToken; @@ -52,6 +51,7 @@ import org.slf4j.LoggerFactory; import java.io.DataInputStream; +import java.nio.charset.StandardCharsets; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -179,7 +179,7 @@ private static ClassLoader getClassLoader() { private static Map> loadLogLevels() throws IOException { File file = new File(Constants.Files.LOG_LEVELS); if (file.exists()) { - try (Reader reader = Files.newReader(file, Charsets.UTF_8)) { + try (Reader reader = Files.newReader(file, StandardCharsets.UTF_8)) { Gson gson = new GsonBuilder().serializeNulls().create(); return gson.fromJson(reader, new TypeToken>>() { }.getType()); } @@ -188,8 +188,9 @@ private static Map> loadLogLevels() throws IOExcepti } private static Arguments decodeArgs() throws IOException { + File file = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.ARGUMENTS); return ArgumentsCodec.decode( - Files.newReaderSupplier(new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.ARGUMENTS), Charsets.UTF_8)); + () -> java.nio.file.Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8)); } @Override diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/ApplicationMasterLiveNodeDecoder.java b/twill-yarn/src/main/java/org/apache/twill/yarn/ApplicationMasterLiveNodeDecoder.java index 4b2fc42c..6c6532bb 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/ApplicationMasterLiveNodeDecoder.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/ApplicationMasterLiveNodeDecoder.java @@ -17,7 +17,6 @@ */ package org.apache.twill.yarn; -import com.google.common.base.Charsets; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; @@ -29,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import javax.annotation.Nullable; /** @@ -56,7 +56,7 @@ static ApplicationMasterLiveNodeData decode(@Nullable NodeData nodeData) { return null; } - JsonElement json = GSON.fromJson(new String(data, Charsets.UTF_8), JsonElement.class); + JsonElement json = GSON.fromJson(new String(data, StandardCharsets.UTF_8), JsonElement.class); if (!json.isJsonObject()) { LOG.warn("Unable to decode live data node."); return null; diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java index cf6c3b22..dae6140a 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java @@ -124,10 +124,10 @@ protected void doStartUp() { LOG.info("Application {} with id {} submitted", appName, appId); YarnApplicationState state = report.getYarnApplicationState(); - Stopwatch stopWatch = new Stopwatch().start(); + Stopwatch stopWatch = Stopwatch.createStarted(); LOG.debug("Checking yarn application status for {} {}", appName, appId); - while (!hasRun(state) && stopWatch.elapsedTime(startTimeoutUnit) < startTimeout) { + while (!hasRun(state) && stopWatch.elapsed(startTimeoutUnit) < startTimeout) { report = processController.getReport(); state = report.getYarnApplicationState(); LOG.debug("Yarn application status for {} {}: {}", appName, appId, state); @@ -168,13 +168,13 @@ protected synchronized void doShutDown() { FinalApplicationStatus finalStatus; // Poll application status from yarn try (ProcessController processController = this.processController) { - Stopwatch stopWatch = new Stopwatch().start(); + Stopwatch stopWatch = Stopwatch.createStarted(); YarnApplicationReport report = processController.getReport(); finalStatus = report.getFinalApplicationStatus(); ApplicationId appId = report.getApplicationId(); while (finalStatus == FinalApplicationStatus.UNDEFINED && - stopWatch.elapsedTime(TimeUnit.MILLISECONDS) < timeoutMillis) { + stopWatch.elapsed(TimeUnit.MILLISECONDS) < timeoutMillis) { LOG.debug("Yarn application final status for {} {}: {}", appName, appId, finalStatus); TimeUnit.SECONDS.sleep(1); finalStatus = processController.getReport().getFinalApplicationStatus(); diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index c67406a4..3df7d83c 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -17,7 +17,6 @@ */ package org.apache.twill.yarn; -import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -35,7 +34,6 @@ import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.ByteStreams; -import com.google.common.io.OutputSupplier; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -447,7 +445,7 @@ public ProcessController call() throws Exception { YarnTwillController controller = controllerFactory.create(runId, isLogCollectionEnabled(), logHandlers, submitTask, timeout, timeoutUnit); - controller.start(); + controller.startAsync(); return controller; } catch (Exception e) { LOG.error("Failed to submit application {}", twillSpec.getName(), e); @@ -605,7 +603,7 @@ private void createApplicationJar(final ApplicationBundler bundler, List classList = classes.stream().map(Class::getName).sorted().collect(Collectors.toList()); Hasher hasher = Hashing.md5().newHasher(); for (String name : classList) { - hasher.putString(name); + hasher.putString(name, StandardCharsets.UTF_8); } // Only depends on class list so that it can be reused across different launches String name = hasher.hash().toString() + "-" + Constants.Files.APPLICATION_JAR; @@ -811,12 +809,9 @@ private void saveClassPaths(Path targetDir) throws IOException { private JvmOptions saveJvmOptions(final Path targetPath) throws IOException { // Append runnable specific extra options. Map runnableExtraOptions = Maps.newHashMap( - Maps.transformValues(this.runnableExtraOptions, new Function() { - @Override - public String apply(String options) { - return addClassLoaderClassName(extraOptions.isEmpty() ? options : extraOptions + " " + options); - } - })); + Maps.transformValues(this.runnableExtraOptions, (String options) -> + addClassLoaderClassName(extraOptions.isEmpty() ? options : extraOptions + " " + options) + )); String globalOptions = addClassLoaderClassName(extraOptions); JvmOptions jvmOptions = new JvmOptions(globalOptions, runnableExtraOptions, debugOptions); @@ -836,12 +831,7 @@ public String apply(String options) { private void saveArguments(Arguments arguments, final Path targetPath) throws IOException { LOG.debug("Creating {}", targetPath); - ArgumentsCodec.encode(arguments, new OutputSupplier() { - @Override - public Writer getOutput() throws IOException { - return Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8); - } - }); + ArgumentsCodec.encode(arguments, () -> Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8)); LOG.debug("Done {}", targetPath); } diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java index 15902f26..cf62e99d 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java @@ -180,12 +180,12 @@ protected void shutDown() throws Exception { @Override public void start() { - serviceDelegate.startAndWait(); + serviceDelegate.startAsync().awaitRunning(); } @Override public void stop() { - serviceDelegate.stopAndWait(); + serviceDelegate.stopAsync().awaitTerminated(); } /** @@ -347,7 +347,7 @@ public Iterable lookupLive() { } private void startUp() throws Exception { - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); // Create the root node, so that the namespace root would get created if it is missing // If the exception is caused by node exists, then it's ok. Otherwise propagate the exception. @@ -431,7 +431,7 @@ private LocationCacheCleaner startLocationCacheCleaner(final Location cacheBase, return !activeLocations.contains(location); }); - cleaner.startAndWait(); + cleaner.startAsync().awaitRunning(); return cleaner; } @@ -442,14 +442,14 @@ private void shutDown() throws Exception { // daemon threads. synchronized (this) { if (locationCacheCleaner != null) { - locationCacheCleaner.stopAndWait(); + locationCacheCleaner.stopAsync().awaitTerminated(); } if (secureStoreScheduler != null) { secureStoreScheduler.shutdownNow(); } } watchCancellable.cancel(); - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } private Cancellable watchLiveApps() { @@ -600,7 +600,7 @@ public void onSuccess(NodeData result) { YarnTwillController controller = listenController( new YarnTwillController(appName, runId, zkClient, amLiveNodeData, yarnAppClient)); controllers.put(appName, runId, controller); - controller.start(); + controller.startAsync(); } } } diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java index 379e37bb..01ccdc5c 100644 --- a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java +++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java @@ -17,7 +17,6 @@ */ package org.apache.twill.filesystem; -import com.google.common.base.Charsets; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -30,6 +29,7 @@ import org.junit.rules.TemporaryFolder; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.Reader; @@ -82,7 +82,7 @@ public void testBasic() throws Exception { Location location2 = factory.create("/file2"); String message = "Testing Message"; - try (Writer writer = new OutputStreamWriter(location2.getOutputStream(), Charsets.UTF_8)) { + try (Writer writer = new OutputStreamWriter(location2.getOutputStream(), StandardCharsets.UTF_8)) { writer.write(message); } long length = location2.length(); @@ -91,7 +91,7 @@ public void testBasic() throws Exception { location2.renameTo(location); Assert.assertFalse(location2.exists()); - try (Reader reader = new InputStreamReader(location.getInputStream(), Charsets.UTF_8)) { + try (Reader reader = new InputStreamReader(location.getInputStream(), StandardCharsets.UTF_8)) { Assert.assertEquals(message, CharStreams.toString(reader)); } Assert.assertEquals(length, location.length()); diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java index 4c7d84b4..e5a39321 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java @@ -132,8 +132,8 @@ public boolean waitForSize(Iterable iterable, int count, int limit) throw * @throws Exception if the task through exception or timeout. */ public void waitFor(T expected, Callable callable, long timeout, long delay, TimeUnit unit) throws Exception { - Stopwatch stopwatch = new Stopwatch().start(); - while (callable.call() != expected && stopwatch.elapsedTime(unit) < timeout) { + Stopwatch stopwatch = Stopwatch.createStarted(); + while (callable.call() != expected && stopwatch.elapsed(unit) < timeout) { unit.sleep(delay); } } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java b/twill-yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java index 18691a05..8aa20fca 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java @@ -17,7 +17,6 @@ */ package org.apache.twill.yarn; -import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; @@ -27,6 +26,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.nio.charset.StandardCharsets; import java.io.IOException; import java.io.InputStreamReader; @@ -48,7 +48,8 @@ public void run() { Process process = new ProcessBuilder(ImmutableList.copyOf(Splitter.on(' ').split(cmd))) .redirectErrorStream(true).start(); try ( - BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charsets.US_ASCII)) + BufferedReader reader = new BufferedReader( + new InputStreamReader(process.getInputStream(), StandardCharsets.US_ASCII)) ) { String line = reader.readLine(); while (line != null) { diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java index 278a4363..c57d304a 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java @@ -17,7 +17,6 @@ */ package org.apache.twill.yarn; -import com.google.common.base.Charsets; import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import com.google.common.io.LineReader; @@ -39,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; @@ -93,8 +93,10 @@ public void run() { Socket socket = new Socket(discoverable.getSocketAddress().getAddress(), discoverable.getSocketAddress().getPort()) ) { - PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true); - LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8)); + PrintWriter writer = new PrintWriter( + new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true); + LineReader reader = new LineReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); writer.println(msg); Assert.assertEquals(msg, reader.readLine()); @@ -161,7 +163,7 @@ public void run() { @Test public void testZKCleanup() throws Exception { final ZKClientService zkClient = ZKClientService.Builder.of(getZKConnectionString() + "/twill").build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try { TwillRunner runner = getTwillRunner(); @@ -222,7 +224,7 @@ public Stat call() throws Exception { }, 10000, 100, TimeUnit.MILLISECONDS); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -239,8 +241,7 @@ public Stat call() throws Exception { private ResourceReport waitForAfterRestartResourceReport(TwillController controller, String runnable, long timeout, TimeUnit timeoutUnit, int numOfResources, @Nullable Map instanceIdToContainerId) { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); + Stopwatch stopwatch = Stopwatch.createStarted(); do { ResourceReport report = controller.getResourceReport(); if (report == null || report.getRunnableResources(runnable) == null) { @@ -271,7 +272,7 @@ private ResourceReport waitForAfterRestartResourceReport(TwillController control } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } - } while (stopwatch.elapsedTime(timeoutUnit) < timeout); + } while (stopwatch.elapsed(timeoutUnit) < timeout); LOG.error("Unable to get different container ids for restart."); return null; diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java index 4309cb41..cefc60a4 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java @@ -17,7 +17,6 @@ */ package org.apache.twill.yarn; -import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import com.google.common.io.LineReader; import com.google.common.util.concurrent.SettableFuture; @@ -33,6 +32,7 @@ import org.junit.Test; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; @@ -66,8 +66,10 @@ public void testEnv() throws Exception { Socket socket = new Socket(discoverable.getSocketAddress().getAddress(), discoverable.getSocketAddress().getPort()) ) { - PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true); - LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8)); + PrintWriter writer = new PrintWriter( + new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true); + LineReader reader = new LineReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); writer.println("GREETING"); Assert.assertEquals(entry.getValue(), reader.readLine()); diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java index bd2b245f..34128d8c 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java @@ -105,8 +105,8 @@ public void testKilled() throws IOException, InterruptedException, TimeoutExcept .start(); // Wait for the runnable to run and create runFile within 120 secs File runFile = new File(parentFolder, RUN_FILE); - Stopwatch stopwatch = new Stopwatch().start(); - while (!runFile.exists() && stopwatch.elapsedTime(TimeUnit.SECONDS) < 120) { + Stopwatch stopwatch = Stopwatch.createStarted(); + while (!runFile.exists() && stopwatch.elapsed(TimeUnit.SECONDS) < 120) { TimeUnit.SECONDS.sleep(1); } Assert.assertTrue(runFile.exists()); diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java index 8525b3b1..df30ac3d 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java @@ -17,7 +17,6 @@ */ package org.apache.twill.yarn; -import com.google.common.base.Charsets; import com.google.common.collect.Sets; import com.google.common.io.LineReader; import org.apache.twill.api.Command; @@ -30,6 +29,7 @@ import org.junit.Test; import java.io.BufferedReader; +import java.nio.charset.StandardCharsets; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; @@ -85,8 +85,10 @@ private Set getInstances(Iterable discoverables) throws I for (Discoverable discoverable : discoverables) { InetSocketAddress socketAddress = discoverable.getSocketAddress(); try (Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort())) { - PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true); - LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8)); + PrintWriter writer = new PrintWriter( + new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true); + LineReader reader = new LineReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); String msg = "Failure"; writer.println(msg); diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java index b646bbc9..9b12e666 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java @@ -17,7 +17,6 @@ */ package org.apache.twill.yarn; -import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.io.Files; import com.google.common.io.LineReader; @@ -78,8 +77,10 @@ public void testLocalFile() throws Exception { InetSocketAddress socketAddress = discoverables.iterator().next().getSocketAddress(); try (Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort())) { - PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true); - LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8)); + PrintWriter writer = new PrintWriter( + new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true); + LineReader reader = new LineReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); String msg = "Local file test"; writer.println(msg); @@ -142,11 +143,11 @@ public void handleRequest(BufferedReader reader, PrintWriter writer) throws Exce LOG.info("handleRequest"); // Read from the localized file - writer.println(Files.readFirstLine(new File("header/header.txt"), Charsets.UTF_8)); + writer.println(Files.readFirstLine(new File("header/header.txt"), StandardCharsets.UTF_8)); // Read from the request writer.println(reader.readLine()); // Read from resource - writer.println(Files.readFirstLine(new File(footerURL.toURI()), Charsets.UTF_8)); + writer.println(Files.readFirstLine(new File(footerURL.toURI()), StandardCharsets.UTF_8)); LOG.info("Flushed response"); } } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java index 43787f0d..2b5a945c 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java @@ -252,9 +252,8 @@ private void waitForLogLevel(TwillController controller, String runnable, long t Map expectedArgs, int expectedInstances) throws InterruptedException { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - while (stopwatch.elapsedTime(timeoutUnit) < timeout) { + Stopwatch stopwatch = Stopwatch.createStarted(); + while (stopwatch.elapsed(timeoutUnit) < timeout) { ResourceReport report = controller.getResourceReport(); if (report == null || report.getRunnableResources(runnable) == null) { diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java index 717a80f0..58d2c9f1 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java @@ -161,8 +161,7 @@ public void run() { private boolean waitForLogLevel(TwillController controller, String runnable, long timeout, TimeUnit timeoutUnit, @Nullable LogEntry.Level expected) throws InterruptedException { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); + Stopwatch stopwatch = Stopwatch.createStarted(); do { ResourceReport report = controller.getResourceReport(); if (report == null || report.getRunnableResources(runnable) == null) { @@ -175,7 +174,7 @@ private boolean waitForLogLevel(TwillController controller, String runnable, lon } } TimeUnit.MILLISECONDS.sleep(100); - } while (stopwatch.elapsedTime(timeoutUnit) < timeout); + } while (stopwatch.elapsed(timeoutUnit) < timeout); return false; } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java index a61880fe..49fbfd84 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java @@ -17,7 +17,6 @@ */ package org.apache.twill.yarn; -import com.google.common.base.Charsets; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -41,6 +40,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; @@ -121,8 +121,10 @@ public void run() { Socket socket = new Socket(discoverable.getSocketAddress().getHostName(), discoverable.getSocketAddress().getPort()) ) { - PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true); - LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8)); + PrintWriter writer = new PrintWriter( + new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true); + LineReader reader = new LineReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); writer.println(expected.getKey()); Assert.assertEquals(expected.getValue(), reader.readLine()); } @@ -171,7 +173,8 @@ public void run() { Socket socket = new Socket(discoverable.getSocketAddress().getAddress(), discoverable.getSocketAddress().getPort()) ) { - PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true); + PrintWriter writer = new PrintWriter( + new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true); writer.println("0"); } @@ -306,8 +309,8 @@ public Iterator iterator() { private ResourceReport getResourceReport(TwillController controller, long timeoutMillis) { ResourceReport report = controller.getResourceReport(); - Stopwatch stopwatch = new Stopwatch(); - while (report == null && stopwatch.elapsedMillis() < timeoutMillis) { + Stopwatch stopwatch = Stopwatch.createStarted(); + while (report == null && stopwatch.elapsed(TimeUnit.MILLISECONDS) < timeoutMillis) { Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); report = controller.getResourceReport(); } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java index 6dcec8f6..172e9141 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java @@ -284,8 +284,7 @@ public void testRestartRunnable() throws Exception { private void waitForContainers(TwillController controller, int count, long timeout, TimeUnit timeoutUnit) throws Exception { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); + Stopwatch stopwatch = Stopwatch.createStarted(); int yarnContainers = 0; int twillContainers = 0; do { @@ -298,7 +297,7 @@ private void waitForContainers(TwillController controller, int count, long timeo } } TimeUnit.SECONDS.sleep(1); - } while (stopwatch.elapsedTime(timeoutUnit) < timeout); + } while (stopwatch.elapsed(timeoutUnit) < timeout); throw new TimeoutException("Timeout reached while waiting for num containers to be " + count + ". Yarn containers = " + yarnContainers + ", Twill containers = " + twillContainers); @@ -306,8 +305,7 @@ private void waitForContainers(TwillController controller, int count, long timeo private void waitForInstance(TwillController controller, String runnable, String yarnInstanceId, long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); + Stopwatch stopwatch = Stopwatch.createStarted(); do { ResourceReport report = controller.getResourceReport(); if (report != null && report.getRunnableResources(runnable) != null) { @@ -318,7 +316,7 @@ private void waitForInstance(TwillController controller, String runnable, String } } TimeUnit.SECONDS.sleep(1); - } while (stopwatch.elapsedTime(timeoutUnit) < timeout); + } while (stopwatch.elapsed(timeoutUnit) < timeout); throw new TimeoutException("Timeout reached while waiting for runnable " + runnable + " instance " + yarnInstanceId); diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java index 74cf80e1..f672c1a9 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java @@ -89,8 +89,7 @@ private boolean expireAppMasterZKSession(TwillController controller, long timeou MBeanServer mbeanServer = MBeanRegistry.getInstance().getPlatformMBeanServer(); QueryExp query = Query.isInstanceOf(new StringValueExp(ConnectionMXBean.class.getName())); - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); + Stopwatch stopwatch = Stopwatch.createStarted(); do { // Find the AM session and expire it Set connectionBeans = mbeanServer.queryNames(ObjectName.WILDCARD, query); @@ -108,7 +107,7 @@ private boolean expireAppMasterZKSession(TwillController controller, long timeou } } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } while (stopwatch.elapsedTime(timeoutUnit) < timeout); + } while (stopwatch.elapsed(timeoutUnit) < timeout); return false; } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java index d59a15ba..f1ad27fa 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java @@ -17,7 +17,6 @@ */ package org.apache.twill.yarn; -import com.google.common.base.Charsets; import com.google.common.base.Throwables; import org.apache.twill.api.AbstractTwillRunnable; import org.apache.twill.api.TwillContext; @@ -26,6 +25,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.nio.charset.StandardCharsets; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; @@ -82,7 +82,8 @@ public void run() { try { while (running) { try (Socket socket = serverSocket.accept()) { - BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8)); + BufferedReader reader = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true); handleRequest(reader, writer); } catch (SocketException e) { diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java index 407d519a..05a988e4 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java @@ -111,7 +111,7 @@ protected void before() throws Throwable { // Starts Zookeeper zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); // Start YARN mini cluster File miniDFSDir = tmpFolder.newFolder(); @@ -241,7 +241,7 @@ public ApplicationResourceUsageReport getApplicationResourceReport(String appId) private void stopQuietly(Service service) { try { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } catch (Exception e) { LOG.warn("Failed to stop service {}.", service, e); } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java index cad233e5..18c4aeef 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java @@ -17,11 +17,11 @@ */ package org.apache.twill.internal.zookeeper; -import com.google.common.base.Objects; import org.apache.twill.zookeeper.NodeChildren; import org.apache.zookeeper.data.Stat; import java.util.List; +import java.util.Objects; /** * @@ -61,6 +61,6 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hashCode(children, stat); + return Objects.hash(children, stat); } } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java index 13df625c..2215c06e 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java @@ -17,11 +17,11 @@ */ package org.apache.twill.internal.zookeeper; -import com.google.common.base.Objects; import org.apache.twill.zookeeper.NodeData; import org.apache.zookeeper.data.Stat; import java.util.Arrays; +import java.util.Objects; /** * A straightforward implementation for {@link NodeData}. @@ -62,6 +62,6 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hashCode(data, stat); + return Objects.hash(data, stat); } } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java index dc2bfa99..3f0ae281 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java @@ -58,6 +58,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -189,14 +190,14 @@ public void onFailure(Throwable t) { // handle the failure updateFailureResult(t, result, path, ignoreNodeExists); } - }); + }, Threads.SAME_THREAD_EXECUTOR); } @Override public void onFailure(Throwable t) { result.setException(t); } - }); + }, Threads.SAME_THREAD_EXECUTOR); } /** @@ -236,7 +237,7 @@ private String getParent(String path) { String parentPath = path.substring(0, path.lastIndexOf('/')); return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath; } - }); + }, Threads.SAME_THREAD_EXECUTOR); return result; } @@ -302,13 +303,19 @@ public ZooKeeper get() { } @Override - public ListenableFuture start() { - return serviceDelegate.start(); + public Service startAsync() { + serviceDelegate.startAsync(); + return this; } @Override - public State startAndWait() { - return serviceDelegate.startAndWait(); + public void awaitRunning() { + serviceDelegate.awaitRunning(); + } + + @Override + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + serviceDelegate.awaitRunning(timeout, unit); } @Override @@ -322,13 +329,24 @@ public State state() { } @Override - public ListenableFuture stop() { - return serviceDelegate.stop(); + public Service stopAsync() { + serviceDelegate.stopAsync(); + return this; + } + + @Override + public void awaitTerminated() { + serviceDelegate.awaitTerminated(); + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + serviceDelegate.awaitTerminated(timeout, unit); } @Override - public State stopAndWait() { - return serviceDelegate.stopAndWait(); + public Throwable failureCause() { + return serviceDelegate.failureCause(); } @Override @@ -519,7 +537,7 @@ public void run() { // // 1. session expired, hence the expired event is triggered // 2. The reconnect task executed. With Service.state() == RUNNING, it creates a new ZK client - // 3. Service.stop() gets called, Service.state() changed to STOPPING + // 3. Service.stopAsync() gets called, Service.state() changed to STOPPING // 4. The new ZK client created from the reconnect thread update the zooKeeper with the new one closeZooKeeper(zooKeeper.getAndSet(null)); notifyStopped(); diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java index 73ee3088..a07bb080 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java @@ -74,7 +74,7 @@ public OperationFuture create(final String path, @Nullable final byte[] public OperationFuture get() { return FailureRetryZKClient.super.create(path, data, createMode, createParent, acl); } - })); + }), Threads.SAME_THREAD_EXECUTOR); return result; } @@ -88,7 +88,7 @@ public OperationFuture exists(final String path, final Watcher watcher) { public OperationFuture get() { return FailureRetryZKClient.super.exists(path, watcher); } - })); + }), Threads.SAME_THREAD_EXECUTOR); return result; } @@ -104,7 +104,7 @@ public OperationFuture getChildren(final String path, final Watche public OperationFuture get() { return FailureRetryZKClient.super.getChildren(path, watcher); } - })); + }), Threads.SAME_THREAD_EXECUTOR); return result; } @@ -118,7 +118,7 @@ public OperationFuture getData(final String path, final Watcher watche public OperationFuture get() { return FailureRetryZKClient.super.getData(path, watcher); } - })); + }), Threads.SAME_THREAD_EXECUTOR); return result; } @@ -132,7 +132,7 @@ public OperationFuture setData(final String dataPath, final byte[] data, f public OperationFuture get() { return FailureRetryZKClient.super.setData(dataPath, data, version); } - })); + }), Threads.SAME_THREAD_EXECUTOR); return result; } @@ -148,7 +148,7 @@ public OperationFuture delete(final String deletePath, final int version public OperationFuture get() { return FailureRetryZKClient.super.delete(deletePath, version); } - })); + }), Threads.SAME_THREAD_EXECUTOR); return result; } @@ -162,7 +162,7 @@ public OperationFuture getACL(final String path) { public OperationFuture get() { return FailureRetryZKClient.super.getACL(path); } - })); + }), Threads.SAME_THREAD_EXECUTOR); return result; } @@ -176,7 +176,7 @@ public OperationFuture setACL(final String path, final Iterable acl, public OperationFuture get() { return FailureRetryZKClient.super.setACL(path, acl, version); } - })); + }), Threads.SAME_THREAD_EXECUTOR); return result; } @@ -230,7 +230,7 @@ private boolean doRetry(Throwable t) { SCHEDULER.schedule(new Runnable() { @Override public void run() { - Futures.addCallback(retryAction.get(), OperationFutureCallback.this); + Futures.addCallback(retryAction.get(), OperationFutureCallback.this, Threads.SAME_THREAD_EXECUTOR); } }, nextRetry, TimeUnit.MILLISECONDS); diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java index d18d5edf..e6161ced 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java @@ -18,9 +18,7 @@ package org.apache.twill.internal.zookeeper; import com.google.common.base.Preconditions; -import com.google.common.io.Files; import com.google.common.util.concurrent.AbstractIdleService; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; @@ -29,9 +27,13 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.file.Files; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * @@ -79,7 +81,11 @@ public static Builder builder() { private InMemoryZKServer(File dataDir, int tickTime, boolean autoClean, int port) { if (dataDir == null) { - dataDir = Files.createTempDir(); + try { + dataDir = Files.createTempDirectory("zk").toFile(); + } catch (IOException e) { + throw new RuntimeException("Failed to create temp directory for ZK data", e); + } autoClean = true; } else { Preconditions.checkArgument(dataDir.isDirectory() || dataDir.mkdirs() || dataDir.isDirectory()); @@ -123,13 +129,19 @@ private void cleanDir(File dir) { } @Override - public ListenableFuture start() { - return delegateService.start(); + public Service startAsync() { + delegateService.startAsync(); + return this; + } + + @Override + public void awaitRunning() { + delegateService.awaitRunning(); } @Override - public State startAndWait() { - return delegateService.startAndWait(); + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + delegateService.awaitRunning(timeout, unit); } @Override @@ -143,13 +155,24 @@ public State state() { } @Override - public ListenableFuture stop() { - return delegateService.stop(); + public Service stopAsync() { + delegateService.stopAsync(); + return this; + } + + @Override + public void awaitTerminated() { + delegateService.awaitTerminated(); + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + delegateService.awaitTerminated(timeout, unit); } @Override - public State stopAndWait() { - return delegateService.stopAndWait(); + public Throwable failureCause() { + return delegateService.failureCause(); } @Override diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java index d8bb49d1..bf169298 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java @@ -17,8 +17,6 @@ */ package org.apache.twill.internal.zookeeper; -import com.google.common.base.Charsets; -import com.google.common.base.Optional; import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -39,7 +37,9 @@ import org.slf4j.LoggerFactory; import java.net.InetAddress; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -146,7 +146,7 @@ private byte[] getNodeData() { LOG.warn("Failed to get local hostname.", e); hostname = "unknown"; } - return hostname.getBytes(Charsets.UTF_8); + return hostname.getBytes(StandardCharsets.UTF_8); } private void register() { @@ -234,7 +234,7 @@ private void becomeLeader() { handler.leader(); } catch (Throwable t) { LOG.warn("Exception thrown when calling leader() method. Withdraw from the leader election process.", t); - stop(); + stopAsync(); } } @@ -245,7 +245,7 @@ private void becomeFollower() { handler.follower(); } catch (Throwable t) { LOG.warn("Exception thrown when calling follower() method. Withdraw from the leader election process.", t); - stop(); + stopAsync(); } } @@ -322,8 +322,8 @@ public void run() { } /** - * Find the node to watch for and return it in the {@link com.google.common.base.Optional} value. If this client is - * the leader, return an {@link com.google.common.base.Optional#absent()}. This method also tries to set the + * Find the node to watch for and return it in the {@link Optional} value. If this client is + * the leader, return an {@link Optional#empty()}. This method also tries to set the * zkNodePath if it is not set and return {@code null} if the zkNodePath cannot be determined. */ private Optional findNodeToWatch(List nodes) { @@ -354,7 +354,7 @@ private Optional findNodeToWatch(List nodes) { } } - return nodeToWatch == null ? Optional.absent() : Optional.of(nodeToWatch); + return nodeToWatch == null ? Optional.empty() : Optional.of(nodeToWatch); } /** diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java index 239a6560..6c41a693 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java @@ -135,7 +135,7 @@ public void onSuccess(V result) { public void onFailure(Throwable t) { to.setException(t); } - }); + }, Threads.SAME_THREAD_EXECUTOR); return to; } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java index c45db7a5..00e2e6f8 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java @@ -258,7 +258,7 @@ public void onFailure(Throwable t) { completion.setException(t); } } - }); + }, Threads.SAME_THREAD_EXECUTOR); // Gets the result from the completion try { @@ -290,6 +290,7 @@ private void doAcquire(final SettableFuture completion, final boolean wa @Override public void onSuccess(NodeChildren children) { + // Find the lock node in case the creation step failed by matching the guid // See "Recoverable Errors and the GUID" in the ZooKeeper guide final String lockNode = lockPath == null ? findLockNode(children.getChildren(), guid) : lockPath; @@ -353,7 +354,7 @@ public void onFailure(Throwable t) { completion.setException(t); } } - }); + }, Threads.SAME_THREAD_EXECUTOR); } @Override @@ -364,7 +365,7 @@ public void onFailure(Throwable t) { doAcquire(completion, waitForLock, guid, null); } } - }); + }, Threads.SAME_THREAD_EXECUTOR); } /** diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java index 776efe4a..769a1dd3 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.twill.zookeeper.NodeChildren; import org.apache.twill.zookeeper.NodeData; import org.apache.twill.zookeeper.ZKClient; @@ -122,7 +123,7 @@ public void onFailure(Throwable t) { LOG.error("Fail to re-set watch on exists for path " + path, t); } } - }); + }, MoreExecutors.directExecutor()); } private void children() { @@ -168,7 +169,7 @@ public void onFailure(Throwable t) { } LOG.error("Fail to re-set watch on getChildren for path " + path, t); } - }); + }, MoreExecutors.directExecutor()); } private void data() { @@ -202,6 +203,6 @@ public void onFailure(Throwable t) { } LOG.error("Fail to re-set watch on getData for path " + path, t); } - }); + }, MoreExecutors.directExecutor()); } } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java index ed0e0bd5..708cc78d 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.twill.internal.zookeeper.RewatchOnExpireWatcher.ActionType; import org.apache.twill.zookeeper.ForwardingZKClient; import org.apache.twill.zookeeper.NodeChildren; @@ -55,7 +56,7 @@ public void onSuccess(Stat result) { public void onFailure(Throwable t) { // No-op } - }); + }, MoreExecutors.directExecutor()); return result; } @@ -76,7 +77,7 @@ public void onSuccess(NodeChildren result) { public void onFailure(Throwable t) { // No-op } - }); + }, MoreExecutors.directExecutor()); return result; } @@ -97,7 +98,7 @@ public void onSuccess(NodeData result) { public void onFailure(Throwable t) { // No-op } - }); + }, MoreExecutors.directExecutor()); return result; } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java index 10391b2d..ca555a51 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java @@ -18,11 +18,12 @@ package org.apache.twill.zookeeper; import com.google.common.base.Supplier; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Service; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * @@ -42,13 +43,19 @@ public Supplier getZooKeeperSupplier() { } @Override - public ListenableFuture start() { - return delegate.start(); + public Service startAsync() { + delegate.startAsync(); + return this; } @Override - public State startAndWait() { - return Futures.getUnchecked(start()); + public void awaitRunning() { + delegate.awaitRunning(); + } + + @Override + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + delegate.awaitRunning(timeout, unit); } @Override @@ -62,13 +69,24 @@ public State state() { } @Override - public ListenableFuture stop() { - return delegate.stop(); + public Service stopAsync() { + delegate.stopAsync(); + return this; + } + + @Override + public void awaitTerminated() { + delegate.awaitTerminated(); + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + delegate.awaitTerminated(timeout, unit); } @Override - public State stopAndWait() { - return Futures.getUnchecked(stop()); + public Throwable failureCause() { + return delegate.failureCause(); } @Override diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java index 70520443..71e6f46a 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java @@ -28,8 +28,8 @@ /** * A {@link ZKClient} that extends from {@link Service} to provide lifecycle management functions. - * The {@link #start()} method needed to be called before calling any other method on this interface. - * When the client is no longer needed, call {@link #stop()} to release any resources that it holds. + * The {@link #startAsync()} method needed to be called before calling any other method on this interface. + * When the client is no longer needed, call {@link #stopAsync()} to release any resources that it holds. */ public interface ZKClientService extends ZKClient, Service { diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java index bce63914..13d03dbd 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java @@ -148,7 +148,7 @@ public void onSuccess(Stat result) { public void onFailure(Throwable t) { completion.setException(t); } - }); + }, Threads.SAME_THREAD_EXECUTOR); } public static Cancellable watchChildren(final ZKClient zkClient, String path, ChildrenCallback callback) { @@ -378,7 +378,7 @@ public void onSuccess(Stat result) { public void onFailure(Throwable t) { completion.setException(t); } - }); + }, Threads.SAME_THREAD_EXECUTOR); } private static void watchChanges(final Operation operation, final String path, @@ -419,7 +419,7 @@ public void run() { } LOG.error("Failed to watch data for path " + path + " " + t, t); } - }); + }, Threads.SAME_THREAD_EXECUTOR); } private ZKOperations() { diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java index 2d4b5d51..8dd8472e 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java @@ -18,8 +18,6 @@ package org.apache.twill.internal.zookeeper; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.twill.api.ElectionHandler; import org.apache.twill.zookeeper.ZKClientService; @@ -72,7 +70,7 @@ public void testElection() throws ExecutionException, InterruptedException, Brok final AtomicInteger currentLeader = new AtomicInteger(-1); for (int i = 0; i < participantCount; i++) { final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); stopLatch[i] = new CountDownLatch(1); zkClients.add(zkClient); @@ -95,10 +93,10 @@ public void follower() { followerSem.release(); } }); - leaderElection.start(); + leaderElection.startAsync(); stopLatch[idx].await(10, TimeUnit.SECONDS); - leaderElection.stopAndWait(); + leaderElection.stopAsync().awaitTerminated(); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -125,7 +123,7 @@ public void follower() { executor.awaitTermination(5L, TimeUnit.SECONDS); for (ZKClientService zkClient : zkClients) { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } } @@ -143,7 +141,7 @@ public void testCancel() throws InterruptedException, IOException { try { for (int i = 0; i < 2; i++) { ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); zkClients.add(zkClient); @@ -163,7 +161,7 @@ public void follower() { } for (LeaderElection leaderElection : leaderElections) { - leaderElection.start(); + leaderElection.startAsync(); } leaderSem.tryAcquire(10, TimeUnit.SECONDS); @@ -177,7 +175,7 @@ public void follower() { zkClients.get(follower).getConnectString(), 20000); // Cancel the leader - leaderElections.get(leader).stopAndWait(); + leaderElections.get(leader).stopAsync().awaitTerminated(); // Now follower should still be able to become leader. leaderSem.tryAcquire(30, TimeUnit.SECONDS); @@ -197,19 +195,19 @@ public void follower() { followerSem.release(); } })); - leaderElections.get(follower).start(); + leaderElections.get(follower).startAsync(); // Cancel the follower first. - leaderElections.get(follower).stopAndWait(); + leaderElections.get(follower).stopAsync().awaitTerminated(); // Cancel the leader. - leaderElections.get(leader).stopAndWait(); + leaderElections.get(leader).stopAsync().awaitTerminated(); // Since the follower has been cancelled before leader, there should be no leader. Assert.assertFalse(leaderSem.tryAcquire(10, TimeUnit.SECONDS)); } finally { for (ZKClientService zkClient : zkClients) { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } } @@ -218,10 +216,10 @@ public void follower() { public void testDisconnect() throws IOException, InterruptedException { File zkDataDir = tmpFolder.newFolder(); InMemoryZKServer ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).build(); - ownZKServer.startAndWait(); + ownZKServer.startAsync().awaitRunning(); try { ZKClientService zkClient = ZKClientService.Builder.of(ownZKServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try { final Semaphore leaderSem = new Semaphore(0); @@ -238,44 +236,44 @@ public void follower() { followerSem.release(); } }); - leaderElection.start(); + leaderElection.startAsync(); leaderSem.tryAcquire(20, TimeUnit.SECONDS); int zkPort = ownZKServer.getLocalAddress().getPort(); // Disconnect by shutting the server and restart it on the same port - ownZKServer.stopAndWait(); + ownZKServer.stopAsync().awaitTerminated(); // Right after disconnect, it should become follower followerSem.tryAcquire(20, TimeUnit.SECONDS); ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).setPort(zkPort).build(); - ownZKServer.startAndWait(); + ownZKServer.startAsync().awaitRunning(); // Right after reconnect, it should be leader again. leaderSem.tryAcquire(20, TimeUnit.SECONDS); // Now disconnect it again, but then cancel it before reconnect, it shouldn't become leader - ownZKServer.stopAndWait(); + ownZKServer.stopAsync().awaitTerminated(); // Right after disconnect, it should become follower followerSem.tryAcquire(20, TimeUnit.SECONDS); - ListenableFuture cancelFuture = leaderElection.stop(); + leaderElection.stopAsync(); ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).setPort(zkPort).build(); - ownZKServer.startAndWait(); + ownZKServer.startAsync().awaitRunning(); - Futures.getUnchecked(cancelFuture); + leaderElection.awaitTerminated(); // After reconnect, it should not be leader Assert.assertFalse(leaderSem.tryAcquire(10, TimeUnit.SECONDS)); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } finally { - ownZKServer.stopAndWait(); + ownZKServer.stopAsync().awaitTerminated(); } } @@ -289,7 +287,7 @@ public void testRace() throws InterruptedException { // This is to test the case when a follower tries to watch for leader node, but the leader is already gone for (int i = 0; i < 2; i++) { final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); executor.execute(new Runnable() { @Override public void run() { @@ -308,13 +306,13 @@ public void follower() { // no-op } }); - election.startAndWait(); + election.startAsync().awaitRunning(); Uninterruptibles.awaitUninterruptibly(leaderLatch); - election.stopAndWait(); + election.stopAsync().awaitTerminated(); } completeLatch.countDown(); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } }); @@ -330,11 +328,11 @@ public void follower() { @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); } @AfterClass public static void finish() { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLockTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLockTest.java index a617b1de..b130a345 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLockTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLockTest.java @@ -49,12 +49,12 @@ public class ReentrantDistributedLockTest { @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); } @AfterClass public static void finish() { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } @Test(timeout = 20000) @@ -74,7 +74,7 @@ public void testReentrant() { lock.unlock(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -111,7 +111,7 @@ public void run() { t.join(); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -160,8 +160,8 @@ public void run() { Assert.assertTrue(lockAcquired.await(5, TimeUnit.SECONDS)); t.join(); } finally { - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } } @@ -201,7 +201,7 @@ public void run() { lock.unlock(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -253,8 +253,8 @@ public void run() { lock2.unlock(); } finally { - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } } @@ -310,7 +310,7 @@ public void run() { Assert.assertTrue(lock.tryLock()); lock.unlock(); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -368,8 +368,8 @@ public void run() { Assert.assertTrue(lock1.tryLock()); lock1.unlock(); } finally { - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } } @@ -418,8 +418,8 @@ public void run() { Assert.assertTrue(lockLatch.await(30, TimeUnit.SECONDS)); } finally { - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } } @@ -470,14 +470,14 @@ public void run() { } } finally { - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } } private ZKClientService createZKClient() { ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); return zkClient; } diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java index b9cb8a44..de3f1ea7 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.twill.internal.zookeeper.InMemoryZKServer; import org.apache.twill.internal.zookeeper.KillZKSession; import org.apache.zookeeper.CreateMode; @@ -66,11 +67,11 @@ public class ZKClientTest { @Test public void testChroot() throws Exception { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/chroot").build(); - client.startAndWait(); + client.startAsync().awaitRunning(); try { List> futures = Lists.newArrayList(); futures.add(client.create("/test1/test2", null, CreateMode.PERSISTENT)); @@ -81,21 +82,21 @@ public void testChroot() throws Exception { Assert.assertNotNull(client.exists("/test1/test3").get()); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @Test public void testCreateParent() throws ExecutionException, InterruptedException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - client.startAndWait(); + client.startAsync().awaitRunning(); try { String path = client.create("/test1/test2/test3/test4/test5", @@ -109,21 +110,21 @@ public void testCreateParent() throws ExecutionException, InterruptedException { } Assert.assertTrue(Arrays.equals("testing".getBytes(), client.getData(path).get().getData())); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @Test public void testGetChildren() throws ExecutionException, InterruptedException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - client.startAndWait(); + client.startAsync().awaitRunning(); try { client.create("/test", null, CreateMode.PERSISTENT).get(); @@ -138,21 +139,21 @@ public void testGetChildren() throws ExecutionException, InterruptedException { Assert.assertEquals(ImmutableSet.of("c1", "c2"), ImmutableSet.copyOf(nodeChildren.getChildren())); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @Test public void testSetData() throws ExecutionException, InterruptedException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - client.startAndWait(); + client.startAsync().awaitRunning(); client.create("/test", null, CreateMode.PERSISTENT).get(); Assert.assertNull(client.getData("/test").get().getData()); @@ -161,14 +162,14 @@ public void testSetData() throws ExecutionException, InterruptedException { Assert.assertTrue(Arrays.equals("testing".getBytes(), client.getData("/test").get().getData())); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @Test public void testExpireRewatch() throws InterruptedException, IOException, ExecutionException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { final CountDownLatch expireReconnectLatch = new CountDownLatch(1); @@ -186,7 +187,7 @@ public void process(WatchedEvent event) { } } }).build())); - client.startAndWait(); + client.startAsync().awaitRunning(); try { final BlockingQueue events = new LinkedBlockingQueue<>(); @@ -203,7 +204,7 @@ public void onSuccess(Stat result) { public void onFailure(Throwable t) { LOG.error("Failed to call exists on /expireRewatch", t); } - }); + }, MoreExecutors.directExecutor()); } }); @@ -222,10 +223,10 @@ public void onFailure(Throwable t) { Assert.assertEquals(Watcher.Event.EventType.NodeDeleted, events.poll(60, TimeUnit.SECONDS)); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -233,7 +234,7 @@ public void onFailure(Throwable t) { public void testRetry() throws ExecutionException, InterruptedException, TimeoutException, IOException { File dataDir = tmpFolder.newFolder(); InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(dataDir).setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); int port = zkServer.getLocalAddress().getPort(); final CountDownLatch disconnectLatch = new CountDownLatch(1); @@ -248,9 +249,9 @@ public void process(WatchedEvent event) { }).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS))); final CountDownLatch createLatch = new CountDownLatch(1); - client.startAndWait(); + client.startAsync().awaitRunning(); try { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS)); Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback() { @@ -263,7 +264,7 @@ public void onSuccess(String result) { public void onFailure(Throwable t) { t.printStackTrace(System.out); } - }); + }, MoreExecutors.directExecutor()); TimeUnit.SECONDS.sleep(2); zkServer = InMemoryZKServer.builder() @@ -272,21 +273,21 @@ public void onFailure(Throwable t) { .setPort(port) .setTickTime(1000) .build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS)); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } @Test public void testACL() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { String userPass = "user:pass"; @@ -297,10 +298,10 @@ public void testACL() throws IOException, ExecutionException, InterruptedExcepti .of(zkServer.getConnectionStr()) .addAuthInfo("digest", userPass.getBytes()) .build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); ZKClientService noAuthClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - noAuthClient.startAndWait(); + noAuthClient.startAsync().awaitRunning(); // Create a node that is readable by all client, but admin for the creator @@ -335,11 +336,11 @@ public void testACL() throws IOException, ExecutionException, InterruptedExcepti // Write again with the non-auth client, now should succeed. noAuthClient.setData(path, "test2".getBytes()).get(); - noAuthClient.stopAndWait(); - zkClient.stopAndWait(); + noAuthClient.stopAsync().awaitTerminated(); + zkClient.stopAsync().awaitTerminated(); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -348,7 +349,7 @@ public void testDeadlock() throws IOException, InterruptedException { // This is to test deadlock bug as described in (TWILL-110) // This test has very high chance to get deadlock before the bug fix, hence failed with timeout. InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { for (int i = 0; i < 5000; i++) { final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); @@ -358,12 +359,12 @@ public void process(WatchedEvent event) { LOG.debug("Connection event: {}", event); } }); - zkClient.startAndWait(); - zkClient.stopAndWait(); + zkClient.startAsync().awaitRunning(); + zkClient.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -387,10 +388,10 @@ public void run() { serverThread.start(); ZKClientService zkClient = ZKClientService.Builder.of("localhost:" + serverSocket.getLocalPort()).build(); - zkClient.start(); + zkClient.startAsync(); Assert.assertTrue(connectLatch.await(10, TimeUnit.SECONDS)); - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); serverThread.interrupt(); } } @@ -398,13 +399,13 @@ public void run() { @Test public void testNamespace() throws ExecutionException, InterruptedException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService zkClient = ZKClientService.Builder .of(zkServer.getConnectionStr()) .build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); ZKClient zk = ZKClients.namespace(zkClient, "/test"); // Create the "/ should create the "/test" from the root @@ -446,7 +447,7 @@ public void testNamespace() throws ExecutionException, InterruptedException { // The namespace must be gone Assert.assertNull(zkClient.exists("/test").get()); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } } diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java index 9518d6eb..14b15ca6 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java @@ -34,11 +34,11 @@ public class ZKOperationsTest { @Test public void recursiveDelete() throws ExecutionException, InterruptedException, TimeoutException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - client.startAndWait(); + client.startAsync().awaitRunning(); try { client.create("/test1/test10/test101", null, CreateMode.PERSISTENT).get(); @@ -54,10 +54,10 @@ public void recursiveDelete() throws ExecutionException, InterruptedException, T Assert.assertNull(client.exists("/test1").get(2, TimeUnit.SECONDS)); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } }