From 9db275f74d6393149bb0273dec831bb06091d8db Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 21 Jan 2026 22:33:58 +0700 Subject: [PATCH 1/6] feat and bug-fix - The previous implementation of StructMessageHandler did not correctly handle the fields. This MR provides the implementation for the following "message" types: google.protobuf.Struct, google.protobuf.Value, google.protobuf.ListValue, and google.protobuf.NullValue. Adds GoogleProtobufComplexMessageHandler to handle some of the complex Google Protobuf message types that are dynamic and recursive in nature. This implementation converts these message types to Protobuf's byte-array representation. While outputting the data, the byte array is converted back to the original structure using the associated field descriptor. --- .../common/serde/typehandler/TypeHandler.java | 3 + .../serde/typehandler/TypeHandlerFactory.java | 7 +- .../GoogleProtobufComplexMessageHandler.java | 101 ++++++++++++++++++ dagger-core/build.gradle | 2 +- version.txt | 2 +- 5 files changed, 108 insertions(+), 7 deletions(-) create mode 100644 dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandler.java index c7f3cdaba..b9120189b 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandler.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandler.java @@ -17,6 +17,7 @@ public interface TypeHandler { */ boolean canHandle(); + // ---------- Flink -> Proto ---------- /** * Transform to protobuf message builder. * @@ -34,6 +35,8 @@ public interface TypeHandler { */ Object transformFromPostProcessor(Object field); + + // ---------- Proto -> Flink ---------- /** * Transform from protobuf message. 
* diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java index 2599f06bc..d9887eab2 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java @@ -1,15 +1,11 @@ package com.gotocompany.dagger.common.serde.typehandler; import com.google.protobuf.Descriptors; +import com.gotocompany.dagger.common.serde.typehandler.complex.*; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler; -import com.gotocompany.dagger.common.serde.typehandler.complex.EnumHandler; -import com.gotocompany.dagger.common.serde.typehandler.complex.MapHandler; -import com.gotocompany.dagger.common.serde.typehandler.complex.MessageHandler; -import com.gotocompany.dagger.common.serde.typehandler.complex.StructMessageHandler; -import com.gotocompany.dagger.common.serde.typehandler.complex.TimestampHandler; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -64,6 +60,7 @@ private static List getSpecificHandlers(Descriptors.FieldDescriptor new MapHandler(fieldDescriptor), new TimestampHandler(fieldDescriptor), new EnumHandler(fieldDescriptor), + new GoogleProtobufComplexMessageHandler(fieldDescriptor), new StructMessageHandler(fieldDescriptor), new RepeatedStructMessageHandler(fieldDescriptor), new RepeatedPrimitiveHandler(fieldDescriptor), diff --git 
a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java new file mode 100644 index 000000000..b1da0e1a1 --- /dev/null +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java @@ -0,0 +1,101 @@ +package com.gotocompany.dagger.common.serde.typehandler.complex; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.gotocompany.dagger.common.core.FieldDescriptorCache; +import com.gotocompany.dagger.common.serde.typehandler.TypeHandler; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.parquet.example.data.simple.SimpleGroup; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * + * A TypeHandler to handle some of the complex Google Protobuf message types + * that are dynamic and recursive in nature. + *

+ * github-link + *

+ * Struct is primarily used to represent JSON object types. + * Value can represent any primitive, a Struct, or an array type. + * ListValue represents a JSON array type. + * NullValue is an enum used to represent null; being an enum rather than a message, a field of this type is never matched by canHandle(). + *

+ * This implementation converts these message types to Protobuf's byte-array + * representation. While outputting the data, the byte array is converted back + * to the original structure using the associated field descriptor. + * + */ +public class GoogleProtobufComplexMessageHandler implements TypeHandler { + + private static final Set RECOGNIZED_COMPLEX_TYPES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + "google.protobuf.Struct", + "google.protobuf.Value", + "google.protobuf.ListValue", + "google.protobuf.NullValue" + ))); + + private final Descriptors.FieldDescriptor fieldDescriptor; + + public GoogleProtobufComplexMessageHandler(Descriptors.FieldDescriptor fieldDescriptor) { + this.fieldDescriptor = fieldDescriptor; + } + + @Override + public boolean canHandle() { + return fieldDescriptor.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE + && RECOGNIZED_COMPLEX_TYPES.contains(fieldDescriptor.getMessageType().getFullName()); + } + + @Override + public Object transformFromProto(Object field) { + if (field == null) { + return null; + } + return ((DynamicMessage) field).toByteArray(); + } + + @Override + public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) { + return transformFromProto(field); + } + + @Override + public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) { + if (!canHandle() || field == null) { + return builder; + } + + try { + DynamicMessage parsed = DynamicMessage.parseFrom(fieldDescriptor.getMessageType(), (byte[]) field); + builder.setField(fieldDescriptor, parsed); + return builder; + } catch (Exception e) { + throw new RuntimeException("Failed to parse protobuf bytes for field: " + fieldDescriptor.getFullName(), e); + } + } + + @Override + public Object transformFromPostProcessor(Object field) { + return field; + } + + @Override + public Object transformFromParquet(SimpleGroup simpleGroup) { + return null; + } + + @Override + public Object 
transformToJson(Object field) { + return null; + } + + @Override + public TypeInformation getTypeInformation() { + return Types.PRIMITIVE_ARRAY(Types.BYTE); + } +} diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index daab72661..732126cb5 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -29,7 +29,7 @@ def flinkVersion = rootProject.flinkVersion version = rootProject.file('version.txt').text.trim() def minimalVersion = version -def dependenciesVersion = "0.5.3" +def dependenciesVersion = "0.5.4" description = """dagger to the heart!""" diff --git a/version.txt b/version.txt index 34a83616b..aa22d3ce3 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.12.1 +0.12.3 From aa29fb191b34a2b4cdf5906745699e4c52d637fb Mon Sep 17 00:00:00 2001 From: rajuGT Date: Fri, 30 Jan 2026 03:15:01 +0700 Subject: [PATCH 2/6] version bump up --- version.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.txt b/version.txt index aa22d3ce3..26acbf080 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.12.3 +0.12.2 From f641289fd590d2eb34e022125189ad6f83f9e72e Mon Sep 17 00:00:00 2001 From: rajuGT Date: Fri, 30 Jan 2026 04:23:41 +0700 Subject: [PATCH 3/6] fix tests --- .../serde/typehandler/TypeHandlerFactory.java | 7 ++++++- .../GoogleProtobufComplexMessageHandler.java | 17 ++++++++++++++--- .../proto/deserialization/ProtoTypeTest.java | 9 +++++---- .../typehandler/TypeHandlerFactoryTest.java | 7 +++---- 4 files changed, 28 insertions(+), 12 deletions(-) diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java index d9887eab2..852155260 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java @@ 
-1,7 +1,12 @@ package com.gotocompany.dagger.common.serde.typehandler; import com.google.protobuf.Descriptors; -import com.gotocompany.dagger.common.serde.typehandler.complex.*; +import com.gotocompany.dagger.common.serde.typehandler.complex.EnumHandler; +import com.gotocompany.dagger.common.serde.typehandler.complex.GoogleProtobufComplexMessageHandler; +import com.gotocompany.dagger.common.serde.typehandler.complex.MapHandler; +import com.gotocompany.dagger.common.serde.typehandler.complex.MessageHandler; +import com.gotocompany.dagger.common.serde.typehandler.complex.StructMessageHandler; +import com.gotocompany.dagger.common.serde.typehandler.complex.TimestampHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler; diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java index b1da0e1a1..d72c382f8 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java @@ -14,7 +14,6 @@ import java.util.Set; /** - * * A TypeHandler to handle some of the complex Google Protobuf message types * that are dynamic and recursive in nature. *

@@ -28,7 +27,6 @@ * This implementation converts these message types to Protobuf's byte-array * representation. While outputting the data, the byte array is converted back * to the original structure using the associated field descriptor. - * */ public class GoogleProtobufComplexMessageHandler implements TypeHandler { @@ -56,7 +54,20 @@ public Object transformFromProto(Object field) { if (field == null) { return null; } - return ((DynamicMessage) field).toByteArray(); + + // Struct / Value default instance or empty + if (field instanceof DynamicMessage) { + DynamicMessage msg = (DynamicMessage) field; + + // CRITICAL: field not actually set + if (msg.getAllFields().isEmpty()) { + return null; + } + + return msg.toByteArray(); + } + + return null; } @Override diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java index 81aac42fe..1b32dcb8d 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java @@ -6,6 +6,7 @@ import com.gotocompany.dagger.consumer.TestBookingLogMessage; import com.gotocompany.dagger.consumer.TestNestedRepeatedMessage; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; @@ -137,9 +138,9 @@ public void shouldGiveAllNamesAndTypesIncludingStructFields() { @Test public void shouldReturnRowTypeForStructFields() { ProtoType protoType = new ProtoType(TestBookingLogMessage.class.getName(), "rowtime", stencilClientOrchestrator); - assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[35]); 
- assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[36]); - assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[37]); + assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[35]); + assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[36]); + assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[37]); assertEquals("profile_data", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[35]); assertEquals("event_properties", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[36]); assertEquals("key_values", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[37]); @@ -155,7 +156,7 @@ public void shouldGiveAllNamesAndTypesIncludingPrimitiveArrayFields() { public void shouldGiveNameAndTypeForRepeatingStructType() { ProtoType testNestedRepeatedMessage = new ProtoType(TestNestedRepeatedMessage.class.getName(), "rowtime", stencilClientOrchestrator); assertEquals("metadata", ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldNames()[4]); - assertEquals(OBJECT_ARRAY(ROW()), ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldTypes()[4]); + assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldTypes()[4]); } private int bookingLogFieldIndex(String propertyName) { diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactoryTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactoryTest.java index 155b5b05f..70b00ac05 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactoryTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactoryTest.java @@ -4,14 +4,13 @@ import com.google.protobuf.Descriptors; import 
com.google.protobuf.InvalidProtocolBufferException; import com.gotocompany.dagger.common.serde.typehandler.complex.EnumHandler; +import com.gotocompany.dagger.common.serde.typehandler.complex.GoogleProtobufComplexMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.MapHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.MessageHandler; -import com.gotocompany.dagger.common.serde.typehandler.complex.StructMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.TimestampHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler; -import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler; import com.gotocompany.dagger.consumer.TestBookingLogMessage; import com.gotocompany.dagger.consumer.TestFeedbackLogMessage; import com.gotocompany.dagger.consumer.TestGrpcResponse; @@ -107,14 +106,14 @@ public void shouldReturnRepeatedEnumHandlerIfRepeatedEnumFieldDescriptorPassed() public void shouldReturnRepeatedStructHandlerIfRepeatedStructFieldDescriptorPassed() { Descriptors.FieldDescriptor repeatedStructFieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("metadata"); TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(repeatedStructFieldDescriptor); - assertEquals(RepeatedStructMessageHandler.class, typeHandler.getClass()); + assertEquals(GoogleProtobufComplexMessageHandler.class, typeHandler.getClass()); } @Test public void shouldReturnStructHandlerIfStructFieldDescriptorPassed() { Descriptors.FieldDescriptor structFieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(structFieldDescriptor); - assertEquals(StructMessageHandler.class, 
typeHandler.getClass()); + assertEquals(GoogleProtobufComplexMessageHandler.class, typeHandler.getClass()); } @Test From 2b373e133ef9a1ee1fe3afd2fc16951880d7c1c9 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Mon, 2 Feb 2026 17:19:59 +0700 Subject: [PATCH 4/6] Add GoogleProtobufComplexMessageHandlerTest test cases --- ...ogleProtobufComplexMessageHandlerTest.java | 169 ++++++++++++++++++ .../src/test/proto/TestLogMessage.proto | 6 + 2 files changed, 175 insertions(+) create mode 100644 dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandlerTest.java diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandlerTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandlerTest.java new file mode 100644 index 000000000..04ae42085 --- /dev/null +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandlerTest.java @@ -0,0 +1,169 @@ +package com.gotocompany.dagger.common.serde.typehandler.complex; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; +import com.gotocompany.dagger.consumer.TestBookingLogMessage; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.schema.GroupType; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +public class GoogleProtobufComplexMessageHandlerTest { + + @Test + public void shouldReturnTrueForCanHandleForStructFieldDescriptor() { + Descriptors.FieldDescriptor fieldDescriptor = 
TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + assertTrue(handler.canHandle()); + } + + @Test + public void shouldReturnTrueForCanHandleForValueFieldDescriptor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("tag"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + assertTrue(handler.canHandle()); + } + + @Test + public void shouldReturnTrueForCanHandleForListValueFieldDescriptor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("tags"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + assertTrue(handler.canHandle()); + } + + @Test + public void shouldReturnTrueForCanHandleForRepeatedFieldDescriptor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("labels"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + assertTrue(handler.canHandle()); + } + + @Test + public void shouldReturnFalseForCanHandleForMessageFieldDescriptor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("driver_pickup_location"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + assertFalse(handler.canHandle()); + } + + @Test + public void shouldReturnTypeInformation() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + TypeInformation actualTypeInformation = handler.getTypeInformation(); + TypeInformation expectedTypeInformation = 
Types.PRIMITIVE_ARRAY(Types.BYTE); + assertEquals(expectedTypeInformation, actualTypeInformation); + } + + @Test + public void shouldReturnByteArrayForTransformFromProto() { + Struct struct = Struct.newBuilder() + .putFields("structKey1", Value.newBuilder().setStringValue("structValue").build()) + .putFields("structKey2", Value.newBuilder().setNumberValue(23.0).build()) + .build(); + byte[] expectedBytes = struct.toByteArray(); + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + + DynamicMessage structAsDynamicMessage = DynamicMessage.newBuilder(fieldDescriptor.getMessageType()) + .mergeFrom(struct) + .build(); + DynamicMessage dynamicMessage = DynamicMessage.newBuilder(fieldDescriptor.getContainingType()) + .setField(fieldDescriptor, structAsDynamicMessage) + .build(); + + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + + // when + Object fieldValue = dynamicMessage.getField(fieldDescriptor); + byte[] actualBytes1 = (byte[]) handler.transformFromProto(fieldValue); + + // when called using cache - as the implementation is same so asserting here itself + byte[] actualBytes2 = (byte[]) handler.transformFromProtoUsingCache(fieldValue, null); + + assertArrayEquals(expectedBytes, actualBytes1); + assertArrayEquals(expectedBytes, actualBytes2); + } + + @Test + public void shouldReturnTheBuilderSettingByteArrayValue() { + Struct struct = Struct.newBuilder() + .putFields("structKey1", Value.newBuilder().setStringValue("structValue").build()) + .putFields("structKey2", Value.newBuilder().setNumberValue(23.0).build()) + .build(); + byte[] byteArray = struct.toByteArray(); + + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + DynamicMessage.Builder builder = 
DynamicMessage.newBuilder(fieldDescriptor.getContainingType()); + + // when + DynamicMessage.Builder resultBuilder = handler.transformToProtoBuilder(builder, byteArray); + + assertTrue(resultBuilder.hasField(fieldDescriptor)); + + Object fieldValue = resultBuilder.getField(fieldDescriptor); + assertTrue(fieldValue instanceof DynamicMessage); + + DynamicMessage parsedMessage = (DynamicMessage) fieldValue; + + // parsed struct is NOT empty + assertFalse(parsedMessage.getAllFields().isEmpty()); + + // https://github.com/protocolbuffers/protobuf/blob/d124c2dc26841e5ee0b8d1505438fcf0660c9db0/src/google/protobuf/struct.proto#L53 field name is "fields" + Descriptors.FieldDescriptor fieldsDescriptor = parsedMessage.getDescriptorForType().findFieldByName("fields"); + + List fields = (List) parsedMessage.getField(fieldsDescriptor); + + // convert map entries to java map for easier assertions + Map structFields = new HashMap<>(); + + for (DynamicMessage entry : fields) { + String key = (String) entry.getField(entry.getDescriptorForType().findFieldByName("key")); + DynamicMessage value = (DynamicMessage) entry.getField(entry.getDescriptorForType().findFieldByName("value")); + structFields.put(key, value); + } + + // assert keys exist + assertTrue(structFields.containsKey("structKey1")); + assertTrue(structFields.containsKey("structKey2")); + + // assert values + DynamicMessage value1 = structFields.get("structKey1"); + DynamicMessage value2 = structFields.get("structKey2"); + + // https://github.com/protocolbuffers/protobuf/blob/d124c2dc26841e5ee0b8d1505438fcf0660c9db0/src/google/protobuf/struct.proto#L70 + Descriptors.FieldDescriptor stringValueFd = value1.getDescriptorForType().findFieldByName("string_value"); + // https://github.com/protocolbuffers/protobuf/blob/d124c2dc26841e5ee0b8d1505438fcf0660c9db0/src/google/protobuf/struct.proto#L68 + Descriptors.FieldDescriptor numberValueFd = value2.getDescriptorForType().findFieldByName("number_value"); + + 
assertEquals("structValue", value1.getField(stringValueFd)); + assertEquals(23.0, value2.getField(numberValueFd)); + } + + // Not implemented but adding basic test cases for test coverage + + @Test + public void shouldReturnNullWhenTransformFromParquetIsCalledWithAnyArgument() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup().named("TestGroupType"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + assertNull(handler.transformFromParquet(simpleGroup)); + } + + @Test + public void shouldReturnAsIsForTransformForPostProcessor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + byte[] input = "test".getBytes(); + assertEquals(handler.transformFromPostProcessor(input), input); + } +} diff --git a/dagger-common/src/test/proto/TestLogMessage.proto b/dagger-common/src/test/proto/TestLogMessage.proto index 216e3c6a2..91f068cf3 100644 --- a/dagger-common/src/test/proto/TestLogMessage.proto +++ b/dagger-common/src/test/proto/TestLogMessage.proto @@ -119,6 +119,12 @@ message TestBookingLogMessage { repeated float float_array_field = 73; repeated int64 long_array_field = 74; + + google.protobuf.Value tag = 75; + + google.protobuf.ListValue tags = 76; + + repeated google.protobuf.Value labels = 77; } message TestPaymentOptionMetadata { From 8c169ff3650a58c7082fe5a2c6738d205cd9790e Mon Sep 17 00:00:00 2001 From: rajuGT Date: Mon, 2 Feb 2026 17:39:00 +0700 Subject: [PATCH 5/6] fix-tests: added 3 new fields in TestBookingLogMessage proto which caused the tests to fail which are fixed in this commit --- 
.../dagger/common/core/FieldDescriptorCacheTest.java | 2 +- .../proto/deserialization/ProtoDeserializerTest.java | 4 ++-- .../serde/proto/deserialization/ProtoTypeTest.java | 4 ++-- .../common/serde/typehandler/RowFactoryTest.java | 11 +++++------ .../function/functions/JsonPayloadFunctionTest.java | 2 +- 5 files changed, 11 insertions(+), 12 deletions(-) diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/core/FieldDescriptorCacheTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/core/FieldDescriptorCacheTest.java index ebfb96dcf..e3b1bbb96 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/core/FieldDescriptorCacheTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/core/FieldDescriptorCacheTest.java @@ -37,7 +37,7 @@ public void shouldReturnOriginalFieldIndex() { @Test public void shouldReturnOriginalFieldCount() { FieldDescriptorCache fieldDescriptorCache = new FieldDescriptorCache(TestBookingLogMessage.getDescriptor()); - assertEquals(49, fieldDescriptorCache.getOriginalFieldCount(TestBookingLogMessage.getDescriptor())); + assertEquals(52, fieldDescriptorCache.getOriginalFieldCount(TestBookingLogMessage.getDescriptor())); } @Test diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java index ac938b170..991227be0 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java @@ -88,7 +88,7 @@ public void shouldAddExtraFieldsToRow() { Row row = protoDeserializer.deserialize(new ConsumerRecord<>("test-topic", 0, 0, null, protoBytes)); int size = row.getArity(); - assertEquals(51, size); + assertEquals(54, size); 
assertTrue("Didn't add field at the penultimate index", (Boolean) row.getField(size - 2)); assertEquals(1595548800000L, ((java.sql.Timestamp) row.getField(size - 1)).getTime()); } @@ -290,7 +290,7 @@ public void shouldAddExtraFieldsToRowWhenStencilAutoRefreshEnabled() { Row row = protoDeserializer.deserialize(new ConsumerRecord<>("test-topic", 0, 0, null, protoBytes)); int size = row.getArity(); - assertEquals(51, size); + assertEquals(54, size); assertTrue("Didn't add field at the penultimate index", (Boolean) row.getField(size - 2)); assertEquals(1595548800000L, ((java.sql.Timestamp) row.getField(size - 1)).getTime()); } diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java index 1b32dcb8d..1bb808812 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java @@ -131,8 +131,8 @@ public void shouldProcessArrayForStringData() { @Test public void shouldGiveAllNamesAndTypesIncludingStructFields() { ProtoType clevertapMessageProtoType = new ProtoType(TestBookingLogMessage.class.getName(), "rowtime", stencilClientOrchestrator); - assertEquals(51, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldNames().length); - assertEquals(51, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldTypes().length); + assertEquals(54, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldNames().length); + assertEquals(54, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldTypes().length); } @Test diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/RowFactoryTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/RowFactoryTest.java index 
2f44f1c68..906a1ff18 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/RowFactoryTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/RowFactoryTest.java @@ -40,11 +40,10 @@ public void shouldCreateRowForInputMap() { inputMap.put("created_at", "2016-01-18T08:55:26.16Z"); Row row = RowFactory.createRow(inputMap, descriptor); - Row expectedRow = new Row(49); + Row expectedRow = new Row(52); expectedRow.setField(5, "144614"); expectedRow.setField(6, "https://www.abcd.com/1234"); assertEquals(expectedRow, row); - } @Test @@ -52,7 +51,7 @@ public void shouldReturnAEmptyRowOfSizeEqualToNoOfFieldsInDescriptorForInputMap( Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor(); Map inputMap = new HashMap<>(); Row row = RowFactory.createRow(inputMap, descriptor); - assertEquals(new Row(49), row); + assertEquals(new Row(52), row); } @Test @@ -74,7 +73,7 @@ public void shouldCreateRowWithPassedFieldsForInputMap() { public void shouldReturnEmptyRowIfNullPassedAsMapForInputMap() { Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor(); Row row = RowFactory.createRow(null, descriptor); - assertEquals(new Row(49), row); + assertEquals(new Row(52), row); } @Test @@ -83,7 +82,7 @@ public void shouldCreateRowForDynamicMessage() throws InvalidProtocolBufferExcep DynamicMessage dynamicMessage = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), customerLogMessage.toByteArray()); Row row = RowFactory.createRow(dynamicMessage); assertNotNull(row); - assertEquals(49, row.getArity()); + assertEquals(52, row.getArity()); } @Test @@ -135,7 +134,7 @@ public void shouldCreateRowUsingCacheForDynamicMessage() throws InvalidProtocolB Row row = RowFactory.createRow(dynamicMessage, fieldDescriptorCache); assertNotNull(row); - assertEquals(49, row.getArity()); + assertEquals(52, row.getArity()); } @Test diff --git 
a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/internal/processor/function/functions/JsonPayloadFunctionTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/internal/processor/function/functions/JsonPayloadFunctionTest.java index da606653a..ae77b2d7d 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/internal/processor/function/functions/JsonPayloadFunctionTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/internal/processor/function/functions/JsonPayloadFunctionTest.java @@ -142,7 +142,7 @@ public void shouldThrowExceptionWhenStencilClientIsNull() { public void shouldGetJsonPayloadAsResult() { JsonPayloadFunction jsonPayloadFunction = new JsonPayloadFunction(commonInternalSourceConfig, commonSchemaConfig); - String expectedJsonPayload = "{\"service_type\":\"UNKNOWN\",\"order_number\":\"\",\"order_url\":\"\",\"status\":\"UNKNOWN\",\"event_timestamp\":{\"seconds\":0,\"nanos\":0},\"customer_id\":\"\",\"customer_url\":\"\",\"driver_id\":\"\",\"driver_url\":\"\",\"activity_source\":\"\",\"service_area_id\":\"\",\"amount_paid_by_cash\":0.0,\"driver_pickup_location\":{\"name\":\"\",\"address\":\"\",\"latitude\":0.0,\"longitude\":0.0,\"type\":\"\",\"note\":\"\",\"place_id\":\"\",\"accuracy_meter\":0.0,\"gate_id\":\"\"},\"driver_dropoff_location\":{\"name\":\"\",\"address\":\"\",\"latitude\":0.0,\"longitude\":0.0,\"type\":\"\",\"note\":\"\",\"place_id\":\"\",\"accuracy_meter\":0.0,\"gate_id\":\"\"},\"customer_email\":\"\",\"customer_name\":\"\",\"customer_phone\":\"\",\"driver_email\":\"\",\"driver_name\":\"\",\"driver_phone\":\"\",\"cancel_reason_id\":0,\"cancel_reason_description\":\"\",\"booking_creation_time\":{\"seconds\":0,\"nanos\":0},\"total_customer_discount\":0.0,\"gopay_customer_discount\":0.0,\"voucher_customer_discount\":0.0,\"pickup_time\":{\"seconds\":0,\"nanos\":0},\"driver_paid_in_cash\":0.0,\"driver_paid_in_credit\":0.0,\"vehicle_type\":\"UNKNOWN\",\"custom
er_total_fare_without_surge\":0,\"customer_dynamic_surge_enabled\":false,\"driver_total_fare_without_surge\":0,\"driver_dynamic_surge_enabled\":false,\"meta_array\":[],\"profile_data\":null,\"event_properties\":null,\"key_values\":null,\"cash_amount\":0.0,\"int_array_field\":[],\"metadata\":[],\"payment_option_metadata\":{\"masked_card\":\"\",\"network\":\"\"},\"test_enums\":[],\"routes\":[],\"customer_price\":0.0,\"boolean_array_field\":[],\"double_array_field\":[],\"float_array_field\":[],\"long_array_field\":[]}"; + String expectedJsonPayload = "{\"service_type\":\"UNKNOWN\",\"order_number\":\"\",\"order_url\":\"\",\"status\":\"UNKNOWN\",\"event_timestamp\":{\"seconds\":0,\"nanos\":0},\"customer_id\":\"\",\"customer_url\":\"\",\"driver_id\":\"\",\"driver_url\":\"\",\"activity_source\":\"\",\"service_area_id\":\"\",\"amount_paid_by_cash\":0.0,\"driver_pickup_location\":{\"name\":\"\",\"address\":\"\",\"latitude\":0.0,\"longitude\":0.0,\"type\":\"\",\"note\":\"\",\"place_id\":\"\",\"accuracy_meter\":0.0,\"gate_id\":\"\"},\"driver_dropoff_location\":{\"name\":\"\",\"address\":\"\",\"latitude\":0.0,\"longitude\":0.0,\"type\":\"\",\"note\":\"\",\"place_id\":\"\",\"accuracy_meter\":0.0,\"gate_id\":\"\"},\"customer_email\":\"\",\"customer_name\":\"\",\"customer_phone\":\"\",\"driver_email\":\"\",\"driver_name\":\"\",\"driver_phone\":\"\",\"cancel_reason_id\":0,\"cancel_reason_description\":\"\",\"booking_creation_time\":{\"seconds\":0,\"nanos\":0},\"total_customer_discount\":0.0,\"gopay_customer_discount\":0.0,\"voucher_customer_discount\":0.0,\"pickup_time\":{\"seconds\":0,\"nanos\":0},\"driver_paid_in_cash\":0.0,\"driver_paid_in_credit\":0.0,\"vehicle_type\":\"UNKNOWN\",\"customer_total_fare_without_surge\":0,\"customer_dynamic_surge_enabled\":false,\"driver_total_fare_without_surge\":0,\"driver_dynamic_surge_enabled\":false,\"meta_array\":[],\"profile_data\":null,\"event_properties\":null,\"key_values\":null,\"cash_amount\":0.0,\"int_array_field\":[],\"metadata\":[],
\"payment_option_metadata\":{\"masked_card\":\"\",\"network\":\"\"},\"test_enums\":[],\"routes\":[],\"customer_price\":0.0,\"boolean_array_field\":[],\"double_array_field\":[],\"float_array_field\":[],\"long_array_field\":[],\"tag\":null,\"tags\":null,\"labels\":null}"; String actualJsonPayload = (String) jsonPayloadFunction.getResult(commonRowManager); assertEquals(expectedJsonPayload, actualJsonPayload); } From 08d48a0090d52b7b7350c10219bad257b72ff840 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Mon, 2 Feb 2026 20:13:16 +0700 Subject: [PATCH 6/6] comments refactor --- .../complex/GoogleProtobufComplexMessageHandler.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java index d72c382f8..9076d6b98 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java @@ -55,18 +55,13 @@ public Object transformFromProto(Object field) { return null; } - // Struct / Value default instance or empty if (field instanceof DynamicMessage) { DynamicMessage msg = (DynamicMessage) field; - - // CRITICAL: field not actually set if (msg.getAllFields().isEmpty()) { return null; } - return msg.toByteArray(); } - return null; }