diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java index 1f1ddf954d..29ac413192 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java @@ -18,6 +18,8 @@ import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.API_VERSION_REJECTED; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.AUTHORIZATION_FAILED; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.CLUSTER_AUTHORIZATION_FAILED; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.SASL_AUTHENTICATION_FAILED; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.TOPIC_AUTHORIZATION_FAILED; import java.nio.ByteBuffer; import java.time.Clock; @@ -43,6 +45,8 @@ public class KafkaEventContext private final int authorizationFailedEventId; private final int apiVersionRejectedEventId; private final int clusterAuthorizationFailedEventId; + private final int topicAuthorizationFailedEventId; + private final int saslAuthenticationFailedEventId; private final MessageConsumer eventWriter; private final Clock clock; @@ -53,6 +57,8 @@ public KafkaEventContext( this.authorizationFailedEventId = context.supplyEventId("binding.kafka.authorization.failed"); this.apiVersionRejectedEventId = context.supplyEventId("binding.kafka.api.version.rejected"); this.clusterAuthorizationFailedEventId = context.supplyEventId("binding.kafka.cluster.authorization.failed"); + this.topicAuthorizationFailedEventId = context.supplyEventId("binding.kafka.topic.authorization.failed"); + this.saslAuthenticationFailedEventId = context.supplyEventId("binding.kafka.sasl.authentication.failed"); this.eventWriter = context.supplyEventWriter(); this.clock = context.clock(); } @@ -129,4 +135,64 @@ public void clusterAuthorizationFailed( .build(); eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); } + + public void topicAuthorizationFailed( + long traceId, + long bindingId, + int apiKey, + int apiVersion, + String topic) + { + KafkaEventExFW extension = kafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .topicAuthorizationFailed(e -> e + .typeId(TOPIC_AUTHORIZATION_FAILED.value()) + .apiKey(apiKey) + .apiVersion(apiVersion) + .topic(topic) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(topicAuthorizationFailedEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); + } + + public void saslAuthenticationFailed( + long traceId, + long bindingId, + String identity) + { + saslAuthenticationFailed(traceId, bindingId, identity, null); + } + + public void saslAuthenticationFailed( + long traceId, + long bindingId, + String identity, + String error) + { + KafkaEventExFW extension = kafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .saslAuthenticationFailed(e -> e + .typeId(SASL_AUTHENTICATION_FAILED.value()) + .identity(identity) + .error(error) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(saslAuthenticationFailedEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); + } } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java index bdc75a3d29..2af2eb9a7d 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java @@ -23,6 +23,8 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaAuthorizationFailedExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaClusterAuthorizationFailedExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaSaslAuthenticationFailedExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaTopicAuthorizationFailedExFW; import io.aklivity.zilla.runtime.engine.Configuration; import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi; @@ -67,6 +69,21 @@ public String format( result = String.format("%s (Version: %d)", apiKey.title(), ex.apiVersion()); break; } + case TOPIC_AUTHORIZATION_FAILED: + { + final KafkaTopicAuthorizationFailedExFW ex = extension.topicAuthorizationFailed(); + KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey()); + result = String.format("%s (Version: %d) Topic authorization failed for topic (%s).", + apiKey.title(), ex.apiVersion(), asString(ex.topic())); + break; + } + case SASL_AUTHENTICATION_FAILED: + { + final KafkaSaslAuthenticationFailedExFW ex = extension.saslAuthenticationFailed(); + result = String.format("SASL authentication failed for identity (%s): %s", + asString(ex.identity()), asString(ex.error())); + break; + } } return result; } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientFetchFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientFetchFactory.java index 5c869d0c55..cb8b00fd60 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientFetchFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientFetchFactory.java @@ -2951,7 +2951,7 @@ private void onDecodeOffsetsPartition( this.nextOffset = partitionOffset; break; default: - onDecodeResponseErrorCode(traceId, originId, errorCode); + onDecodeResponseErrorCode(traceId, originId, errorCode, topic); cleanupApplication(traceId, errorCode); doNetworkEnd(traceId, authorization); break; @@ -2993,7 +2993,7 @@ private void onDecodeFetchPartition( } else { - onDecodeResponseErrorCode(traceId, originId, errorCode); + onDecodeResponseErrorCode(traceId, originId, errorCode, topic); } cleanupApplication(traceId, errorCode); @@ -3005,9 +3005,10 @@ private void onDecodeFetchPartition( private void onDecodeResponseErrorCode( long traceId, long originId, - int errorCode) + int errorCode, + String topic) { - super.onDecodeResponseErrorCode(traceId, originId, FETCH_API_KEY, FETCH_API_VERSION, errorCode); + super.onDecodeResponseErrorCode(traceId, originId, FETCH_API_KEY, FETCH_API_VERSION, errorCode, topic); } private void onDecodeFetchTransactionAbort( diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java index 6594978abd..4964c96090 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java @@ -1813,7 +1813,7 @@ private void onDecodeTopic( newPartitions.clear(); break; default: - onDecodeResponseErrorCode(traceId, originId, errorCode); + onDecodeResponseErrorCode(traceId, originId, errorCode, topic); final KafkaResetExFW resetEx = kafkaResetExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) .error(errorCode) @@ -1845,7 +1845,16 @@ private void onDecodeResponseErrorCode( long originId, int errorCode) { - super.onDecodeResponseErrorCode(traceId, originId, METADATA_API_KEY, METADATA_API_VERSION, errorCode); + onDecodeResponseErrorCode(traceId, originId, errorCode, null); + } + + private void onDecodeResponseErrorCode( + long traceId, + long originId, + int errorCode, + String topic) + { + super.onDecodeResponseErrorCode(traceId, originId, METADATA_API_KEY, METADATA_API_VERSION, errorCode, topic); } @Override diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java index b03526d08c..26f1a49ba4 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java @@ -2261,7 +2261,7 @@ private void onDecodeProducePartition( assert partitionId == this.partitionId; break; default: - onDecodeResponseErrorCode(traceId, originId, errorCode); + onDecodeResponseErrorCode(traceId, originId, errorCode, topic); final KafkaResetExFW resetEx = kafkaResetExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) .error(errorCode) @@ -2275,9 +2275,10 @@ private void onDecodeProducePartition( private void onDecodeResponseErrorCode( long traceId, long originId, - int errorCode) + int errorCode, + String topic) { - super.onDecodeResponseErrorCode(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, errorCode); + super.onDecodeResponseErrorCode(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, errorCode, topic); } @Override diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java index f1e5f8ed90..31b566f767 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java @@ -67,6 +67,7 @@ public abstract class KafkaClientSaslHandshaker private static final int ERROR_NONE = 0; private static final int ERROR_CLUSTER_AUTHORIZATION_FAILED = 31; private static final int ERROR_UNSUPPORTED_VERSION = 35; + private static final int ERROR_TOPIC_AUTHORIZATION_FAILED = 29; private static final String CLIENT_KEY = "Client Key"; private static final String SERVER_KEY = "Server Key"; @@ -434,11 +435,24 @@ protected final void onDecodeResponseErrorCode( int apiKey, int apiVersion, int errorCode) + { + onDecodeResponseErrorCode(traceId, bindingId, apiKey, apiVersion, errorCode, null); + } + + protected final void onDecodeResponseErrorCode( + long traceId, + long bindingId, + int apiKey, + int apiVersion, + int errorCode, + String topic) { switch (errorCode) { case ERROR_CLUSTER_AUTHORIZATION_FAILED -> event.clusterAuthorizationFailed(traceId, bindingId, apiKey, apiVersion); case ERROR_UNSUPPORTED_VERSION -> event.apiVersionRejected(traceId, bindingId, apiKey, apiVersion); + case ERROR_TOPIC_AUTHORIZATION_FAILED -> event.topicAuthorizationFailed(traceId, bindingId, apiKey, + apiVersion, topic); } } @@ -710,7 +724,8 @@ private int decodeSaslPlainAuthenticate( final int errorCode = authenticateResponse.errorCode(); if (errorCode != ERROR_NONE) { - event.authorizationFailed(traceId, client.originId, client.sasl.username); + event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username, + authenticateResponse.errorMessage().asString()); } progress = authenticateResponse.limit(); @@ -748,7 +763,8 @@ private int decodeSaslScramAuthenticateFirst( final int errorCode = authenticateResponse.errorCode(); if (errorCode != ERROR_NONE) { - event.authorizationFailed(traceId, client.originId, client.sasl.username); + event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username, + authenticateResponse.errorMessage().asString()); } progress = authenticateResponse.limit(); @@ -777,6 +793,7 @@ private int decodeSaslScramAuthenticateFirst( } else { + event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username); client.onDecodeSaslResponse(traceId); client.onDecodeSaslAuthenticateResponse(traceId, authorization, ERROR_SASL_AUTHENTICATION_FAILED); } @@ -826,6 +843,7 @@ private int decodeSaslScramAuthenticateFinal( if (!Arrays.equals(Base64.getDecoder().decode(serverFinalMessage), serverSignature)) { + event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username); client.onDecodeSaslResponse(traceId); client.onDecodeSaslAuthenticateResponse(traceId, authorization, ERROR_SASL_AUTHENTICATION_FAILED); } diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaIT.java index 14a930f2c5..3896708952 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaIT.java @@ -107,4 +107,14 @@ public void shouldReceiveTopicPartitionInfoChanged() throws Exception { k3po.finish(); } + + @Test + @Configuration("client.event.topic.authorization.failed.yaml") + @Specification({ + "${app}/topic.authorization.failed/client", + "${net}/topic.authorization.failed/server"}) + public void shouldReceiveTopicAuthorizationFailed() throws Exception + { + k3po.finish(); + } } diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java index c942342a53..507b0f178d 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java @@ -73,6 +73,16 @@ public void shouldRequestTopicPartitionInfoWithSaslScram() throws Exception k3po.finish(); } + @Test + @Configuration("client.event.sasl.authentication.failed.yaml") + @Specification({ + "${app}/sasl.authentication.failed/client", + "${net}/sasl.authentication.failed/server"}) + public void shouldReceiveSaslAuthenticationFailed() throws Exception + { + k3po.finish(); + } + public static String supplyNonce() { return "fyko+d2lbbFgONRv9qkxdawL"; diff --git a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl index 2c9d81b12d..3060cda68a 100644 --- a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl +++ b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl @@ -669,7 +669,9 @@ scope kafka { AUTHORIZATION_FAILED (1), API_VERSION_REJECTED (2), - CLUSTER_AUTHORIZATION_FAILED (3) + CLUSTER_AUTHORIZATION_FAILED (3), + TOPIC_AUTHORIZATION_FAILED (4), + SASL_AUTHENTICATION_FAILED (5) } struct KafkaAuthorizationFailedEx extends core::stream::Extension @@ -689,11 +691,26 @@ scope kafka int32 apiVersion; } + struct KafkaTopicAuthorizationFailedEx extends core::stream::Extension + { + int32 apiKey; + int32 apiVersion; + string8 topic = null; + } + + struct KafkaSaslAuthenticationFailedEx extends core::stream::Extension + { + string8 identity; + string16 error = null; + } + union KafkaEventEx switch (KafkaEventType) { case AUTHORIZATION_FAILED: KafkaAuthorizationFailedEx authorizationFailed; case API_VERSION_REJECTED: KafkaApiVersionRejectedEx apiVersionRejected; case CLUSTER_AUTHORIZATION_FAILED: KafkaClusterAuthorizationFailedEx clusterAuthorizationFailed; + case TOPIC_AUTHORIZATION_FAILED: KafkaTopicAuthorizationFailedEx topicAuthorizationFailed; + case SASL_AUTHENTICATION_FAILED: KafkaSaslAuthenticationFailedEx saslAuthenticationFailed; } } } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.sasl.authentication.failed.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.sasl.authentication.failed.yaml new file mode 100644 index 0000000000..2c4b738dbb --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.sasl.authentication.failed.yaml @@ -0,0 +1,49 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: engine:events + id: engine.started + name: ENGINE_STARTED + message: Engine Started. + - qname: test:app0 + id: binding.kafka.sasl.authentication.failed + name: BINDING_KAFKA_SASL_AUTHENTICATION_FAILED + message: "SASL authentication failed for identity (username): Authentication failed." + - qname: engine:events + id: engine.stopped + name: ENGINE_STOPPED + message: Engine Stopped. +bindings: + app0: + type: kafka + kind: client + options: + servers: + - localhost:9092 + sasl: + mechanism: plain + username: username + password: password + routes: + - exit: net0 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml new file mode 100644 index 0000000000..d26b589b79 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.topic.authorization.failed.yaml @@ -0,0 +1,41 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: engine:events + id: engine.started + name: ENGINE_STARTED + message: Engine Started. + - qname: test:app0 + id: binding.kafka.topic.authorization.failed + name: BINDING_KAFKA_TOPIC_AUTHORIZATION_FAILED + message: "Metadata (Version: 5) Topic authorization failed for topic (test)." + - qname: engine:events + id: engine.stopped + name: ENGINE_STOPPED + message: Engine Stopped. +bindings: + app0: + type: kafka + kind: client + exit: net0 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/client.rpt new file mode 100644 index 0000000000..c93df37607 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/client.rpt @@ -0,0 +1,28 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connect aborted diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/server.rpt new file mode 100644 index 0000000000..b1b16c2817 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/sasl.authentication.failed/server.rpt @@ -0,0 +1,23 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +rejected diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/client.rpt new file mode 100644 index 0000000000..c93df37607 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/client.rpt @@ -0,0 +1,28 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connect aborted diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/server.rpt new file mode 100644 index 0000000000..b1b16c2817 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/topic.authorization.failed/server.rpt @@ -0,0 +1,23 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +rejected diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/client.rpt new file mode 100644 index 0000000000..1d3f39b462 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/client.rpt @@ -0,0 +1,55 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 22 # size + 17s # sasl.handshake + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 5s "PLAIN" # mechanism + +read 17 # size + ${newRequestId} + 0s # no error + 1 # mechanisms + 5s "PLAIN" # PLAIN + +write 37 # size + 36s # sasl.authenticate + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 18 + [0x00] "username" # authentication bytes + [0x00] "password" + +read 42 # size + ${newRequestId} + 58s # sasl authentication failed + 22s "Authentication failed." # error message + -1 # null auth bytes + 0L # session lifetime diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/server.rpt new file mode 100644 index 0000000000..5ae0a39baa --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/sasl.authentication.failed/server.rpt @@ -0,0 +1,55 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 22 # size + 17s # sasl.handshake + 1s # v1 + (int:requestId) + 5s "zilla" # client id + 5s "PLAIN" # mechanism + +write 17 # size + ${requestId} + 0s # no error + 1 # mechanisms + 5s "PLAIN" # PLAIN + +read 37 # size + 36s # sasl.authenticate + 1s # v1 + (int:requestId) + 5s "zilla" # client id + 18 + [0x00] "username" # authentication bytes + [0x00] "password" + +write 42 # size + ${requestId} + 58s # sasl authentication failed + 22s "Authentication failed." # error message + -1 # null auth bytes + 0L # session lifetime diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/client.rpt new file mode 100644 index 0000000000..56c562c45b --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/client.rpt @@ -0,0 +1,58 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property fetchWaitMax 500 +property fetchBytesMax 65535 +property partitionBytesMax 8192 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 26 # size + 3s # metadata + 5s # v5 + ${newRequestId} + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +read 106 # size + ${newRequestId} + [0..4] + 2 # brokers + 1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 2 + 19s "broker2.example.com" + 9092 + -1s + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 29s # topic-authorization-failed + 4s "test" # "test" topic + [0x00] # not internal + 0 # partitions diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/server.rpt new file mode 100644 index 0000000000..8e1ddd3e83 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5/topic.authorization.failed/server.rpt @@ -0,0 +1,55 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 26 # size + 3s # metadata + 5s # v5 + (int:requestId) + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +write 106 # size + ${requestId} + 0 + 2 # brokers + 1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 2 + 19s "broker2.example.com" + 9092 + -1s + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 29s # topic-authorization-failed + 4s "test" # "test" topic + [0x00] # not internal + 0 # partitions