Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public Message getMessage() {

private boolean reencoded = false;

private int applicationPropertiesSize;

/**
* AMQPLargeMessagePersister will save the buffer here.
*/
Expand Down Expand Up @@ -264,7 +266,9 @@ protected void readSavedEncoding(ByteBuf buf) {
applicationPropertiesPosition = buf.readInt();
remainingBodyPosition = buf.readInt();

int applicationPropertiesInitialPosition = buf.readerIndex();
applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject();
this.applicationPropertiesSize = buf.readerIndex() - applicationPropertiesInitialPosition;

if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
if (!expirationReload) {
Expand Down Expand Up @@ -412,6 +416,16 @@ private void genericParseLargeMessage() {
}
}

@Override
protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) {
applicationProperties = super.readApplicationProperties(data, position);
if (applicationProperties != null) {
this.applicationPropertiesSize = data.position() - position;
}
return applicationProperties;
}


protected void parseLargeMessage(ReadableBuffer data) {
MessageDataScanningStatus status = getDataScanningStatus();
if (status == MessageDataScanningStatus.NOT_SCANNED) {
Expand Down Expand Up @@ -604,8 +618,7 @@ public long getWholeMessageSize() {
@Override
public synchronized int getMemoryEstimate() {
if (memoryEstimate == -1) {
memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0);
originalEstimate = memoryEstimate;
memoryEstimate = AMQP_OFFSET + memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0) + applicationPropertiesSize * 4;
}
return memoryEstimate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.MessageReference;
Expand Down Expand Up @@ -119,6 +118,10 @@
*/
public abstract class AMQPMessage extends RefCountMessage implements org.apache.activemq.artemis.api.core.Message {

// how much an AMQP Message takes more in the memory, beyond the Message.offset.
// this is an estimate, and it's based on testing
public static final int AMQP_OFFSET = 1024;

private static final SimpleString ANNOTATION_AREA_PREFIX = SimpleString.of("m.");

protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Expand Down Expand Up @@ -146,7 +149,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
* developing purposes.
*/
public enum MessageDataScanningStatus {
NOT_SCANNED(0), RELOAD_PERSISTENCE(1), SCANNED(2);
NOT_SCANNED(0), SCANNED(1);

private static final MessageDataScanningStatus[] STATES;

Expand Down Expand Up @@ -205,7 +208,6 @@ private static void checkCode(int code) {
protected long messageID;
protected SimpleString address;
protected volatile int memoryEstimate = -1;
protected volatile int originalEstimate = -1;
protected long expiration;
protected boolean expirationReload = false;
protected long scheduledTime = -1;
Expand Down Expand Up @@ -546,36 +548,27 @@ protected ApplicationProperties lazyDecodeApplicationProperties() {
// need to synchronize access to lazyDecodeApplicationProperties to avoid clashes with getMemoryEstimate
protected synchronized ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
if (owner != null && memoryEstimate != -1) {
// the memory has already been tracked and needs to be updated to reflect the new decoding
int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);

// it is difficult to track the updates for paged messages
// for that reason we won't do it if paged
// we also only do the update if the message was previously routed
// so if a debug method or an interceptor changed the size before routing we would get a different size
if (!isPaged && routed) {
((PagingStore) owner).addSize(addition, false);
final int updatedEstimate = memoryEstimate + addition;
memoryEstimate = updatedEstimate;
}
}
readApplicationProperties(data, applicationPropertiesPosition);
}

return applicationProperties;
}

protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) {
applicationProperties = scanForMessageSection(data, position, ApplicationProperties.class);
return applicationProperties;
}

protected int unmarshalledApplicationPropertiesMemoryEstimateFromData(ReadableBuffer data) {
if (applicationProperties != null) {
// they have been unmarshalled, estimate memory usage based on their encoded size
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
return remainingBodyPosition - applicationPropertiesPosition;
} else {
return data.capacity() - applicationPropertiesPosition;
}
// no need to rescan if it's from RELOAD_PERSISTENCE
ensureScanning();

// they have been unmarshalled, estimate memory usage based on their encoded size
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
return remainingBodyPosition - applicationPropertiesPosition;
} else {
return data.capacity() - applicationPropertiesPosition;
}
return 0;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -661,9 +654,6 @@ protected synchronized void ensureMessageDataScanned() {
case NOT_SCANNED:
scanMessageData();
break;
case RELOAD_PERSISTENCE:
lazyScanAfterReloadPersistence();
break;
case SCANNED:
// NO-OP
break;
Expand All @@ -686,7 +676,6 @@ protected synchronized void resetMessageData() {
priority = DEFAULT_MESSAGE_PRIORITY;
encodedHeaderSize = 0;
memoryEstimate = -1;
originalEstimate = -1;
scheduledTime = -1;
encodedDeliveryAnnotationsSize = 0;
headerPosition = VALUE_NOT_PRESENT;
Expand Down Expand Up @@ -885,12 +874,8 @@ public final void receiveBuffer(ByteBuf buffer) {

@Override
public int getOriginalEstimate() {
if (originalEstimate < 0) {
// getMemoryEstimate should initialize originalEstimate
return getMemoryEstimate();
} else {
return originalEstimate;
}
// getMemoryEstimate should initialize originalEstimate
return getMemoryEstimate();
}

@Override
Expand Down Expand Up @@ -1033,13 +1018,9 @@ protected int internalPersistSize() {
public abstract void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);

protected synchronized void lazyScanAfterReloadPersistence() {
assert messageDataScanned == MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
scanMessageData();
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
modified = false;
// reinitialise memory estimate as message will already be on a queue
// and lazy decode will want to update
getMemoryEstimate();
}

@Override
Expand Down Expand Up @@ -1223,9 +1204,8 @@ public boolean isDurable() {
if (header != null && header .getDurable() != null) {
return header.getDurable();
} else {
// if header == null and scanningStatus=RELOAD_PERSISTENCE, it means the message can only be durable
// even though the parsing hasn't happened yet
return getDataScanningStatus() == MessageDataScanningStatus.RELOAD_PERSISTENCE;
// we will assume it's non persistent if no header
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,10 @@ public synchronized int getMemoryEstimate() {
if (isPaged) {
// When the message is paged, we don't take the unmarshalled application properties because it could be
// updated at different places. We just keep the estimate simple when paging.
memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
memoryEstimate = AMQPMessage.AMQP_OFFSET + memoryOffset + (data != null ? data.capacity() : 0);
} else {
memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
memoryEstimate = AMQP_OFFSET + memoryEstimate + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) * 4 : 0);
}
originalEstimate = memoryEstimate;
}

return memoryEstimate;
Expand Down Expand Up @@ -237,10 +236,10 @@ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pool

// Message state is now that the underlying buffer is loaded, but the contents not yet scanned
resetMessageData();
recoverHeaderDataFromEncoding();
scanMessageData(data);

modified = false;
messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
}

private void recoverHeaderDataFromEncoding() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ public void testHasScheduledDeliveryTimeReloadPersistence() {
// Now reload from encoded data
message.reloadPersistence(encoded, null);

assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());

assertTrue(message.hasScheduledDeliveryTime());

assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());

message.getHeader();

Expand Down Expand Up @@ -249,11 +249,11 @@ public void testHasScheduledDeliveryDelayReloadPersistence() {
// Now reload from encoded data
message.reloadPersistence(encoded, null);

assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());

assertTrue(message.hasScheduledDeliveryTime());

assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());

message.getHeader();

Expand All @@ -279,11 +279,11 @@ public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() {
// Now reload from encoded data
message.reloadPersistence(encoded, null);

assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());

assertFalse(message.hasScheduledDeliveryTime());

assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());

message.getHeader();

Expand Down Expand Up @@ -331,12 +331,7 @@ private void testGetMemoryEstimateWithDecodedApplicationProperties(boolean paged
}

assertEquals(TEST_APPLICATION_PROPERTY_VALUE, decodedWithApplicationPropertiesUnmarshalled.getStringProperty(TEST_APPLICATION_PROPERTY_KEY));

if (paged) {
assertEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
} else {
assertNotEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
}
assertEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
}

//----- Test Connection ID access -----------------------------------------//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,14 @@ protected void sendMessages(String destinationName,
}

protected void sendMessages(String destinationName, int count, boolean durable) throws Exception {
sendMessages(destinationName, count, durable, null);
sendMessages(destinationName, count, durable, null, null);
}

protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload) throws Exception {
sendMessages(destinationName, count, durable, payload, null);
}

protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload, Map<String, Object> properties) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
Expand All @@ -320,6 +324,9 @@ protected void sendMessages(String destinationName, int count, boolean durable,
if (payload != null) {
message.setBytes(payload);
}
if (properties != null) {
properties.forEach((a, b) -> message.setApplicationProperty(a, b));
}
sender.send(message);
}
} finally {
Expand Down
Loading
Loading