From 725de953274f8b2c4a53045c4533ffd59dbe5a05 Mon Sep 17 00:00:00 2001 From: "Dustin H." Date: Wed, 1 Apr 2026 17:18:53 +0200 Subject: [PATCH 1/2] add ability to stop client completely will also free resources like listeners and thread pools --- .../tiktok/listener/ListenersManager.java | 7 + .../jwdeveloper/tiktok/live/LiveClient.java | 9 ++ .../tiktok/live/LiveEventsHandler.java | 10 +- .../jwdeveloper/tiktok/TikTokLiveClient.java | 64 +++++++- .../tiktok/TikTokLiveEventHandler.java | 19 ++- .../listener/TikTokListenersManager.java | 14 ++ .../tiktok/TikTokLiveClientStopTest.java | 141 ++++++++++++++++++ .../listener/TikTokListenersManagerTest.java | 11 ++ 8 files changed, 260 insertions(+), 15 deletions(-) create mode 100644 Client/src/test/java/io/github/jwdeveloper/tiktok/TikTokLiveClientStopTest.java diff --git a/API/src/main/java/io/github/jwdeveloper/tiktok/listener/ListenersManager.java b/API/src/main/java/io/github/jwdeveloper/tiktok/listener/ListenersManager.java index e8e9a902..e96de889 100644 --- a/API/src/main/java/io/github/jwdeveloper/tiktok/listener/ListenersManager.java +++ b/API/src/main/java/io/github/jwdeveloper/tiktok/listener/ListenersManager.java @@ -34,4 +34,11 @@ public interface ListenersManager void addListener(Object listener); void removeListener(Object listener); + + /** + * Releases resources held by this manager (e.g. async listener executor). Default no-op. + * Idempotent. + */ + default void shutdown() { + } } diff --git a/API/src/main/java/io/github/jwdeveloper/tiktok/live/LiveClient.java b/API/src/main/java/io/github/jwdeveloper/tiktok/live/LiveClient.java index 18f9fa9b..7d9646db 100644 --- a/API/src/main/java/io/github/jwdeveloper/tiktok/live/LiveClient.java +++ b/API/src/main/java/io/github/jwdeveloper/tiktok/live/LiveClient.java @@ -62,6 +62,15 @@ default void disconnect() { disconnect(LiveClientStopType.NORMAL); } + /** + * Shuts down this client permanently: closes the connection, cancels pending reconnect attempts, + * removes event listeners and subscriptions, and releases listener thread-pool resources. + *

+ * The client must not be used after {@code stop()}; further {@link #connect()} calls will fail. + * Idempotent: safe to call more than once. + */ + void stop(); + /** * Use to manually invoke event */ diff --git a/API/src/main/java/io/github/jwdeveloper/tiktok/live/LiveEventsHandler.java b/API/src/main/java/io/github/jwdeveloper/tiktok/live/LiveEventsHandler.java index be9af421..c83b48c9 100644 --- a/API/src/main/java/io/github/jwdeveloper/tiktok/live/LiveEventsHandler.java +++ b/API/src/main/java/io/github/jwdeveloper/tiktok/live/LiveEventsHandler.java @@ -25,9 +25,6 @@ import io.github.jwdeveloper.tiktok.data.events.common.TikTokEvent; import io.github.jwdeveloper.tiktok.live.builder.EventConsumer; -import java.util.HashSet; -import java.util.Optional; - public interface LiveEventsHandler { void publish(LiveClient tikTokLiveClient, TikTokEvent tikTokEvent); @@ -38,4 +35,11 @@ public interface LiveEventsHandler { void unsubscribe(EventConsumer consumer); void unsubscribe(Class clazz, EventConsumer consumer); + + /** + * Removes all event subscriptions. Default implementation does nothing; custom handlers should + * clear their internal subscriber state when supporting full client shutdown. + */ + default void clearSubscriptions() { + } } diff --git a/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClient.java b/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClient.java index ac6acca3..420ee00a 100644 --- a/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClient.java +++ b/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClient.java @@ -37,10 +37,13 @@ import io.github.jwdeveloper.tiktok.messages.webcast.ProtoMessageFetchResult; import io.github.jwdeveloper.tiktok.models.ConnectionState; import io.github.jwdeveloper.tiktok.websocket.*; +import lombok.AccessLevel; import lombok.Getter; +import java.util.ArrayList; import java.util.Base64; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.logging.Logger; @@ -57,6 +60,13 @@ public class TikTokLiveClient implements LiveClient private final GiftsManager giftManager; private final LiveMessagesHandler messageHandler; + @Getter(AccessLevel.NONE) + private final Object lifecycleLock = new Object(); + @Getter(AccessLevel.NONE) + private final AtomicBoolean stopped = new AtomicBoolean(false); + @Getter(AccessLevel.NONE) + private volatile ScheduledFuture pendingReconnect; + public TikTokLiveClient( LiveMessagesHandler messageHandler, GiftsManager giftsManager, @@ -79,6 +89,11 @@ public TikTokLiveClient( } public void connect() { + synchronized (lifecycleLock) { + if (stopped.get()) { + throw new TikTokLiveException("Client has been stopped and cannot connect again"); + } + } try { if (clientSettings.isUseEulerstreamWebsocket()) tryEulerConnect(); @@ -90,11 +105,19 @@ public void connect() { tikTokEventHandler.publish(this, new TikTokDisconnectedEvent("Exception: " + e.getMessage())); if (e instanceof TikTokLiveOfflineHostException && clientSettings.isRetryOnConnectionFailure()) { - AsyncHandler.getReconnectScheduler().schedule(() -> { - logger.info("Reconnecting"); - tikTokEventHandler.publish(this, new TikTokReconnectingEvent()); - this.connect(); - }, clientSettings.getRetryConnectionTimeout().toMillis(), TimeUnit.MILLISECONDS); + synchronized (lifecycleLock) { + if (!stopped.get()) { + cancelPendingReconnectLocked(); + pendingReconnect = AsyncHandler.getReconnectScheduler().schedule(() -> { + if (stopped.get()) { + return; + } + logger.info("Reconnecting"); + tikTokEventHandler.publish(this, new TikTokReconnectingEvent()); + this.connect(); + }, clientSettings.getRetryConnectionTimeout().toMillis(), TimeUnit.MILLISECONDS); + } + } } throw e; } catch (Exception e) { @@ -111,6 +134,10 @@ private void tryEulerConnect() { setState(ConnectionState.CONNECTING); tikTokEventHandler.publish(this, new TikTokConnectingEvent()); + if (stopped.get()) { + setState(ConnectionState.DISCONNECTED); + throw new TikTokLiveException("Connection aborted: client was stopped"); + } webSocketClient.start(null, this); setState(ConnectionState.CONNECTED); } @@ -161,6 +188,10 @@ public void tryConnect() { var liveConnectionRequest = new LiveConnectionData.Request(userData.getRoomInfo().getRoomId()); var liveConnectionData = httpClient.fetchLiveConnectionData(liveConnectionRequest); + if (stopped.get()) { + setState(ConnectionState.DISCONNECTED); + throw new TikTokLiveException("Connection aborted: client was stopped"); + } webSocketClient.start(liveConnectionData, this); setState(ConnectionState.CONNECTED); @@ -174,6 +205,29 @@ public void disconnect(LiveClientStopType type) { setState(ConnectionState.DISCONNECTED); } + @Override + public void stop() { + synchronized (lifecycleLock) { + if (!stopped.compareAndSet(false, true)) { + return; + } + cancelPendingReconnectLocked(); + } + disconnect(LiveClientStopType.DISCONNECT); + for (Object listener : new ArrayList<>(listenersManager.getListeners())) { + listenersManager.removeListener(listener); + } + tikTokEventHandler.clearSubscriptions(); + listenersManager.shutdown(); + } + + private void cancelPendingReconnectLocked() { + if (pendingReconnect != null) { + pendingReconnect.cancel(false); + pendingReconnect = null; + } + } + private void setState(ConnectionState connectionState) { logger.info("TikTokLive client state: " + connectionState.name()); roomInfo.setConnectionState(connectionState); diff --git a/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveEventHandler.java b/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveEventHandler.java index 3649d43d..8434e454 100644 --- a/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveEventHandler.java +++ b/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveEventHandler.java @@ -36,24 +36,29 @@ public TikTokLiveEventHandler() { events = new HashMap<>(); } - public void publish(LiveClient tikTokLiveClient, TikTokEvent tikTokEvent) { + public synchronized void publish(LiveClient tikTokLiveClient, TikTokEvent tikTokEvent) { Optional.ofNullable(events.get(TikTokEvent.class)).ifPresent(handlers -> handlers.forEach(handler -> handler.onEvent(tikTokLiveClient, tikTokEvent))); Optional.ofNullable(events.get(tikTokEvent.getClass())).ifPresent(handlers -> handlers.forEach(handler -> handler.onEvent(tikTokLiveClient, tikTokEvent))); } - public void subscribe(Class clazz, EventConsumer event) { + public synchronized void subscribe(Class clazz, EventConsumer event) { events.computeIfAbsent(clazz, e -> new HashSet<>()).add(event); } - public void unsubscribeAll(Class clazz) { + public synchronized void unsubscribeAll(Class clazz) { events.remove(clazz); } - public void unsubscribe(EventConsumer consumer) { - events.forEach((key, value) -> value.remove(consumer)); + public synchronized void unsubscribe(EventConsumer consumer) { + events.forEach((key, value) -> value.remove(consumer)); } - public void unsubscribe(Class clazz, EventConsumer consumer) { + public synchronized void unsubscribe(Class clazz, EventConsumer consumer) { Optional.ofNullable(clazz).map(events::get).ifPresent(consumers -> consumers.remove(consumer)); - } + } + + @Override + public synchronized void clearSubscriptions() { + events.clear(); + } } \ No newline at end of file diff --git a/Client/src/main/java/io/github/jwdeveloper/tiktok/listener/TikTokListenersManager.java b/Client/src/main/java/io/github/jwdeveloper/tiktok/listener/TikTokListenersManager.java index 9c6bbb56..bb0fa25a 100644 --- a/Client/src/main/java/io/github/jwdeveloper/tiktok/listener/TikTokListenersManager.java +++ b/Client/src/main/java/io/github/jwdeveloper/tiktok/listener/TikTokListenersManager.java @@ -36,6 +36,7 @@ import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; public class TikTokListenersManager implements ListenersManager { @@ -44,6 +45,7 @@ public class TikTokListenersManager implements ListenersManager { private final LiveEventsHandler eventsHandler; private final ExecutorService executorService; private final DependanceContainer dependanceContainer; + private final AtomicBoolean shutdown = new AtomicBoolean(false); public TikTokListenersManager(LiveEventsHandler tikTokEventHandler, @@ -61,6 +63,9 @@ public List getListeners() { @Override public void addListener(Object listener) { + if (shutdown.get()) { + throw new TikTokLiveException("ListenersManager has been shut down"); + } if (listeners.containsKey(listener)) { throw new TikTokLiveException("Listener " + listener.getClass() + " has already been registered"); } @@ -84,6 +89,15 @@ public void removeListener(Object listener) { listeners.remove(listener); } + @Override + public void shutdown() { + if (!shutdown.compareAndSet(false, true)) { + return; + } + executorService.shutdownNow(); + listeners.clear(); + } + private List getMethodsInfo(Object listener) { return Arrays.stream(listener.getClass().getDeclaredMethods()) .filter(e -> e.isAnnotationPresent(TikTokEventObserver.class)) diff --git a/Client/src/test/java/io/github/jwdeveloper/tiktok/TikTokLiveClientStopTest.java b/Client/src/test/java/io/github/jwdeveloper/tiktok/TikTokLiveClientStopTest.java new file mode 100644 index 00000000..e8b5f4b2 --- /dev/null +++ b/Client/src/test/java/io/github/jwdeveloper/tiktok/TikTokLiveClientStopTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2023-2024 jwdeveloper jacekwoln@gmail.com + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +package io.github.jwdeveloper.tiktok; + +import io.github.jwdeveloper.tiktok.data.requests.LiveUserData; +import io.github.jwdeveloper.tiktok.data.settings.LiveClientSettings; +import io.github.jwdeveloper.tiktok.exceptions.TikTokLiveException; +import io.github.jwdeveloper.tiktok.exceptions.TikTokLiveOfflineHostException; +import io.github.jwdeveloper.tiktok.http.LiveHttpClient; +import io.github.jwdeveloper.tiktok.listener.ListenersManager; +import io.github.jwdeveloper.tiktok.live.GiftsManager; +import io.github.jwdeveloper.tiktok.live.LiveEventsHandler; +import io.github.jwdeveloper.tiktok.live.LiveMessagesHandler; +import io.github.jwdeveloper.tiktok.websocket.LiveClientStopType; +import io.github.jwdeveloper.tiktok.websocket.LiveSocketClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.util.List; +import java.util.logging.Logger; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class TikTokLiveClientStopTest { + + @Mock + private LiveMessagesHandler messageHandler; + @Mock + private GiftsManager giftManager; + @Mock + private LiveHttpClient httpClient; + @Mock + private LiveSocketClient webSocketClient; + @Mock + private LiveEventsHandler eventHandler; + @Mock + private LiveClientSettings clientSettings; + @Mock + private ListenersManager listenersManager; + + private final Logger logger = Logger.getLogger(TikTokLiveClientStopTest.class.getName()); + private TikTokRoomInfo roomInfo; + private TikTokLiveClient client; + + @BeforeEach + void setUp() { + roomInfo = new TikTokRoomInfo(); + roomInfo.setHostName("user"); + lenient().when(clientSettings.isUseEulerstreamWebsocket()).thenReturn(false); + lenient().when(webSocketClient.isConnected()).thenReturn(false); + client = new TikTokLiveClient(messageHandler, giftManager, roomInfo, httpClient, webSocketClient, + eventHandler, clientSettings, listenersManager, logger); + } + + @Test + void stop_disconnectsClearsListenersAndShutsDownManager() { + when(webSocketClient.isConnected()).thenReturn(true); + Object l1 = new Object(); + when(listenersManager.getListeners()).thenReturn(List.of(l1)); + + client.stop(); + + verify(webSocketClient).stop(LiveClientStopType.DISCONNECT); + verify(listenersManager).removeListener(l1); + verify(eventHandler).clearSubscriptions(); + verify(listenersManager).shutdown(); + } + + @Test + void stop_secondCallIsIdempotent() { + when(webSocketClient.isConnected()).thenReturn(true); + + client.stop(); + client.stop(); + + verify(webSocketClient, times(1)).stop(LiveClientStopType.DISCONNECT); + } + + @Test + void connect_afterStop_throws() { + client.stop(); + assertThrows(TikTokLiveException.class, () -> client.connect()); + } + + @Test + void stop_cancelsScheduledReconnect() throws InterruptedException { + when(clientSettings.isRetryOnConnectionFailure()).thenReturn(true); + when(clientSettings.getRetryConnectionTimeout()).thenReturn(Duration.ofMillis(80)); + var offline = new LiveUserData.Response("{}", LiveUserData.UserStatus.Offline, roomInfo); + when(httpClient.fetchLiveUserData(isA(LiveUserData.Request.class))).thenReturn(offline); + + assertThrows(TikTokLiveOfflineHostException.class, () -> client.connect()); + client.stop(); + Thread.sleep(250); + verify(httpClient, times(1)).fetchLiveUserData(isA(LiveUserData.Request.class)); + } + + @Test + void retryRunsSecondConnect_whenNotStopped() throws InterruptedException { + when(clientSettings.isRetryOnConnectionFailure()).thenReturn(true); + when(clientSettings.getRetryConnectionTimeout()).thenReturn(Duration.ofMillis(80)); + var offline = new LiveUserData.Response("{}", LiveUserData.UserStatus.Offline, roomInfo); + when(httpClient.fetchLiveUserData(isA(LiveUserData.Request.class))).thenReturn(offline); + + assertThrows(TikTokLiveOfflineHostException.class, () -> client.connect()); + Thread.sleep(250); + verify(httpClient, atLeast(2)).fetchLiveUserData(isA(LiveUserData.Request.class)); + client.stop(); + } +} diff --git a/Client/src/test/java/io/github/jwdeveloper/tiktok/listener/TikTokListenersManagerTest.java b/Client/src/test/java/io/github/jwdeveloper/tiktok/listener/TikTokListenersManagerTest.java index 71aaa71d..1f63c18d 100644 --- a/Client/src/test/java/io/github/jwdeveloper/tiktok/listener/TikTokListenersManagerTest.java +++ b/Client/src/test/java/io/github/jwdeveloper/tiktok/listener/TikTokListenersManagerTest.java @@ -111,6 +111,17 @@ void removeListener_notRegistered_doesNotThrow() { assertDoesNotThrow(() -> tikTokListenersManager.removeListener(listener)); } + @Test + void shutdown_isIdempotent() { + tikTokListenersManager.shutdown(); + assertDoesNotThrow(() -> tikTokListenersManager.shutdown()); + } + + @Test + void addListener_afterShutdown_throws() { + tikTokListenersManager.shutdown(); + assertThrows(TikTokLiveException.class, () -> tikTokListenersManager.addListener(new TikTokEventListenerTest())); + } public static class TikTokEventListenerTest { @TikTokEventObserver From 5d57e1d719f0657f9be99a183bc8c1a18fe10a6e Mon Sep 17 00:00:00 2001 From: "Dustin H." Date: Wed, 1 Apr 2026 17:21:37 +0200 Subject: [PATCH 2/2] Update version in pom.xml --- API/pom.xml | 2 +- Client/pom.xml | 2 +- examples/pom.xml | 2 +- extension-collector/pom.xml | 2 +- extension-recorder/pom.xml | 2 +- pom.xml | 2 +- tools-readme/pom.xml | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/API/pom.xml b/API/pom.xml index 3b344976..2e88ee62 100644 --- a/API/pom.xml +++ b/API/pom.xml @@ -5,7 +5,7 @@ TikTokLiveJava io.github.jwdeveloper.tiktok - 1.11.11-Release + 1.11.12-Release 4.0.0 API diff --git a/Client/pom.xml b/Client/pom.xml index 69f9cb1e..5da5967e 100644 --- a/Client/pom.xml +++ b/Client/pom.xml @@ -5,7 +5,7 @@ TikTokLiveJava io.github.jwdeveloper.tiktok - 1.11.11-Release + 1.11.12-Release 4.0.0 diff --git a/examples/pom.xml b/examples/pom.xml index 47502d00..2a01efdc 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -41,7 +41,7 @@ TikTokLiveJava io.github.jwdeveloper.tiktok - 1.11.11-Release + 1.11.12-Release 4.0.0 diff --git a/extension-collector/pom.xml b/extension-collector/pom.xml index 8ee1b0bf..62eb3ec3 100644 --- a/extension-collector/pom.xml +++ b/extension-collector/pom.xml @@ -6,7 +6,7 @@ io.github.jwdeveloper.tiktok TikTokLiveJava - 1.11.11-Release + 1.11.12-Release diff --git a/extension-recorder/pom.xml b/extension-recorder/pom.xml index 29b0801b..cbe3fc9a 100644 --- a/extension-recorder/pom.xml +++ b/extension-recorder/pom.xml @@ -5,7 +5,7 @@ TikTokLiveJava io.github.jwdeveloper.tiktok - 1.11.11-Release + 1.11.12-Release 4.0.0 extension-recorder diff --git a/pom.xml b/pom.xml index 06d4391e..2ec8d285 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ io.github.jwdeveloper.tiktok TikTokLiveJava pom - 1.11.11-Release + 1.11.12-Release API Client diff --git a/tools-readme/pom.xml b/tools-readme/pom.xml index 6379e6cb..999b6d99 100644 --- a/tools-readme/pom.xml +++ b/tools-readme/pom.xml @@ -5,7 +5,7 @@ TikTokLiveJava io.github.jwdeveloper.tiktok - 1.11.11-Release + 1.11.12-Release 4.0.0