Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public class CompositeTypeConstructorGenerator extends AbstractProcessor
"org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability",
"org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy",
"org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability");

private static final List<String> SINGLETONS = List.of("Accepted");

@Override
Expand All @@ -99,6 +98,8 @@ public boolean process(final Set<? extends TypeElement> annotations, final Round
return true;
}

SymbolMapper.initialize(processingEnv);

Filer filer = processingEnv.getFiler();
try
{
Expand Down Expand Up @@ -136,6 +137,8 @@ private void generateCompositeTypeConstructor(final Filer filer, final TypeEleme
final String compositeTypeConstructorPackage = packageElement.getQualifiedName() + ".codec";
String compositeTypeConstructorName = compositeTypeConstructorPackage + "." + compositeTypeConstructorNameSimpleName;
final CompositeType annotation = typeElement.getAnnotation(CompositeType.class);
final String symbol = SymbolMapper.getConstantName(annotation.symbolicDescriptor());
final boolean isConstant = symbol.startsWith("Symbols.");

processingEnv.getMessager().printMessage(Diagnostic.Kind.NOTE, "Generating composite constructor file for " + objectQualifiedClassName);

Expand All @@ -159,8 +162,15 @@ private void generateCompositeTypeConstructor(final Filer filer, final TypeEleme
pw.println();
pw.println("import org.apache.qpid.server.protocol.v1_0.codec.AbstractCompositeTypeConstructor;");
pw.println("import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;");
if (isConstant)
{
pw.println("import org.apache.qpid.server.protocol.v1_0.constants.Symbols;");
}
pw.println("import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;");
pw.println("import org.apache.qpid.server.protocol.v1_0.type.Symbol;");
if (!isConstant)
{
pw.println("import org.apache.qpid.server.protocol.v1_0.type.Symbol;");
}
pw.println("import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;");
pw.println("import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;");
pw.println("import org.apache.qpid.server.protocol.v1_0.type.transport.Error;");
Expand All @@ -174,7 +184,7 @@ private void generateCompositeTypeConstructor(final Filer filer, final TypeEleme

pw.println(" public static void register(DescribedTypeConstructorRegistry registry)");
pw.println(" {");
pw.println(" registry.register(Symbol.valueOf(\"" + annotation.symbolicDescriptor() + "\"), INSTANCE);");
pw.println(" registry.register(%s, INSTANCE);".formatted(symbol));
pw.println(String.format(" registry.register(UnsignedLong.valueOf(%#016x), INSTANCE);", annotation.numericDescriptor()));
pw.println(" }");
pw.println();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.qpid.server.protocol.v1_0;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.processing.ProcessingEnvironment;
import javax.lang.model.element.Element;
import javax.lang.model.element.ElementKind;
import javax.lang.model.element.Modifier;
import javax.lang.model.element.TypeElement;
import javax.lang.model.element.VariableElement;
import javax.lang.model.type.DeclaredType;
import javax.lang.model.type.TypeMirror;

public final class SymbolMapper
{
private SymbolMapper()
{

}

private static final String SYMBOL = "org.apache.qpid.server.protocol.v1_0.type.Symbol";
private static final String SYMBOLS = "org.apache.qpid.server.protocol.v1_0.constants.Symbols";
private static final String SYMBOL_TEXTS = "org.apache.qpid.server.protocol.v1_0.constants.SymbolTexts";
private static final Map<String, String> NAME_MAPPING = new HashMap<>();

static void initialize(final ProcessingEnvironment processingEnv)
{
if (!NAME_MAPPING.isEmpty())
{
return;
}

final List<VariableElement> symbolTextFields = getAllFields(processingEnv, SYMBOL_TEXTS);
final List<VariableElement> symbolFields = getAllFields(processingEnv, SYMBOLS);
final Map<String, VariableElement> symbolTextFieldsByName = new HashMap<>();

for (final VariableElement field : symbolTextFields)
{
final String name = field.getSimpleName().toString();
final VariableElement previous = symbolTextFieldsByName.putIfAbsent(name, field);
if (previous != null)
{
throw new RuntimeException("Duplicate field name " + name + " found in SymbolTexts");
}
}

for (final VariableElement field : symbolFields)
{
if (field.getModifiers().contains(Modifier.STATIC) && SYMBOL.equals(field.asType().toString()))
{
final String name = field.getSimpleName().toString();
final VariableElement symbolTextField = symbolTextFieldsByName.get(name);
if (symbolTextField != null && symbolTextField.getConstantValue() != null)
{
NAME_MAPPING.put(String.valueOf(symbolTextField.getConstantValue()), "Symbols.%s".formatted(name));
}
else
{
throw new RuntimeException("Field name " + name + " exists in Symbols but not in SymbolTexts. " +
"All Symbols field names must have a matching field name in SymbolTexts");
}

}
}
}

static String getConstantName(String symbolText)
{
return NAME_MAPPING.getOrDefault(symbolText, "Symbol.valueOf(\"%s\")".formatted(symbolText));
}

private static List<VariableElement> getAllFields(final ProcessingEnvironment processingEnv, final String typeName)
{
final TypeElement typeElement = processingEnv.getElementUtils().getTypeElement(typeName);
if (typeElement == null)
{
throw new RuntimeException("Unable to resolve type " + typeName + " during SymbolMapper initialization");
}

final List<VariableElement> fields = new ArrayList<>();
collectFields(typeElement, fields, new HashSet<>());
return fields;
}

private static void collectFields(final TypeElement typeElement,
final List<VariableElement> fields,
final Set<String> visited)
{
final String typeName = typeElement.getQualifiedName().toString();
if (!visited.add(typeName))
{
return;
}

for (final Element element : typeElement.getEnclosedElements())
{
if (element.getKind() == ElementKind.FIELD)
{
fields.add((VariableElement) element);
}
}

for (final TypeMirror iface : typeElement.getInterfaces())
{
if (iface instanceof DeclaredType)
{
final Element element = ((DeclaredType) iface).asElement();
if (element instanceof TypeElement)
{
collectFields((TypeElement) element, fields, visited);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
Expand All @@ -51,9 +50,6 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ
@ManagedContextDefault(name = SEND_SASL_FINAL_CHALLENGE_AS_CHALLENGE)
boolean DEFAULT_SEND_SASL_FINAL_CHALLENGE_AS_CHALLENGE = false;

Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
Symbol SHARED_SUBSCRIPTIONS = Symbol.valueOf("SHARED-SUBS");

@DerivedAttribute(description = "The idle timeout (in milliseconds) for incoming traffic.")
long getIncomingIdleTimeout();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;

import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY;

import java.net.SocketAddress;
import java.security.AccessControlContext;
import java.security.AccessControlException;
Expand Down Expand Up @@ -74,6 +72,7 @@
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.constants.Bytes;
import org.apache.qpid.server.protocol.v1_0.constants.Symbols;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
Expand All @@ -87,7 +86,6 @@
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionDetectionPolicy;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicyException;
Expand Down Expand Up @@ -226,9 +224,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
_subjectCreator = port.getSubjectCreator(transport.isSecure(), network.getSelectedHost());

List<Symbol> offeredCapabilities = new ArrayList<>();
offeredCapabilities.add(ANONYMOUS_RELAY);
offeredCapabilities.add(SHARED_SUBSCRIPTIONS);
offeredCapabilities.add(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER);
offeredCapabilities.add(Symbols.ANONYMOUS_RELAY);
offeredCapabilities.add(Symbols.SHARED_SUBSCRIPTIONS);
offeredCapabilities.add(Symbols.SOLE_CONNECTION_FOR_CONTAINER);

setOfferedCapabilities(offeredCapabilities);

Expand Down Expand Up @@ -833,23 +831,23 @@ public void receiveOpen(final int channel, final Open open)
: Collections.unmodifiableMap(new LinkedHashMap<>(open.getProperties()));
_remoteDesiredCapabilities = open.getDesiredCapabilities() == null ? Set.of() : Stream.of(open.getDesiredCapabilities())
.collect(Collectors.toSet());
if (remoteProperties.containsKey(Symbol.valueOf("product")))
if (remoteProperties.containsKey(Symbols.PRODUCT))
{
setClientProduct(remoteProperties.get(Symbol.valueOf("product")).toString());
setClientProduct(remoteProperties.get(Symbols.PRODUCT).toString());
}
if (remoteProperties.containsKey(Symbol.valueOf("version")))
if (remoteProperties.containsKey(Symbols.VERSION))
{
setClientVersion(remoteProperties.get(Symbol.valueOf("version")).toString());
setClientVersion(remoteProperties.get(Symbols.VERSION).toString());
}
setClientId(_remoteContainerId);
if (_remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
if (_remoteDesiredCapabilities.contains(Symbols.SOLE_CONNECTION_FOR_CONTAINER))
{
if (remoteProperties != null && remoteProperties.containsKey(SOLE_CONNECTION_ENFORCEMENT_POLICY))
if (remoteProperties != null && remoteProperties.containsKey(Symbols.SOLE_CONNECTION_ENFORCEMENT_POLICY))
{
try
{
_soleConnectionEnforcementPolicy = SoleConnectionEnforcementPolicy.valueOf(remoteProperties.get(
SOLE_CONNECTION_ENFORCEMENT_POLICY));
Symbols.SOLE_CONNECTION_ENFORCEMENT_POLICY));
}
catch (IllegalArgumentException e)
{
Expand Down Expand Up @@ -955,10 +953,10 @@ private void handleSoleConnectionEnforcement(final NamedAddressSpace addressSpac
if (e.getPolicy() == SoleConnectionEnforcementPolicy.REFUSE_CONNECTION)
{
LOGGER.debug("Closing newly open connection: {}", e.getMessage());
_properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
_properties.put(Symbols.AMQP_CONN_ESTABLISHMENT_FAILED, true);
final Error error = new Error(AmqpError.INVALID_FIELD,
String.format("Connection closed due to sole-connection-enforcement-policy '%s'", e.getPolicy()));
error.setInfo(Map.of(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id")));
error.setInfo(Map.of(Symbols.INVALID_FIELD, Symbols.CONTAINER_ID));
closeConnection(error);
getEventLogger().message(ResourceLimitMessages.REJECTED(
"Opening", "connection", String.format("container '%s'", e.getContainerID()), e.getMessage()));
Expand All @@ -967,7 +965,7 @@ else if (e.getPolicy() == SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
{
final Error error = new Error(AmqpError.RESOURCE_LOCKED,
String.format("Connection closed due to sole-connection-enforcement-policy '%s'", e.getPolicy()));
error.setInfo(Map.of(Symbol.valueOf("sole-connection-enforcement"), true));
error.setInfo(Map.of(Symbols.SOLE_CONNECTION_ENFORCEMENT, true));

final EventLogger logger = getEventLogger();
final List<CompletableFuture<Void>> rescheduleFutures = new ArrayList<>();
Expand Down Expand Up @@ -1036,10 +1034,10 @@ private void populateConnectionRedirect(final NamedAddressSpace addressSpace, fi
}
}
final Map<Symbol, Object> infoMap = new HashMap<>();
infoMap.put(Symbol.valueOf("network-host"), networkHost);
infoMap.put(Symbols.NETWORK_HOST, networkHost);
if(port > 0)
{
infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
infoMap.put(Symbols.PORT, UnsignedInteger.valueOf(port));
}
err.setInfo(infoMap);
}
Expand Down Expand Up @@ -1711,15 +1709,15 @@ private void sendOpen(final int channelMax, final int maxFrameSize)
}

if (_remoteDesiredCapabilities != null
&& _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
&& _remoteDesiredCapabilities.contains(Symbols.SOLE_CONNECTION_FOR_CONTAINER))
{
_properties.put(SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY,
_properties.put(Symbols.SOLE_CONNECTION_DETECTION_POLICY,
SoleConnectionDetectionPolicy.STRONG);
}

if (_soleConnectionEnforcementPolicy == SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
{
_properties.put(SOLE_CONNECTION_ENFORCEMENT_POLICY, SoleConnectionEnforcementPolicy.CLOSE_EXISTING.getValue());
_properties.put(Symbols.SOLE_CONNECTION_ENFORCEMENT_POLICY, SoleConnectionEnforcementPolicy.CLOSE_EXISTING.getValue());
}

open.setProperties(_properties);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.qpid.server.protocol.v1_0;/*
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand All @@ -19,7 +19,7 @@
*
*/

import static org.apache.qpid.server.protocol.v1_0.Session_1_0.DELAYED_DELIVERY;
package org.apache.qpid.server.protocol.v1_0;

import java.util.Arrays;

Expand All @@ -29,6 +29,7 @@
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.DestinationAddress;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.v1_0.constants.Symbols;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
Expand All @@ -37,22 +38,22 @@

public class AnonymousRelayDestination implements ReceivingDestination
{
private static final Symbol[] CAPABILITIES = { DELAYED_DELIVERY };
private static final Symbol[] CAPABILITIES = { Symbols.DELAYED_DELIVERY };

private final Target _target;
private final NamedAddressSpace _addressSpace;
private final EventLogger _eventLogger;
private final boolean _discardUnroutable;

AnonymousRelayDestination(final NamedAddressSpace addressSpace,
final Target target,
final EventLogger eventLogger)
final Target target,
final EventLogger eventLogger)
{
_addressSpace = addressSpace;
_target = target;
_eventLogger = eventLogger;
_discardUnroutable = target.getCapabilities() != null && Arrays.asList(target.getCapabilities())
.contains(DISCARD_UNROUTABLE);
.contains(Symbols.DISCARD_UNROUTABLE);
}

/**
Expand Down
Loading