diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 85206672a17..46d0745b33c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -483,6 +483,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) { allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED); allowableAcceptorKeys.add(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH); allowableAcceptorKeys.add(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH); + allowableAcceptorKeys.add(TransportConstants.WEB_SOCKET_COMPRESSION_SUPPORTED); allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword()); allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec()); allowableAcceptorKeys.add(TransportConstants.BACKLOG_PROP_NAME); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index 6c81cd83ee4..04dc87e104c 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.FederationConfiguration; @@ -1347,4 +1348,38 @@ public void testParsingDiskFullPolicy() throws Exception { AddressSettings settings = configuration.getAddressSettings().get("foo"); assertEquals(DiskFullMessagePolicy.FAIL, settings.getDiskFullMessagePolicy()); } + + @Test + public void testWebSocketCompressionOptionInAcceptorURI() throws Exception { + final String FIRST_PART = """ + + ActiveMQ.main.config + org.apache.activemq.artemis.integration.logging.Log4jLogDelegateFactory + ${jboss.server.data.dir}/activemq/bindings + ${jboss.server.data.dir}/activemq/journal + 10 + ${jboss.server.data.dir}/activemq/largemessages + ${jboss.server.data.dir}/activemq/paging + + tcp://0.0.0.0:2994?protocols=MQTT;webSocketCompressionSupported=true + + """; + + String configStr = FIRST_PART + LAST_PART; + FileConfigurationParser parser = new FileConfigurationParser(); + ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8)); + + Configuration configuration = parser.parseMainConfig(input); + + Set acceptors = configuration.getAcceptorConfigurations(); + + assertEquals(1, acceptors.size()); + + TransportConfiguration acceptor = acceptors.iterator().next(); + + assertNotNull(acceptor); + + assertTrue(acceptor.getParams().containsKey("webSocketCompressionSupported")); + assertFalse(acceptor.getExtraParams().containsKey("webSocketCompressionSupported")); + } } diff --git a/docs/user-manual/amqp.adoc b/docs/user-manual/amqp.adoc index 55522f7a7e6..31032c2347f 100644 --- a/docs/user-manual/amqp.adoc +++ b/docs/user-manual/amqp.adoc @@ -207,3 +207,5 @@ AMQP over WebSockets is supported via a normal AMQP acceptor: With this configuration, the broker will accept AMQP connections over WebSockets on the port `5672`. Web browsers can then connect to `ws://:5672` using a Web Socket to send and receive AMQP messages. +WebSockets per-message deflate is supported but not enabled by default, to enable it add the `webSocketCompressionSupported=true` option to the acceptor URI. +The client must also support per-message deflate and request it via the standard WebSockets extension. diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc index 1a08c04f598..3e48ddafe68 100644 --- a/docs/user-manual/mqtt.adoc +++ b/docs/user-manual/mqtt.adoc @@ -189,6 +189,8 @@ SSL/TLS is also available, e.g.: ---- Web browsers can then connect to `wss://:8883` using a Web Socket to send and receive MQTT messages. +WebSockets per-message deflate is supported but not enabled by default, to enable it add the `webSocketCompressionSupported=true` option to the acceptor URI. +The client must also support per-message deflate and request it via the standard WebSockets extension. == Link Stealing diff --git a/docs/user-manual/stomp.adoc b/docs/user-manual/stomp.adoc index 8d59642ecb9..6c2878ae777 100644 --- a/docs/user-manual/stomp.adoc +++ b/docs/user-manual/stomp.adoc @@ -264,6 +264,8 @@ STOMP over WebSockets is supported via the normal STOMP acceptor: With this configuration, the broker will accept STOMP connections over WebSockets on the port `61614`. Web browsers can then connect to `ws://:61614` using a Web Socket to send and receive STOMP messages. +WebSockets per-message deflate is supported but not enabled by default, to enable it add the `webSocketCompressionSupported=true` option to the acceptor URI. +The client must also support per-message deflate and request it via the standard WebSockets extension. A companion JavaScript library to ease client-side development is available from https://github.com/jmesnil/stomp-websocket[GitHub] (please see its http://jmesnil.net/stomp-websocket/doc/[documentation] for a complete description). diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketCompressionConfigTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketCompressionConfigTest.java new file mode 100644 index 00000000000..d7af2b3fb5d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketCompressionConfigTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URL; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; +import org.apache.activemq.artemis.tests.integration.jms.RedeployTest; +import org.apache.qpid.protonj2.test.driver.ProtonTestClient; +import org.apache.qpid.protonj2.test.driver.ProtonTestClientOptions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Test connections via Web Sockets + */ +public class AmqpWebSocketCompressionConfigTest extends AmqpClientTestSupport { + + private static final int SERVER_PORT = 5678; + + final URL urlServerWSCoff = RedeployTest.class.getClassLoader().getResource("ws-compression-disabled.xml"); + final URL urlServerWSCon = RedeployTest.class.getClassLoader().getResource("ws-compression-enabled.xml"); + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + @Test + @Timeout(20) + public void testClientConnectsWithWebSocketCompressionOn() throws Exception { + final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ(); + + try { + embeddedActiveMQ.setConfigResourcePath(urlServerWSCon.toURI().toString()); + embeddedActiveMQ.start(); + + testClientConnectsWithWebSockets(true); + } finally { + embeddedActiveMQ.stop(); + } + } + + @Test + @Timeout(20) + public void testClientConnectsWithWebSocketCompressionOff() throws Exception { + final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ(); + + try { + embeddedActiveMQ.setConfigResourcePath(urlServerWSCoff.toURI().toString()); + embeddedActiveMQ.start(); + + testClientConnectsWithWebSockets(false); + } finally { + embeddedActiveMQ.stop(); + } + } + + private void testClientConnectsWithWebSockets(boolean serverCompressionOn) throws Exception { + final ProtonTestClientOptions clientOpts = new ProtonTestClientOptions(); + + clientOpts.setUseWebSockets(true); + clientOpts.setWebSocketCompression(true); + + try (ProtonTestClient client = new ProtonTestClient(clientOpts)) { + client.queueClientSaslAnonymousConnect(); + client.remoteOpen().queue(); + client.expectOpen(); + client.remoteBegin().queue(); + client.expectBegin(); + client.connect("localhost", SERVER_PORT); + + client.waitForScriptToComplete(5, TimeUnit.MINUTES); + + if (serverCompressionOn) { + assertTrue(client.isWSCompressionActive()); + } else { + assertFalse(client.isWSCompressionActive()); + } + + client.expectAttach().ofSender(); + client.expectAttach().ofReceiver(); + client.expectFlow(); + + // Attach a sender and receiver + client.remoteAttach().ofReceiver() + .withName("ws-compression-test") + .withSource().withAddress(getQueueName()) + .withCapabilities("queue").also() + .withTarget().and() + .now(); + client.remoteFlow().withLinkCredit(10).now(); + client.remoteAttach().ofSender() + .withInitialDeliveryCount(0) + .withName("ws-compression-test") + .withTarget().withAddress(getQueueName()) + .withCapabilities("queue").also() + .withSource().and() + .now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final String payload = "test-data:" + "A".repeat(1000); + + // Broker sends message to subscription and acknowledges to sender + client.expectTransfer().withMessage().withValue(payload); + client.expectDisposition().withSettled(true).withState().accepted(); + + // Client sends message to queue with subscription + client.remoteTransfer().withDeliveryId(0) + .withBody().withValue(payload).also() + .now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + + client.expectClose(); + client.remoteClose().now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + client.close(); + } + } +} diff --git a/tests/integration-tests/src/test/resources/ws-compression-disabled.xml b/tests/integration-tests/src/test/resources/ws-compression-disabled.xml new file mode 100644 index 00000000000..69956912b01 --- /dev/null +++ b/tests/integration-tests/src/test/resources/ws-compression-disabled.xml @@ -0,0 +1,135 @@ + + + + + + + + 0.0.0.0 + + false + + NIO + + data/paging + + data/bindings + + data/journal + + data/large-messages + + true + + 2 + + 10 + + 4096 + + 10M + + 40000 + + 1 + + 5000 + + 90 + + false + + 120000 + + 60000 + + HALT + + false + + 40000 + + + tcp://0.0.0.0:5678?;protocols=AMQP;webSocketCompressionSupported=false + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + -1 + 10 + PAGE + true + true + + + DLQ + ExpiryQueue + 0 + -1 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+
+ + + +
+
+
+
diff --git a/tests/integration-tests/src/test/resources/ws-compression-enabled.xml b/tests/integration-tests/src/test/resources/ws-compression-enabled.xml new file mode 100644 index 00000000000..21f0c9058ac --- /dev/null +++ b/tests/integration-tests/src/test/resources/ws-compression-enabled.xml @@ -0,0 +1,135 @@ + + + + + + + + 0.0.0.0 + + false + + NIO + + data/paging + + data/bindings + + data/journal + + data/large-messages + + true + + 2 + + 10 + + 4096 + + 10M + + 40000 + + 1 + + 5000 + + 90 + + false + + 120000 + + 60000 + + HALT + + false + + 40000 + + + tcp://0.0.0.0:5678?;protocols=AMQP;webSocketCompressionSupported=true + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + -1 + 10 + PAGE + true + true + + + DLQ + ExpiryQueue + 0 + -1 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+
+ + + +
+
+
+