Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.API_VERSION_REJECTED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.AUTHORIZATION_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.CLUSTER_AUTHORIZATION_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.SASL_AUTHENTICATION_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.TOPIC_AUTHORIZATION_FAILED;

import java.nio.ByteBuffer;
import java.time.Clock;
Expand All @@ -43,6 +45,8 @@ public class KafkaEventContext
private final int authorizationFailedEventId;
private final int apiVersionRejectedEventId;
private final int clusterAuthorizationFailedEventId;
private final int topicAuthorizationFailedEventId;
private final int saslAuthenticationFailedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

Expand All @@ -53,6 +57,8 @@ public KafkaEventContext(
this.authorizationFailedEventId = context.supplyEventId("binding.kafka.authorization.failed");
this.apiVersionRejectedEventId = context.supplyEventId("binding.kafka.api.version.rejected");
this.clusterAuthorizationFailedEventId = context.supplyEventId("binding.kafka.cluster.authorization.failed");
this.topicAuthorizationFailedEventId = context.supplyEventId("binding.kafka.topic.authorization.failed");
this.saslAuthenticationFailedEventId = context.supplyEventId("binding.kafka.sasl.authentication.failed");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand Down Expand Up @@ -129,4 +135,64 @@ public void clusterAuthorizationFailed(
.build();
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
}

/**
 * Emits a TOPIC_AUTHORIZATION_FAILED event for the given binding.
 *
 * @param traceId    trace correlation id carried on the event frame
 * @param bindingId  namespaced id of the kafka binding that observed the failure
 * @param apiKey     kafka protocol api key of the rejected request
 * @param apiVersion kafka protocol api version of the rejected request
 * @param topic      name of the topic that failed authorization (nullable per the IDL)
 */
public void topicAuthorizationFailed(
    long traceId,
    long bindingId,
    int apiKey,
    int apiVersion,
    String topic)
{
    // Build the kafka-specific extension first; the event frame references its bytes.
    final KafkaEventExFW topicAuthEx = kafkaEventExRW
        .wrap(extensionBuffer, 0, extensionBuffer.capacity())
        .topicAuthorizationFailed(ex -> ex
            .typeId(TOPIC_AUTHORIZATION_FAILED.value())
            .apiKey(apiKey)
            .apiVersion(apiVersion)
            .topic(topic))
        .build();

    // Wrap the extension in the generic event frame and hand it to the event writer.
    final EventFW topicAuthEvent = eventRW
        .wrap(eventBuffer, 0, eventBuffer.capacity())
        .id(topicAuthorizationFailedEventId)
        .timestamp(clock.millis())
        .traceId(traceId)
        .namespacedId(bindingId)
        .extension(topicAuthEx.buffer(), topicAuthEx.offset(), topicAuthEx.limit())
        .build();

    eventWriter.accept(kafkaTypeId, topicAuthEvent.buffer(), topicAuthEvent.offset(), topicAuthEvent.limit());
}

public void saslAuthenticationFailed(
long traceId,
long bindingId,
String identity)
{
saslAuthenticationFailed(traceId, bindingId, identity, null);
}

public void saslAuthenticationFailed(
long traceId,
long bindingId,
String identity,
String error)
{
Comment on lines +174 to +179
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If error is not required, then create an overload method same name but without error parameter and call this passing null for error.

KafkaEventExFW extension = kafkaEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.saslAuthenticationFailed(e -> e
.typeId(SASL_AUTHENTICATION_FAILED.value())
.identity(identity)
.error(error)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(saslAuthenticationFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaAuthorizationFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaClusterAuthorizationFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaSaslAuthenticationFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaTopicAuthorizationFailedExFW;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi;

Expand Down Expand Up @@ -67,6 +69,21 @@ public String format(
result = String.format("%s (Version: %d)", apiKey.title(), ex.apiVersion());
break;
}
case TOPIC_AUTHORIZATION_FAILED:
{
final KafkaTopicAuthorizationFailedExFW ex = extension.topicAuthorizationFailed();
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
result = String.format("%s (Version: %d) Topic authorization failed for topic (%s).",
apiKey.title(), ex.apiVersion(), asString(ex.topic()));
break;
}
case SASL_AUTHENTICATION_FAILED:
{
final KafkaSaslAuthenticationFailedExFW ex = extension.saslAuthenticationFailed();
result = String.format("SASL authentication failed for identity (%s): %s",
asString(ex.identity()), asString(ex.error()));
break;
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2951,7 +2951,7 @@ private void onDecodeOffsetsPartition(
this.nextOffset = partitionOffset;
break;
default:
onDecodeResponseErrorCode(traceId, originId, errorCode);
onDecodeResponseErrorCode(traceId, originId, errorCode, topic);
cleanupApplication(traceId, errorCode);
doNetworkEnd(traceId, authorization);
break;
Expand Down Expand Up @@ -2993,7 +2993,7 @@ private void onDecodeFetchPartition(
}
else
{
onDecodeResponseErrorCode(traceId, originId, errorCode);
onDecodeResponseErrorCode(traceId, originId, errorCode, topic);
}

cleanupApplication(traceId, errorCode);
Expand All @@ -3005,9 +3005,10 @@ private void onDecodeFetchPartition(
private void onDecodeResponseErrorCode(
long traceId,
long originId,
int errorCode)
int errorCode,
String topic)
{
super.onDecodeResponseErrorCode(traceId, originId, FETCH_API_KEY, FETCH_API_VERSION, errorCode);
super.onDecodeResponseErrorCode(traceId, originId, FETCH_API_KEY, FETCH_API_VERSION, errorCode, topic);
}

private void onDecodeFetchTransactionAbort(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1813,7 +1813,7 @@ private void onDecodeTopic(
newPartitions.clear();
break;
default:
onDecodeResponseErrorCode(traceId, originId, errorCode);
onDecodeResponseErrorCode(traceId, originId, errorCode, topic);
final KafkaResetExFW resetEx = kafkaResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.error(errorCode)
Expand Down Expand Up @@ -1845,7 +1845,16 @@ private void onDecodeResponseErrorCode(
long originId,
int errorCode)
{
super.onDecodeResponseErrorCode(traceId, originId, METADATA_API_KEY, METADATA_API_VERSION, errorCode);
onDecodeResponseErrorCode(traceId, originId, errorCode, null);
}

// Forwards a metadata-response error code to the shared SASL-handshaker handler,
// supplying METADATA api key/version plus the topic whose authorization failed
// (topic may be null when the error is not topic-specific).
private void onDecodeResponseErrorCode(
long traceId,
long originId,
int errorCode,
String topic)
{
// The super implementation switches on errorCode and emits the matching event
// (cluster/topic authorization failed, api version rejected).
super.onDecodeResponseErrorCode(traceId, originId, METADATA_API_KEY, METADATA_API_VERSION, errorCode, topic);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2261,7 +2261,7 @@ private void onDecodeProducePartition(
assert partitionId == this.partitionId;
break;
default:
onDecodeResponseErrorCode(traceId, originId, errorCode);
onDecodeResponseErrorCode(traceId, originId, errorCode, topic);
final KafkaResetExFW resetEx = kafkaResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.error(errorCode)
Expand All @@ -2275,9 +2275,10 @@ private void onDecodeProducePartition(
private void onDecodeResponseErrorCode(
long traceId,
long originId,
int errorCode)
int errorCode,
String topic)
{
super.onDecodeResponseErrorCode(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, errorCode);
super.onDecodeResponseErrorCode(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, errorCode, topic);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public abstract class KafkaClientSaslHandshaker
private static final int ERROR_NONE = 0;
private static final int ERROR_CLUSTER_AUTHORIZATION_FAILED = 31;
private static final int ERROR_UNSUPPORTED_VERSION = 35;
private static final int ERROR_TOPIC_AUTHORIZATION_FAILED = 29;

private static final String CLIENT_KEY = "Client Key";
private static final String SERVER_KEY = "Server Key";
Expand Down Expand Up @@ -434,11 +435,24 @@ protected final void onDecodeResponseErrorCode(
int apiKey,
int apiVersion,
int errorCode)
{
onDecodeResponseErrorCode(traceId, bindingId, apiKey, apiVersion, errorCode, null);
}

protected final void onDecodeResponseErrorCode(
long traceId,
long bindingId,
int apiKey,
int apiVersion,
int errorCode,
String topic)
Comment on lines +447 to +448
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding the extra parameter to the existing method and changing all of its call sites to pass null, let's add an overload here with the same arguments as the original that defaults topic to null; then call sites that don't need a topic won't have to pass null explicitly.

{
switch (errorCode)
{
case ERROR_CLUSTER_AUTHORIZATION_FAILED -> event.clusterAuthorizationFailed(traceId, bindingId, apiKey, apiVersion);
case ERROR_UNSUPPORTED_VERSION -> event.apiVersionRejected(traceId, bindingId, apiKey, apiVersion);
case ERROR_TOPIC_AUTHORIZATION_FAILED -> event.topicAuthorizationFailed(traceId, bindingId, apiKey,
apiVersion, topic);
}
}

Expand Down Expand Up @@ -710,7 +724,8 @@ private int decodeSaslPlainAuthenticate(
final int errorCode = authenticateResponse.errorCode();
if (errorCode != ERROR_NONE)
{
event.authorizationFailed(traceId, client.originId, client.sasl.username);
event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username,
authenticateResponse.errorMessage().asString());
}

progress = authenticateResponse.limit();
Expand Down Expand Up @@ -748,7 +763,8 @@ private int decodeSaslScramAuthenticateFirst(
final int errorCode = authenticateResponse.errorCode();
if (errorCode != ERROR_NONE)
{
event.authorizationFailed(traceId, client.originId, client.sasl.username);
event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username,
authenticateResponse.errorMessage().asString());
}

progress = authenticateResponse.limit();
Expand Down Expand Up @@ -777,6 +793,7 @@ private int decodeSaslScramAuthenticateFirst(
}
else
{
event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username);
client.onDecodeSaslResponse(traceId);
client.onDecodeSaslAuthenticateResponse(traceId, authorization, ERROR_SASL_AUTHENTICATION_FAILED);
}
Expand Down Expand Up @@ -826,6 +843,7 @@ private int decodeSaslScramAuthenticateFinal(
if (!Arrays.equals(Base64.getDecoder().decode(serverFinalMessage),
serverSignature))
{
event.saslAuthenticationFailed(traceId, client.originId, client.sasl.username);
client.onDecodeSaslResponse(traceId);
client.onDecodeSaslAuthenticateResponse(traceId, authorization, ERROR_SASL_AUTHENTICATION_FAILED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,14 @@ public void shouldReceiveTopicPartitionInfoChanged() throws Exception
{
k3po.finish();
}

/**
 * Verifies that a topic-authorization-failed event is emitted, driving the
 * client/server k3po scripts named in {@code @Specification} against the
 * {@code client.event.topic.authorization.failed.yaml} configuration.
 */
@Test
@Configuration("client.event.topic.authorization.failed.yaml")
@Specification({
"${app}/topic.authorization.failed/client",
"${net}/topic.authorization.failed/server"})
public void shouldReceiveTopicAuthorizationFailed() throws Exception
{
// k3po completes only when both scripted sides observe the expected streams.
k3po.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ public void shouldRequestTopicPartitionInfoWithSaslScram() throws Exception
k3po.finish();
}

/**
 * Verifies that a SASL-authentication-failed event is emitted, driving the
 * client/server k3po scripts named in {@code @Specification} against the
 * {@code client.event.sasl.authentication.failed.yaml} configuration.
 */
@Test
@Configuration("client.event.sasl.authentication.failed.yaml")
@Specification({
"${app}/sasl.authentication.failed/client",
"${net}/sasl.authentication.failed/server"})
public void shouldReceiveSaslAuthenticationFailed() throws Exception
{
// k3po completes only when both scripted sides observe the expected streams.
k3po.finish();
}

public static String supplyNonce()
{
return "fyko+d2lbbFgONRv9qkxdawL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,9 @@ scope kafka
{
AUTHORIZATION_FAILED (1),
API_VERSION_REJECTED (2),
CLUSTER_AUTHORIZATION_FAILED (3)
CLUSTER_AUTHORIZATION_FAILED (3),
TOPIC_AUTHORIZATION_FAILED (4),
SASL_AUTHENTICATION_FAILED (5)
}

struct KafkaAuthorizationFailedEx extends core::stream::Extension
Expand All @@ -689,11 +691,26 @@ scope kafka
int32 apiVersion;
}

struct KafkaTopicAuthorizationFailedEx extends core::stream::Extension
{
int32 apiKey;
int32 apiVersion;
string8 topic = null;
}

struct KafkaSaslAuthenticationFailedEx extends core::stream::Extension
{
string8 identity;
string16 error = null;
}

union KafkaEventEx switch (KafkaEventType)
{
case AUTHORIZATION_FAILED: KafkaAuthorizationFailedEx authorizationFailed;
case API_VERSION_REJECTED: KafkaApiVersionRejectedEx apiVersionRejected;
case CLUSTER_AUTHORIZATION_FAILED: KafkaClusterAuthorizationFailedEx clusterAuthorizationFailed;
case TOPIC_AUTHORIZATION_FAILED: KafkaTopicAuthorizationFailedEx topicAuthorizationFailed;
case SASL_AUTHENTICATION_FAILED: KafkaSaslAuthenticationFailedEx saslAuthenticationFailed;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#
# Copyright 2021-2024 Aklivity Inc.
#
# Aklivity licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

---
name: test
telemetry:
exporters:
exporter0:
type: test
options:
events:
- qname: engine:events
id: engine.started
name: ENGINE_STARTED
message: Engine Started.
- qname: test:app0
id: binding.kafka.sasl.authentication.failed
name: BINDING_KAFKA_SASL_AUTHENTICATION_FAILED
message: "SASL authentication failed for identity (username): Authentication failed."
- qname: engine:events
id: engine.stopped
name: ENGINE_STOPPED
message: Engine Stopped.
bindings:
app0:
type: kafka
kind: client
options:
servers:
- localhost:9092
sasl:
mechanism: plain
username: username
password: password
routes:
- exit: net0
Loading
Loading