From e1e3901e2432e262b4c04ae61707c079833409ae Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Wed, 25 Mar 2026 16:36:19 +0530 Subject: [PATCH 1/2] logging for auth errors in kafka client binding --- .../internal/events/KafkaEventContext.java | 54 +++++++++++++++++ .../internal/events/KafkaEventFormatter.java | 15 +++++ .../stream/KafkaClientDescribeFactory.java | 2 +- .../stream/KafkaClientFetchFactory.java | 9 +-- .../stream/KafkaClientGroupFactory.java | 18 +++--- .../stream/KafkaClientMetaFactory.java | 9 +-- .../KafkaClientOffsetCommitFactory.java | 2 +- .../stream/KafkaClientOffsetFetchFactory.java | 2 +- .../stream/KafkaClientProduceFactory.java | 7 ++- .../stream/KafkaClientSaslHandshaker.java | 13 ++++- .../kafka/internal/stream/ClientMetaIT.java | 10 ++++ .../internal/stream/ClientMetaSaslIT.java | 10 ++++ .../main/resources/META-INF/zilla/kafka.idl | 17 +++++- ...ient.event.sasl.authentication.failed.yaml | 49 ++++++++++++++++ ...ient.event.topic.authorization.failed.yaml | 41 +++++++++++++ .../sasl.authentication.failed/client.rpt | 28 +++++++++ .../sasl.authentication.failed/server.rpt | 23 ++++++++ .../topic.authorization.failed/client.rpt | 28 +++++++++ .../topic.authorization.failed/server.rpt | 23 ++++++++ .../sasl.authentication.failed/client.rpt | 55 ++++++++++++++++++ .../sasl.authentication.failed/server.rpt | 55 ++++++++++++++++++ .../topic.authorization.failed/client.rpt | 58 +++++++++++++++++++ .../topic.authorization.failed/server.rpt | 55 ++++++++++++++++++ 23 files changed, 558 insertions(+), 25 deletions(-) create mode 100644 specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.sasl.authentication.failed.yaml create mode 100644 specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml create mode 100644 specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/client.rpt create mode 100644 specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/server.rpt create mode 100644 specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/client.rpt create mode 100644 specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/server.rpt create mode 100644 specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/client.rpt create mode 100644 specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/server.rpt create mode 100644 specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/client.rpt create mode 100644 specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/server.rpt diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java index 1f1ddf954d..9ff99d2185 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java @@ -18,6 +18,8 @@ import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.API_VERSION_REJECTED; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.AUTHORIZATION_FAILED; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.CLUSTER_AUTHORIZATION_FAILED; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.SASL_AUTHENTICATION_FAILED; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.TOPIC_AUTHORIZATION_FAILED; import java.nio.ByteBuffer; import java.time.Clock; @@ -43,6 +45,8 @@ public class KafkaEventContext private final int authorizationFailedEventId; private final int apiVersionRejectedEventId; private final int clusterAuthorizationFailedEventId; + private final int topicAuthorizationFailedEventId; + private final int saslAuthenticationFailedEventId; private final MessageConsumer eventWriter; private final Clock clock; @@ -53,6 +57,8 @@ public KafkaEventContext( this.authorizationFailedEventId = context.supplyEventId("binding.kafka.authorization.failed"); this.apiVersionRejectedEventId = context.supplyEventId("binding.kafka.api.version.rejected"); this.clusterAuthorizationFailedEventId = context.supplyEventId("binding.kafka.cluster.authorization.failed"); + this.topicAuthorizationFailedEventId = context.supplyEventId("binding.kafka.topic.authorization.failed"); + this.saslAuthenticationFailedEventId = context.supplyEventId("binding.kafka.sasl.authentication.failed"); this.eventWriter = context.supplyEventWriter(); this.clock = context.clock(); } @@ -129,4 +135,52 @@ public void clusterAuthorizationFailed( .build(); eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); } + + public void topicAuthorizationFailed( + long traceId, + long bindingId, + String topic) + { + KafkaEventExFW extension = kafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .topicAuthorizationFailed(e -> e + .typeId(TOPIC_AUTHORIZATION_FAILED.value()) + .topic(topic) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(topicAuthorizationFailedEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); + } + + public void saslAuthenticationFailed( + long traceId, + long bindingId, + String identity, + String error) + { + KafkaEventExFW extension = kafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .saslAuthenticationFailed(e -> e + .typeId(SASL_AUTHENTICATION_FAILED.value()) + .identity(identity) + .error(error) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(saslAuthenticationFailedEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); + } } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java index bdc75a3d29..cb9c0801f2 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java @@ -23,6 +23,8 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaAuthorizationFailedExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaClusterAuthorizationFailedExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaSaslAuthenticationFailedExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaTopicAuthorizationFailedExFW; import io.aklivity.zilla.runtime.engine.Configuration; import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi; @@ -67,6 +69,19 @@ public String format( result = String.format("%s (Version: %d)", apiKey.title(), ex.apiVersion()); break; } + case TOPIC_AUTHORIZATION_FAILED: + { + final KafkaTopicAuthorizationFailedExFW ex = extension.topicAuthorizationFailed(); + result = String.format("Topic authorization failed for topic (%s).", asString(ex.topic())); + break; + } + case SASL_AUTHENTICATION_FAILED: + { + final KafkaSaslAuthenticationFailedExFW ex = extension.saslAuthenticationFailed(); + result = String.format("SASL authentication failed for identity (%s): %s", + asString(ex.identity()), asString(ex.error())); + break; + } } return result; } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java index d9c6ff6a85..4fc3979a4c 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java @@ -1028,7 +1028,7 @@ private void onDecodeResponseErrorCode( int errorCode) { super.onDecodeResponseErrorCode(traceId, originId, DESCRIBE_CONFIGS_API_KEY, DESCRIBE_CONFIGS_API_VERSION, - errorCode); + errorCode, null); } private void onNetwork( diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientFetchFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientFetchFactory.java index 5c869d0c55..cb8b00fd60 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientFetchFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientFetchFactory.java @@ -2951,7 +2951,7 @@ private void onDecodeOffsetsPartition( this.nextOffset = partitionOffset; break; default: - onDecodeResponseErrorCode(traceId, originId, errorCode); + onDecodeResponseErrorCode(traceId, originId, errorCode, topic); cleanupApplication(traceId, errorCode); doNetworkEnd(traceId, authorization); break; @@ -2993,7 +2993,7 @@ private void onDecodeFetchPartition( } else { - onDecodeResponseErrorCode(traceId, originId, errorCode); + onDecodeResponseErrorCode(traceId, originId, errorCode, topic); } cleanupApplication(traceId, errorCode); @@ -3005,9 +3005,10 @@ private void onDecodeFetchPartition( private void onDecodeResponseErrorCode( long traceId, long originId, - int errorCode) + int errorCode, + String topic) { - super.onDecodeResponseErrorCode(traceId, originId, FETCH_API_KEY, FETCH_API_VERSION, errorCode); + super.onDecodeResponseErrorCode(traceId, originId, FETCH_API_KEY, FETCH_API_VERSION, errorCode, topic); } private void onDecodeFetchTransactionAbort( diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java index d04640db55..6b0c1dee08 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java @@ -945,7 +945,7 @@ private int decodeFindCoordinatorResponse( break; default: client.onDecodeResponseErrorCode(traceId, client.originId, FIND_COORDINATOR_API_KEY, - FIND_COORDINATOR_API_VERSION, errorCode); + FIND_COORDINATOR_API_VERSION, errorCode, null); client.errorCode = errorCode; client.decoder = decodeClusterReject; break; @@ -1095,7 +1095,8 @@ private int decodeJoinGroupResponse( joinGroupResponse.memberId().asString()); break; default: - client.onDecodeResponseErrorCode(traceId, client.originId, JOIN_GROUP_API_KEY, JOIN_GROUP_VERSION, errorCode); + client.onDecodeResponseErrorCode(traceId, client.originId, JOIN_GROUP_API_KEY, JOIN_GROUP_VERSION, errorCode, + null); client.errorCode = errorCode; client.decoder = decodeJoinGroupReject; break; @@ -1155,7 +1156,8 @@ private int decodeSyncGroupResponse( client.onSyncGroupResponse(traceId, authorization, syncGroupResponse.assignment()); break; default: - client.onDecodeResponseErrorCode(traceId, client.originId, SYNC_GROUP_API_KEY, SYNC_GROUP_VERSION, errorCode); + client.onDecodeResponseErrorCode(traceId, client.originId, SYNC_GROUP_API_KEY, SYNC_GROUP_VERSION, errorCode, + null); client.errorCode = errorCode; client.decoder = decodeCoordinatorReject; break; @@ -1218,7 +1220,8 @@ private int decodeHeartbeatResponse( client.onHeartbeatResponse(traceId, authorization); break; default: - client.onDecodeResponseErrorCode(traceId, client.originId, HEARTBEAT_API_KEY, HEARTBEAT_VERSION, errorCode); + client.onDecodeResponseErrorCode(traceId, client.originId, HEARTBEAT_API_KEY, HEARTBEAT_VERSION, errorCode, + null); client.errorCode = errorCode; client.decoder = decodeCoordinatorReject; break; @@ -1285,7 +1288,7 @@ private int decodeLeaveGroupResponse( else { client.onDecodeResponseErrorCode(traceId, client.originId, LEAVE_GROUP_API_KEY, - LEAVE_GROUP_VERSION, errorCode); + LEAVE_GROUP_VERSION, errorCode, null); client.errorCode = errorCode; client.decoder = decodeCoordinatorReject; } @@ -1301,7 +1304,7 @@ private int decodeLeaveGroupResponse( else { client.onDecodeResponseErrorCode(traceId, client.originId, LEAVE_GROUP_API_KEY, LEAVE_GROUP_VERSION, - errorCode); + errorCode, null); client.errorCode = errorCode; client.decoder = decodeCoordinatorReject; } @@ -2679,7 +2682,8 @@ private void onDecodeResponseErrorCode( long originId, int errorCode) { - super.onDecodeResponseErrorCode(traceId, originId, DESCRIBE_CONFIGS_API_KEY, DESCRIBE_CONFIGS_API_VERSION, errorCode); + super.onDecodeResponseErrorCode(traceId, originId, DESCRIBE_CONFIGS_API_KEY, DESCRIBE_CONFIGS_API_VERSION, errorCode, + null); } private void onNetwork( diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java index 6594978abd..eb6ed07e63 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java @@ -1813,7 +1813,7 @@ private void onDecodeTopic( newPartitions.clear(); break; default: - onDecodeResponseErrorCode(traceId, originId, errorCode); + onDecodeResponseErrorCode(traceId, originId, errorCode, topic); final KafkaResetExFW resetEx = kafkaResetExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) .error(errorCode) @@ -1836,16 +1836,17 @@ private void onDecodePartition( } else { - onDecodeResponseErrorCode(traceId, originId, partitionError); + onDecodeResponseErrorCode(traceId, originId, partitionError, null); } } private void onDecodeResponseErrorCode( long traceId, long originId, - int errorCode) + int errorCode, + String topic) { - super.onDecodeResponseErrorCode(traceId, originId, METADATA_API_KEY, METADATA_API_VERSION, errorCode); + super.onDecodeResponseErrorCode(traceId, originId, METADATA_API_KEY, METADATA_API_VERSION, errorCode, topic); } @Override diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetCommitFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetCommitFactory.java index 94d987adbe..707c49b051 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetCommitFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetCommitFactory.java @@ -593,7 +593,7 @@ private int decodeOffsetCommitPartition( else { client.onDecodeResponseErrorCode(traceId, client.originId, OFFSET_COMMIT_API_KEY, OFFSET_COMMIT_API_VERSION, - errorCode); + errorCode, null); client.errorCode = errorCode; client.decoder = decodeReject; } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetFetchFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetFetchFactory.java index 5266347e99..f2f3ef0980 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetFetchFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetFetchFactory.java @@ -666,7 +666,7 @@ private int decodeOffsetFetchPartition( break; default: client.onDecodeResponseErrorCode(traceId, client.originId, OFFSET_FETCH_API_KEY, OFFSET_FETCH_API_VERSION, - errorCode); + errorCode, null); client.errorCode = errorCode; client.decoder = decodeReject; break; diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java index b03526d08c..26f1a49ba4 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java @@ -2261,7 +2261,7 @@ private void onDecodeProducePartition( assert partitionId == this.partitionId; break; default: - onDecodeResponseErrorCode(traceId, originId, errorCode); + onDecodeResponseErrorCode(traceId, originId, errorCode, topic); final KafkaResetExFW resetEx = kafkaResetExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) .error(errorCode) @@ -2275,9 +2275,10 @@ private void onDecodeProducePartition( private void onDecodeResponseErrorCode( long traceId, long originId, - int errorCode) + int errorCode, + String topic) { - super.onDecodeResponseErrorCode(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, errorCode); + super.onDecodeResponseErrorCode(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, errorCode, topic); } @Override diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java index f1e5f8ed90..19f3c6fa3c 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java @@ -67,6 +67,7 @@ public abstract class KafkaClientSaslHandshaker private static final int ERROR_NONE = 0; private static final int ERROR_CLUSTER_AUTHORIZATION_FAILED = 31; private static final int ERROR_UNSUPPORTED_VERSION = 35; + private static final int ERROR_TOPIC_AUTHORIZATION_FAILED = 29; private static final String CLIENT_KEY = "Client Key"; private static final String SERVER_KEY = "Server Key"; @@ -433,12 +434,14 @@ protected final void onDecodeResponseErrorCode( long bindingId, int apiKey, int apiVersion, - int errorCode) + int errorCode, + String topic) { switch (errorCode) { case ERROR_CLUSTER_AUTHORIZATION_FAILED -> event.clusterAuthorizationFailed(traceId, bindingId, apiKey, apiVersion); case ERROR_UNSUPPORTED_VERSION -> event.apiVersionRejected(traceId, bindingId, apiKey, apiVersion); + case ERROR_TOPIC_AUTHORIZATION_FAILED -> event.topicAuthorizationFailed(traceId, bindingId, topic); } } @@ -710,7 +713,8 @@ private int decodeSaslPlainAuthenticate( final int errorCode = authenticateResponse.errorCode(); if (errorCode != ERROR_NONE) { - event.authorizationFailed(traceId, client.originId, client.sasl.username); + event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username, + authenticateResponse.errorMessage().asString()); } progress = authenticateResponse.limit(); @@ -748,7 +752,8 @@ private int decodeSaslScramAuthenticateFirst( final int errorCode = authenticateResponse.errorCode(); if (errorCode != ERROR_NONE) { - event.authorizationFailed(traceId, client.originId, client.sasl.username); + event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username, + authenticateResponse.errorMessage().asString()); } progress = authenticateResponse.limit(); @@ -777,6 +782,7 @@ private int decodeSaslScramAuthenticateFirst( } else { + event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username, null); client.onDecodeSaslResponse(traceId); client.onDecodeSaslAuthenticateResponse(traceId, authorization, ERROR_SASL_AUTHENTICATION_FAILED); } @@ -826,6 +832,7 @@ private int decodeSaslScramAuthenticateFinal( if (!Arrays.equals(Base64.getDecoder().decode(serverFinalMessage), serverSignature)) { + event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username, null); client.onDecodeSaslResponse(traceId); client.onDecodeSaslAuthenticateResponse(traceId, authorization, ERROR_SASL_AUTHENTICATION_FAILED); } diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaIT.java index 14a930f2c5..3896708952 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaIT.java @@ -107,4 +107,14 @@ public void shouldReceiveTopicPartitionInfoChanged() throws Exception { k3po.finish(); } + + @Test + @Configuration("client.event.topic.authorization.failed.yaml") + @Specification({ + "${app}/topic.authorization.failed/client", + "${net}/topic.authorization.failed/server"}) + public void shouldReceiveTopicAuthorizationFailed() throws Exception + { + k3po.finish(); + } } diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java index c942342a53..507b0f178d 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java @@ -73,6 +73,16 @@ public void shouldRequestTopicPartitionInfoWithSaslScram() throws Exception k3po.finish(); } + @Test + @Configuration("client.event.sasl.authentication.failed.yaml") + @Specification({ + "${app}/sasl.authentication.failed/client", + "${net}/sasl.authentication.failed/server"}) + public void shouldReceiveSaslAuthenticationFailed() throws Exception + { + k3po.finish(); + } + public static String supplyNonce() { return "fyko+d2lbbFgONRv9qkxdawL"; diff --git a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl index 2c9d81b12d..70c7150684 100644 --- a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl +++ b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl @@ -669,7 +669,9 @@ scope kafka { AUTHORIZATION_FAILED (1), API_VERSION_REJECTED (2), - CLUSTER_AUTHORIZATION_FAILED (3) + CLUSTER_AUTHORIZATION_FAILED (3), + TOPIC_AUTHORIZATION_FAILED (4), + SASL_AUTHENTICATION_FAILED (5) } struct KafkaAuthorizationFailedEx extends core::stream::Extension @@ -689,11 +691,24 @@ scope kafka int32 apiVersion; } + struct KafkaTopicAuthorizationFailedEx extends core::stream::Extension + { + string8 topic = null; + } + + struct KafkaSaslAuthenticationFailedEx extends core::stream::Extension + { + string8 identity; + string16 error = null; + } + union KafkaEventEx switch (KafkaEventType) { case AUTHORIZATION_FAILED: KafkaAuthorizationFailedEx authorizationFailed; case API_VERSION_REJECTED: KafkaApiVersionRejectedEx apiVersionRejected; case CLUSTER_AUTHORIZATION_FAILED: KafkaClusterAuthorizationFailedEx clusterAuthorizationFailed; + case TOPIC_AUTHORIZATION_FAILED: KafkaTopicAuthorizationFailedEx topicAuthorizationFailed; + case SASL_AUTHENTICATION_FAILED: KafkaSaslAuthenticationFailedEx saslAuthenticationFailed; } } } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.sasl.authentication.failed.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.sasl.authentication.failed.yaml new file mode 100644 index 0000000000..2c4b738dbb --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.sasl.authentication.failed.yaml @@ -0,0 +1,49 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: engine:events + id: engine.started + name: ENGINE_STARTED + message: Engine Started. + - qname: test:app0 + id: binding.kafka.sasl.authentication.failed + name: BINDING_KAFKA_SASL_AUTHENTICATION_FAILED + message: "SASL authentication failed for identity (username): Authentication failed." + - qname: engine:events + id: engine.stopped + name: ENGINE_STOPPED + message: Engine Stopped. +bindings: + app0: + type: kafka + kind: client + options: + servers: + - localhost:9092 + sasl: + mechanism: plain + username: username + password: password + routes: + - exit: net0 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml new file mode 100644 index 0000000000..7b8a31645b --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml @@ -0,0 +1,41 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: engine:events + id: engine.started + name: ENGINE_STARTED + message: Engine Started. + - qname: test:app0 + id: binding.kafka.topic.authorization.failed + name: BINDING_KAFKA_TOPIC_AUTHORIZATION_FAILED + message: "Topic authorization failed for topic (test)." + - qname: engine:events + id: engine.stopped + name: ENGINE_STOPPED + message: Engine Stopped. +bindings: + app0: + type: kafka + kind: client + exit: net0 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/client.rpt new file mode 100644 index 0000000000..c93df37607 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/client.rpt @@ -0,0 +1,28 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connect aborted diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/server.rpt new file mode 100644 index 0000000000..b1b16c2817 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/server.rpt @@ -0,0 +1,23 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +rejected diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/client.rpt new file mode 100644 index 0000000000..c93df37607 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/client.rpt @@ -0,0 +1,28 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connect aborted diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/server.rpt new file mode 100644 index 0000000000..b1b16c2817 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/server.rpt @@ -0,0 +1,23 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +rejected diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/client.rpt new file mode 100644 index 0000000000..1d3f39b462 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/client.rpt @@ -0,0 +1,55 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 22 # size + 17s # sasl.handshake + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 5s "PLAIN" # mechanism + +read 17 # size + ${newRequestId} + 0s # no error + 1 # mechanisms + 5s "PLAIN" # PLAIN + +write 37 # size + 36s # sasl.authenticate + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 18 + [0x00] "username" # authentication bytes + [0x00] "password" + +read 42 # size + ${newRequestId} + 58s # sasl authentication failed + 22s "Authentication failed." # error message + -1 # null auth bytes + 0L # session lifetime diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/server.rpt new file mode 100644 index 0000000000..5ae0a39baa --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/server.rpt @@ -0,0 +1,55 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 22 # size + 17s # sasl.handshake + 1s # v1 + (int:requestId) + 5s "zilla" # client id + 5s "PLAIN" # mechanism + +write 17 # size + ${requestId} + 0s # no error + 1 # mechanisms + 5s "PLAIN" # PLAIN + +read 37 # size + 36s # sasl.authenticate + 1s # v1 + (int:requestId) + 5s "zilla" # client id + 18 + [0x00] "username" # authentication bytes + [0x00] "password" + +write 42 # size + ${requestId} + 58s # sasl authentication failed + 22s "Authentication failed." # error message + -1 # null auth bytes + 0L # session lifetime diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/client.rpt new file mode 100644 index 0000000000..56c562c45b --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/client.rpt @@ -0,0 +1,58 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property fetchWaitMax 500 +property fetchBytesMax 65535 +property partitionBytesMax 8192 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 26 # size + 3s # metadata + 5s # v5 + ${newRequestId} + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +read 106 # size + ${newRequestId} + [0..4] + 2 # brokers + 1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 2 + 19s "broker2.example.com" + 9092 + -1s + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 29s # topic-authorization-failed + 4s "test" # "test" topic + [0x00] # not internal + 0 # partitions diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/server.rpt new file mode 100644 index 0000000000..8e1ddd3e83 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/server.rpt @@ -0,0 +1,55 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 26 # size + 3s # metadata + 5s # v5 + (int:requestId) + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +write 106 # size + ${requestId} + 0 + 2 # brokers + 1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 2 + 19s "broker2.example.com" + 9092 + -1s + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 29s # topic-authorization-failed + 4s "test" # "test" topic + [0x00] # not internal + 0 # partitions From 5ef6f417ac77f803a67a7ae826ca3282bcfaeb1f Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 27 Mar 2026 09:56:31 +0530 Subject: [PATCH 2/2] addressing review comments --- .../internal/events/KafkaEventContext.java | 12 ++++++++++++ .../internal/events/KafkaEventFormatter.java | 4 +++- .../stream/KafkaClientDescribeFactory.java | 2 +- .../stream/KafkaClientGroupFactory.java | 18 +++++++----------- .../stream/KafkaClientMetaFactory.java | 10 +++++++++- .../stream/KafkaClientOffsetCommitFactory.java | 2 +- .../stream/KafkaClientOffsetFetchFactory.java | 2 +- .../stream/KafkaClientSaslHandshaker.java | 17 ++++++++++++++--- .../main/resources/META-INF/zilla/kafka.idl | 2 ++ ...lient.event.topic.authorization.failed.yaml | 2 +- 10 files changed, 51 insertions(+), 20 deletions(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java index 9ff99d2185..29ac413192 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java @@ -139,12 +139,16 @@ public void clusterAuthorizationFailed( public void topicAuthorizationFailed( long traceId, long bindingId, + int apiKey, + int apiVersion, String topic) { KafkaEventExFW extension = kafkaEventExRW .wrap(extensionBuffer, 0, extensionBuffer.capacity()) .topicAuthorizationFailed(e -> e .typeId(TOPIC_AUTHORIZATION_FAILED.value()) + .apiKey(apiKey) + .apiVersion(apiVersion) .topic(topic) ) .build(); @@ -159,6 +163,14 @@ public void topicAuthorizationFailed( eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); } + public void saslAuthenticationFailed( + long traceId, + long bindingId, + String identity) + { + saslAuthenticationFailed(traceId, bindingId, identity, null); + } + public void saslAuthenticationFailed( long traceId, long bindingId, diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java index cb9c0801f2..2af2eb9a7d 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java @@ -72,7 +72,9 @@ public String format( case TOPIC_AUTHORIZATION_FAILED: { final KafkaTopicAuthorizationFailedExFW ex = extension.topicAuthorizationFailed(); - result = String.format("Topic authorization failed for topic (%s).", asString(ex.topic())); + KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey()); + result = String.format("%s (Version: %d) Topic authorization failed for topic (%s).", + apiKey.title(), ex.apiVersion(), asString(ex.topic())); break; } case SASL_AUTHENTICATION_FAILED: diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java index 4fc3979a4c..d9c6ff6a85 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java @@ -1028,7 +1028,7 @@ private void onDecodeResponseErrorCode( int errorCode) { super.onDecodeResponseErrorCode(traceId, originId, DESCRIBE_CONFIGS_API_KEY, DESCRIBE_CONFIGS_API_VERSION, - errorCode, null); + errorCode); } private void onNetwork( diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java index 6b0c1dee08..d04640db55 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java @@ -945,7 +945,7 @@ private int decodeFindCoordinatorResponse( break; default: client.onDecodeResponseErrorCode(traceId, client.originId, FIND_COORDINATOR_API_KEY, - FIND_COORDINATOR_API_VERSION, errorCode, null); + FIND_COORDINATOR_API_VERSION, errorCode); client.errorCode = errorCode; client.decoder = decodeClusterReject; break; @@ -1095,8 +1095,7 @@ private int decodeJoinGroupResponse( joinGroupResponse.memberId().asString()); break; default: - client.onDecodeResponseErrorCode(traceId, client.originId, JOIN_GROUP_API_KEY, JOIN_GROUP_VERSION, errorCode, - null); + client.onDecodeResponseErrorCode(traceId, client.originId, JOIN_GROUP_API_KEY, JOIN_GROUP_VERSION, errorCode); client.errorCode = errorCode; client.decoder = decodeJoinGroupReject; break; @@ -1156,8 +1155,7 @@ private int decodeSyncGroupResponse( client.onSyncGroupResponse(traceId, authorization, syncGroupResponse.assignment()); break; default: - client.onDecodeResponseErrorCode(traceId, client.originId, SYNC_GROUP_API_KEY, SYNC_GROUP_VERSION, errorCode, - null); + client.onDecodeResponseErrorCode(traceId, client.originId, SYNC_GROUP_API_KEY, SYNC_GROUP_VERSION, errorCode); client.errorCode = errorCode; client.decoder = decodeCoordinatorReject; break; @@ -1220,8 +1218,7 @@ private int decodeHeartbeatResponse( client.onHeartbeatResponse(traceId, authorization); break; default: - client.onDecodeResponseErrorCode(traceId, client.originId, HEARTBEAT_API_KEY, HEARTBEAT_VERSION, errorCode, - null); + client.onDecodeResponseErrorCode(traceId, client.originId, HEARTBEAT_API_KEY, HEARTBEAT_VERSION, errorCode); client.errorCode = errorCode; client.decoder = decodeCoordinatorReject; break; @@ -1288,7 +1285,7 @@ private int decodeLeaveGroupResponse( else { client.onDecodeResponseErrorCode(traceId, client.originId, LEAVE_GROUP_API_KEY, - LEAVE_GROUP_VERSION, errorCode, null); + LEAVE_GROUP_VERSION, errorCode); client.errorCode = errorCode; client.decoder = decodeCoordinatorReject; } @@ -1304,7 +1301,7 @@ private int decodeLeaveGroupResponse( else { client.onDecodeResponseErrorCode(traceId, client.originId, LEAVE_GROUP_API_KEY, LEAVE_GROUP_VERSION, - errorCode, null); + errorCode); client.errorCode = errorCode; client.decoder = decodeCoordinatorReject; } @@ -2682,8 +2679,7 @@ private void onDecodeResponseErrorCode( long originId, int errorCode) { - super.onDecodeResponseErrorCode(traceId, originId, DESCRIBE_CONFIGS_API_KEY, DESCRIBE_CONFIGS_API_VERSION, errorCode, - null); + super.onDecodeResponseErrorCode(traceId, originId, DESCRIBE_CONFIGS_API_KEY, DESCRIBE_CONFIGS_API_VERSION, errorCode); } private void onNetwork( diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java index eb6ed07e63..4964c96090 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java @@ -1836,10 +1836,18 @@ private void onDecodePartition( } else { - onDecodeResponseErrorCode(traceId, originId, partitionError, null); + onDecodeResponseErrorCode(traceId, originId, partitionError); } } + private void onDecodeResponseErrorCode( + long traceId, + long originId, + int errorCode) + { + onDecodeResponseErrorCode(traceId, originId, errorCode, null); + } + private void onDecodeResponseErrorCode( long traceId, long originId, diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetCommitFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetCommitFactory.java index 707c49b051..94d987adbe 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetCommitFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetCommitFactory.java @@ -593,7 +593,7 @@ private int decodeOffsetCommitPartition( else { client.onDecodeResponseErrorCode(traceId, client.originId, OFFSET_COMMIT_API_KEY, OFFSET_COMMIT_API_VERSION, - errorCode, null); + errorCode); client.errorCode = errorCode; client.decoder = decodeReject; } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetFetchFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetFetchFactory.java index f2f3ef0980..5266347e99 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetFetchFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientOffsetFetchFactory.java @@ -666,7 +666,7 @@ private int decodeOffsetFetchPartition( break; default: client.onDecodeResponseErrorCode(traceId, client.originId, OFFSET_FETCH_API_KEY, OFFSET_FETCH_API_VERSION, - errorCode, null); + errorCode); client.errorCode = errorCode; client.decoder = decodeReject; break; diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java index 19f3c6fa3c..31b566f767 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java @@ -429,6 +429,16 @@ private void doEncodeSaslScramFinalAuthenticateRequest( doDecodeSaslAuthenticateResponse(traceId); } + protected final void onDecodeResponseErrorCode( + long traceId, + long bindingId, + int apiKey, + int apiVersion, + int errorCode) + { + onDecodeResponseErrorCode(traceId, bindingId, apiKey, apiVersion, errorCode, null); + } + protected final void onDecodeResponseErrorCode( long traceId, long bindingId, @@ -441,7 +451,8 @@ protected final void onDecodeResponseErrorCode( { case ERROR_CLUSTER_AUTHORIZATION_FAILED -> event.clusterAuthorizationFailed(traceId, bindingId, apiKey, apiVersion); case ERROR_UNSUPPORTED_VERSION -> event.apiVersionRejected(traceId, bindingId, apiKey, apiVersion); - case ERROR_TOPIC_AUTHORIZATION_FAILED -> event.topicAuthorizationFailed(traceId, bindingId, topic); + case ERROR_TOPIC_AUTHORIZATION_FAILED -> event.topicAuthorizationFailed(traceId, bindingId, apiKey, + apiVersion, topic); } } @@ -782,7 +793,7 @@ private int decodeSaslScramAuthenticateFirst( } else { - event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username, null); + event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username); client.onDecodeSaslResponse(traceId); client.onDecodeSaslAuthenticateResponse(traceId, authorization, ERROR_SASL_AUTHENTICATION_FAILED); } @@ -832,7 +843,7 @@ private int decodeSaslScramAuthenticateFinal( if (!Arrays.equals(Base64.getDecoder().decode(serverFinalMessage), serverSignature)) { - event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username, null); + event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username); client.onDecodeSaslResponse(traceId); client.onDecodeSaslAuthenticateResponse(traceId, authorization, ERROR_SASL_AUTHENTICATION_FAILED); } diff --git a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl index 70c7150684..3060cda68a 100644 --- a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl +++ b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl @@ -693,6 +693,8 @@ scope kafka struct KafkaTopicAuthorizationFailedEx extends core::stream::Extension { + int32 apiKey; + int32 apiVersion; string8 topic = null; } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml index 7b8a31645b..d26b589b79 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml @@ -29,7 +29,7 @@ telemetry: - qname: test:app0 id: binding.kafka.topic.authorization.failed name: BINDING_KAFKA_TOPIC_AUTHORIZATION_FAILED - message: "Topic authorization failed for topic (test)." + message: "Metadata (Version: 5) Topic authorization failed for topic (test)." - qname: engine:events id: engine.stopped name: ENGINE_STOPPED