From 35e0016cdb5515fc4116ed637869cf8fa9e35d27 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Tue, 25 Nov 2025 04:28:33 +0700 Subject: [PATCH 1/3] bug-fix: null check consumerRecord.value() before sending it to PB DynamicMessage.parseFrom --- .../serde/proto/deserialization/ProtoDeserializer.java | 7 ++++++- dagger-core/build.gradle | 4 ++-- version.txt | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializer.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializer.java index 5cf61eb90..6da14e54c 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializer.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializer.java @@ -58,8 +58,13 @@ public boolean isEndOfStream(Row nextElement) { @Override public Row deserialize(ConsumerRecord consumerRecord) { Descriptors.Descriptor descriptor = getProtoParser(); + byte[] value = consumerRecord.value(); + if (value == null) { + LOGGER.warn("Record value / byteArray is NULL! " + protoClassName); + return createDefaultInvalidRow(DynamicMessage.getDefaultInstance(descriptor)); + } try { - DynamicMessage proto = DynamicMessage.parseFrom(descriptor, consumerRecord.value()); + DynamicMessage proto = DynamicMessage.parseFrom(descriptor, value); return addTimestampFieldToRow(proto); } catch (DescriptorNotFoundException e) { throw new DescriptorNotFoundException(e); diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index daab72661..3105a8d71 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -103,8 +103,8 @@ dependencies { exclude group: "io.grpc" } dependenciesJar 'org.apache.flink:flink-connector-kafka_2.11:' + flinkVersion - dependenciesJar 'com.google.protobuf:protobuf-java:3.23.2' - dependenciesJar 'com.google.protobuf:protobuf-java-util:3.1.0' + dependenciesJar 'com.google.protobuf:protobuf-java:3.25.3' + dependenciesJar 'com.google.protobuf:protobuf-java-util:3.25.3' dependenciesJar 'org.influxdb:influxdb-java:2.8' dependenciesJar 'org.elasticsearch.client:elasticsearch-rest-client:6.6.1' dependenciesJar 'com.google.cloud.bigtable:bigtable-hbase-2.x:2.10.0' diff --git a/version.txt b/version.txt index 34a83616b..26acbf080 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.12.1 +0.12.2 From dc269b571c89c5b21892dfcbcc854919347faabe Mon Sep 17 00:00:00 2001 From: rajuGT Date: Fri, 30 Jan 2026 02:57:06 +0700 Subject: [PATCH 2/3] Fix test related to payload being null --- .../serde/proto/deserialization/ProtoDeserializerTest.java | 6 +++--- version.txt | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java index 9a949edba..16084ab06 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java @@ -246,10 +246,10 @@ public void shouldIgnoreStructWhileDeserialising() { } @Test - public void shouldThrowExceptionIfNotAbleToDeserialise() { + public void shouldMarkRecordInvalidIfThePayloadIsNull() { ProtoDeserializer protoDeserializer = new ProtoDeserializer(TestNestedRepeatedMessage.class.getTypeName(), 6, "rowtime", stencilClientOrchestrator); - assertThrows(DaggerDeserializationException.class, - () -> protoDeserializer.deserialize(new ConsumerRecord<>("test-topic", 0, 0, null, null))); + Row row = protoDeserializer.deserialize(new ConsumerRecord<>("test-topic", 0, 0, null, null)); + assertFalse((boolean) row.getField(row.getArity() - 2)); } @Test diff --git a/version.txt b/version.txt index 26acbf080..34a83616b 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.12.2 +0.12.1 From 1f1faced04d32580c48931a716166eedee51f328 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Fri, 30 Jan 2026 03:01:44 +0700 Subject: [PATCH 3/3] fix checkstyleTest --- .../serde/proto/deserialization/ProtoDeserializerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java index 16084ab06..ac938b170 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java @@ -5,7 +5,6 @@ import com.gotocompany.dagger.common.configuration.Configuration; import com.gotocompany.dagger.common.core.StencilClientOrchestrator; import com.gotocompany.dagger.common.exceptions.DescriptorNotFoundException; -import com.gotocompany.dagger.common.exceptions.serde.DaggerDeserializationException; import com.gotocompany.dagger.common.serde.typehandler.RowFactory; import com.gotocompany.dagger.consumer.*; import org.apache.flink.api.common.typeinfo.TypeInformation;