diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandler.java index c7f3cdaba..b9120189b 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandler.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandler.java @@ -17,6 +17,7 @@ public interface TypeHandler { */ boolean canHandle(); + // ---------- Flink -> Proto ---------- /** * Transform to protobuf message builder. * @@ -34,6 +35,8 @@ public interface TypeHandler { */ Object transformFromPostProcessor(Object field); + + // ---------- Proto -> Flink ---------- /** * Transform from protobuf message. * diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java index 2599f06bc..852155260 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java @@ -1,15 +1,16 @@ package com.gotocompany.dagger.common.serde.typehandler; import com.google.protobuf.Descriptors; -import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler; -import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler; -import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler; -import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.EnumHandler; +import com.gotocompany.dagger.common.serde.typehandler.complex.GoogleProtobufComplexMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.MapHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.MessageHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.StructMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.TimestampHandler; +import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler; +import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler; +import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler; +import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -64,6 +65,7 @@ private static List getSpecificHandlers(Descriptors.FieldDescriptor new MapHandler(fieldDescriptor), new TimestampHandler(fieldDescriptor), new EnumHandler(fieldDescriptor), + new GoogleProtobufComplexMessageHandler(fieldDescriptor), new StructMessageHandler(fieldDescriptor), new RepeatedStructMessageHandler(fieldDescriptor), new RepeatedPrimitiveHandler(fieldDescriptor), diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java new file mode 100644 index 000000000..9076d6b98 --- /dev/null +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java @@ -0,0 +1,107 @@ +package com.gotocompany.dagger.common.serde.typehandler.complex; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.gotocompany.dagger.common.core.FieldDescriptorCache; +import com.gotocompany.dagger.common.serde.typehandler.TypeHandler; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.parquet.example.data.simple.SimpleGroup; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * A TypeHandler to handle some of the complex Google Protobuf message types + * that are dynamic and recursive in nature. + *

+ * github-link + *

+ * Struct is primarily used to represent JSON object types. + * Value can represent any primitive, a Struct, or an array type. + * ListValue represents a JSON array type. + * NullValue is an enum used to represent null. + *

+ * This implementation converts these message types to Protobuf's byte-array + * representation. While outputting the data, the byte array is converted back + * to the original structure using the associated field descriptor. + */ +public class GoogleProtobufComplexMessageHandler implements TypeHandler { + + private static final Set RECOGNIZED_COMPLEX_TYPES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + "google.protobuf.Struct", + "google.protobuf.Value", + "google.protobuf.ListValue", + "google.protobuf.NullValue" + ))); + + private final Descriptors.FieldDescriptor fieldDescriptor; + + public GoogleProtobufComplexMessageHandler(Descriptors.FieldDescriptor fieldDescriptor) { + this.fieldDescriptor = fieldDescriptor; + } + + @Override + public boolean canHandle() { + return fieldDescriptor.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE + && RECOGNIZED_COMPLEX_TYPES.contains(fieldDescriptor.getMessageType().getFullName()); + } + + @Override + public Object transformFromProto(Object field) { + if (field == null) { + return null; + } + + if (field instanceof DynamicMessage) { + DynamicMessage msg = (DynamicMessage) field; + if (msg.getAllFields().isEmpty()) { + return null; + } + return msg.toByteArray(); + } + return null; + } + + @Override + public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) { + return transformFromProto(field); + } + + @Override + public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) { + if (!canHandle() || field == null) { + return builder; + } + + try { + DynamicMessage parsed = DynamicMessage.parseFrom(fieldDescriptor.getMessageType(), (byte[]) field); + builder.setField(fieldDescriptor, parsed); + return builder; + } catch (Exception e) { + throw new RuntimeException("Failed to parse protobuf bytes for field: " + fieldDescriptor.getFullName(), e); + } + } + + @Override + public Object transformFromPostProcessor(Object field) { + return field; + } + + @Override + public Object transformFromParquet(SimpleGroup simpleGroup) { + return null; + } + + @Override + public Object transformToJson(Object field) { + return null; + } + + @Override + public TypeInformation getTypeInformation() { + return Types.PRIMITIVE_ARRAY(Types.BYTE); + } +} diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/core/FieldDescriptorCacheTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/core/FieldDescriptorCacheTest.java index ebfb96dcf..e3b1bbb96 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/core/FieldDescriptorCacheTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/core/FieldDescriptorCacheTest.java @@ -37,7 +37,7 @@ public void shouldReturnOriginalFieldIndex() { @Test public void shouldReturnOriginalFieldCount() { FieldDescriptorCache fieldDescriptorCache = new FieldDescriptorCache(TestBookingLogMessage.getDescriptor()); - assertEquals(49, fieldDescriptorCache.getOriginalFieldCount(TestBookingLogMessage.getDescriptor())); + assertEquals(52, fieldDescriptorCache.getOriginalFieldCount(TestBookingLogMessage.getDescriptor())); } @Test diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java index ac938b170..991227be0 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializerTest.java @@ -88,7 +88,7 @@ public void shouldAddExtraFieldsToRow() { Row row = protoDeserializer.deserialize(new ConsumerRecord<>("test-topic", 0, 0, null, protoBytes)); int size = row.getArity(); - assertEquals(51, size); + assertEquals(54, size); assertTrue("Didn't add field at the penultimate index", (Boolean) row.getField(size - 2)); assertEquals(1595548800000L, ((java.sql.Timestamp) row.getField(size - 1)).getTime()); } @@ -290,7 +290,7 @@ public void shouldAddExtraFieldsToRowWhenStencilAutoRefreshEnabled() { Row row = protoDeserializer.deserialize(new ConsumerRecord<>("test-topic", 0, 0, null, protoBytes)); int size = row.getArity(); - assertEquals(51, size); + assertEquals(54, size); assertTrue("Didn't add field at the penultimate index", (Boolean) row.getField(size - 2)); assertEquals(1595548800000L, ((java.sql.Timestamp) row.getField(size - 1)).getTime()); } diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java index 81aac42fe..1bb808812 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoTypeTest.java @@ -6,6 +6,7 @@ import com.gotocompany.dagger.consumer.TestBookingLogMessage; import com.gotocompany.dagger.consumer.TestNestedRepeatedMessage; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; @@ -130,16 +131,16 @@ public void shouldProcessArrayForStringData() { @Test public void shouldGiveAllNamesAndTypesIncludingStructFields() { ProtoType clevertapMessageProtoType = new ProtoType(TestBookingLogMessage.class.getName(), "rowtime", stencilClientOrchestrator); - assertEquals(51, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldNames().length); - assertEquals(51, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldTypes().length); + assertEquals(54, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldNames().length); + assertEquals(54, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldTypes().length); } @Test public void shouldReturnRowTypeForStructFields() { ProtoType protoType = new ProtoType(TestBookingLogMessage.class.getName(), "rowtime", stencilClientOrchestrator); - assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[35]); - assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[36]); - assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[37]); + assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[35]); + assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[36]); + assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[37]); assertEquals("profile_data", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[35]); assertEquals("event_properties", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[36]); assertEquals("key_values", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[37]); @@ -155,7 +156,7 @@ public void shouldGiveAllNamesAndTypesIncludingPrimitiveArrayFields() { public void shouldGiveNameAndTypeForRepeatingStructType() { ProtoType testNestedRepeatedMessage = new ProtoType(TestNestedRepeatedMessage.class.getName(), "rowtime", stencilClientOrchestrator); assertEquals("metadata", ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldNames()[4]); - assertEquals(OBJECT_ARRAY(ROW()), ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldTypes()[4]); + assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldTypes()[4]); } private int bookingLogFieldIndex(String propertyName) { diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/RowFactoryTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/RowFactoryTest.java index 2f44f1c68..906a1ff18 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/RowFactoryTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/RowFactoryTest.java @@ -40,11 +40,10 @@ public void shouldCreateRowForInputMap() { inputMap.put("created_at", "2016-01-18T08:55:26.16Z"); Row row = RowFactory.createRow(inputMap, descriptor); - Row expectedRow = new Row(49); + Row expectedRow = new Row(52); expectedRow.setField(5, "144614"); expectedRow.setField(6, "https://www.abcd.com/1234"); assertEquals(expectedRow, row); - } @Test @@ -52,7 +51,7 @@ public void shouldReturnAEmptyRowOfSizeEqualToNoOfFieldsInDescriptorForInputMap( Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor(); Map inputMap = new HashMap<>(); Row row = RowFactory.createRow(inputMap, descriptor); - assertEquals(new Row(49), row); + assertEquals(new Row(52), row); } @Test @@ -74,7 +73,7 @@ public void shouldCreateRowWithPassedFieldsForInputMap() { public void shouldReturnEmptyRowIfNullPassedAsMapForInputMap() { Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor(); Row row = RowFactory.createRow(null, descriptor); - assertEquals(new Row(49), row); + assertEquals(new Row(52), row); } @Test @@ -83,7 +82,7 @@ public void shouldCreateRowForDynamicMessage() throws InvalidProtocolBufferExcep DynamicMessage dynamicMessage = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), customerLogMessage.toByteArray()); Row row = RowFactory.createRow(dynamicMessage); assertNotNull(row); - assertEquals(49, row.getArity()); + assertEquals(52, row.getArity()); } @Test @@ -135,7 +134,7 @@ public void shouldCreateRowUsingCacheForDynamicMessage() throws InvalidProtocolB Row row = RowFactory.createRow(dynamicMessage, fieldDescriptorCache); assertNotNull(row); - assertEquals(49, row.getArity()); + assertEquals(52, row.getArity()); } @Test diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactoryTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactoryTest.java index 155b5b05f..70b00ac05 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactoryTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactoryTest.java @@ -4,14 +4,13 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.InvalidProtocolBufferException; import com.gotocompany.dagger.common.serde.typehandler.complex.EnumHandler; +import com.gotocompany.dagger.common.serde.typehandler.complex.GoogleProtobufComplexMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.MapHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.MessageHandler; -import com.gotocompany.dagger.common.serde.typehandler.complex.StructMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.complex.TimestampHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler; import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler; -import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler; import com.gotocompany.dagger.consumer.TestBookingLogMessage; import com.gotocompany.dagger.consumer.TestFeedbackLogMessage; import com.gotocompany.dagger.consumer.TestGrpcResponse; @@ -107,14 +106,14 @@ public void shouldReturnRepeatedEnumHandlerIfRepeatedEnumFieldDescriptorPassed() public void shouldReturnRepeatedStructHandlerIfRepeatedStructFieldDescriptorPassed() { Descriptors.FieldDescriptor repeatedStructFieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("metadata"); TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(repeatedStructFieldDescriptor); - assertEquals(RepeatedStructMessageHandler.class, typeHandler.getClass()); + assertEquals(GoogleProtobufComplexMessageHandler.class, typeHandler.getClass()); } @Test public void shouldReturnStructHandlerIfStructFieldDescriptorPassed() { Descriptors.FieldDescriptor structFieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(structFieldDescriptor); - assertEquals(StructMessageHandler.class, typeHandler.getClass()); + assertEquals(GoogleProtobufComplexMessageHandler.class, typeHandler.getClass()); } @Test diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandlerTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandlerTest.java new file mode 100644 index 000000000..04ae42085 --- /dev/null +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandlerTest.java @@ -0,0 +1,169 @@ +package com.gotocompany.dagger.common.serde.typehandler.complex; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; +import com.gotocompany.dagger.consumer.TestBookingLogMessage; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.schema.GroupType; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +public class GoogleProtobufComplexMessageHandlerTest { + + @Test + public void shouldReturnTrueForCanHandleForStructFieldDescriptor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + assertTrue(handler.canHandle()); + } + + @Test + public void shouldReturnTrueForCanHandleForValueFieldDescriptor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("tag"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + assertTrue(handler.canHandle()); + } + + @Test + public void shouldReturnTrueForCanHandleForListValueFieldDescriptor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("tags"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + assertTrue(handler.canHandle()); + } + + @Test + public void shouldReturnTrueForCanHandleForRepeatedFieldDescriptor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("labels"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + assertTrue(handler.canHandle()); + } + + @Test + public void shouldReturnFalseForCanHandleForMessageFieldDescriptor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("driver_pickup_location"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + assertFalse(handler.canHandle()); + } + + @Test + public void shouldReturnTypeInformation() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + TypeInformation actualTypeInformation = handler.getTypeInformation(); + TypeInformation expectedTypeInformation = Types.PRIMITIVE_ARRAY(Types.BYTE); + assertEquals(expectedTypeInformation, actualTypeInformation); + } + + @Test + public void shouldReturnByteArrayForTransformFromProto() { + Struct struct = Struct.newBuilder() + .putFields("structKey1", Value.newBuilder().setStringValue("structValue").build()) + .putFields("structKey2", Value.newBuilder().setNumberValue(23.0).build()) + .build(); + byte[] expectedBytes = struct.toByteArray(); + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + + DynamicMessage structAsDynamicMessage = DynamicMessage.newBuilder(fieldDescriptor.getMessageType()) + .mergeFrom(struct) + .build(); + DynamicMessage dynamicMessage = DynamicMessage.newBuilder(fieldDescriptor.getContainingType()) + .setField(fieldDescriptor, structAsDynamicMessage) + .build(); + + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + + // when + Object fieldValue = dynamicMessage.getField(fieldDescriptor); + byte[] actualBytes1 = (byte[]) handler.transformFromProto(fieldValue); + + // when called using cache - as the implementation is same so asserting here itself + byte[] actualBytes2 = (byte[]) handler.transformFromProtoUsingCache(fieldValue, null); + + assertArrayEquals(expectedBytes, actualBytes1); + assertArrayEquals(expectedBytes, actualBytes2); + } + + @Test + public void shouldReturnTheBuilderSettingByteArrayValue() { + Struct struct = Struct.newBuilder() + .putFields("structKey1", Value.newBuilder().setStringValue("structValue").build()) + .putFields("structKey2", Value.newBuilder().setNumberValue(23.0).build()) + .build(); + byte[] byteArray = struct.toByteArray(); + + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + DynamicMessage.Builder builder = DynamicMessage.newBuilder(fieldDescriptor.getContainingType()); + + // when + DynamicMessage.Builder resultBuilder = handler.transformToProtoBuilder(builder, byteArray); + + assertTrue(resultBuilder.hasField(fieldDescriptor)); + + Object fieldValue = resultBuilder.getField(fieldDescriptor); + assertTrue(fieldValue instanceof DynamicMessage); + + DynamicMessage parsedMessage = (DynamicMessage) fieldValue; + + // parsed struct is NOT empty + assertFalse(parsedMessage.getAllFields().isEmpty()); + + // https://github.com/protocolbuffers/protobuf/blob/d124c2dc26841e5ee0b8d1505438fcf0660c9db0/src/google/protobuf/struct.proto#L53 field name is "fields" + Descriptors.FieldDescriptor fieldsDescriptor = parsedMessage.getDescriptorForType().findFieldByName("fields"); + + List fields = (List) parsedMessage.getField(fieldsDescriptor); + + // convert map entries to java map for easier assertions + Map structFields = new HashMap<>(); + + for (DynamicMessage entry : fields) { + String key = (String) entry.getField(entry.getDescriptorForType().findFieldByName("key")); + DynamicMessage value = (DynamicMessage) entry.getField(entry.getDescriptorForType().findFieldByName("value")); + structFields.put(key, value); + } + + // assert keys exist + assertTrue(structFields.containsKey("structKey1")); + assertTrue(structFields.containsKey("structKey2")); + + // assert values + DynamicMessage value1 = structFields.get("structKey1"); + DynamicMessage value2 = structFields.get("structKey2"); + + // https://github.com/protocolbuffers/protobuf/blob/d124c2dc26841e5ee0b8d1505438fcf0660c9db0/src/google/protobuf/struct.proto#L70 + Descriptors.FieldDescriptor stringValueFd = value1.getDescriptorForType().findFieldByName("string_value"); + // https://github.com/protocolbuffers/protobuf/blob/d124c2dc26841e5ee0b8d1505438fcf0660c9db0/src/google/protobuf/struct.proto#L68 + Descriptors.FieldDescriptor numberValueFd = value2.getDescriptorForType().findFieldByName("number_value"); + + assertEquals("structValue", value1.getField(stringValueFd)); + assertEquals(23.0, value2.getField(numberValueFd)); + } + + // Not implemented but adding basic test cases for test coverage + + @Test + public void shouldReturnNullWhenTransformFromParquetIsCalledWithAnyArgument() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup().named("TestGroupType"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + assertNull(handler.transformFromParquet(simpleGroup)); + } + + @Test + public void shouldReturnAsIsForTransformForPostProcessor() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data"); + GoogleProtobufComplexMessageHandler handler = new GoogleProtobufComplexMessageHandler(fieldDescriptor); + byte[] input = "test".getBytes(); + assertEquals(handler.transformFromPostProcessor(input), input); + } +} diff --git a/dagger-common/src/test/proto/TestLogMessage.proto b/dagger-common/src/test/proto/TestLogMessage.proto index 216e3c6a2..91f068cf3 100644 --- a/dagger-common/src/test/proto/TestLogMessage.proto +++ b/dagger-common/src/test/proto/TestLogMessage.proto @@ -119,6 +119,12 @@ message TestBookingLogMessage { repeated float float_array_field = 73; repeated int64 long_array_field = 74; + + google.protobuf.Value tag = 75; + + google.protobuf.ListValue tags = 76; + + repeated google.protobuf.Value labels = 77; } message TestPaymentOptionMetadata { diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index 3105a8d71..256d668af 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -29,7 +29,7 @@ def flinkVersion = rootProject.flinkVersion version = rootProject.file('version.txt').text.trim() def minimalVersion = version -def dependenciesVersion = "0.5.3" +def dependenciesVersion = "0.5.4" description = """dagger to the heart!""" diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/internal/processor/function/functions/JsonPayloadFunctionTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/internal/processor/function/functions/JsonPayloadFunctionTest.java index da606653a..ae77b2d7d 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/internal/processor/function/functions/JsonPayloadFunctionTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/internal/processor/function/functions/JsonPayloadFunctionTest.java @@ -142,7 +142,7 @@ public void shouldThrowExceptionWhenStencilClientIsNull() { public void shouldGetJsonPayloadAsResult() { JsonPayloadFunction jsonPayloadFunction = new JsonPayloadFunction(commonInternalSourceConfig, commonSchemaConfig); - String expectedJsonPayload = "{\"service_type\":\"UNKNOWN\",\"order_number\":\"\",\"order_url\":\"\",\"status\":\"UNKNOWN\",\"event_timestamp\":{\"seconds\":0,\"nanos\":0},\"customer_id\":\"\",\"customer_url\":\"\",\"driver_id\":\"\",\"driver_url\":\"\",\"activity_source\":\"\",\"service_area_id\":\"\",\"amount_paid_by_cash\":0.0,\"driver_pickup_location\":{\"name\":\"\",\"address\":\"\",\"latitude\":0.0,\"longitude\":0.0,\"type\":\"\",\"note\":\"\",\"place_id\":\"\",\"accuracy_meter\":0.0,\"gate_id\":\"\"},\"driver_dropoff_location\":{\"name\":\"\",\"address\":\"\",\"latitude\":0.0,\"longitude\":0.0,\"type\":\"\",\"note\":\"\",\"place_id\":\"\",\"accuracy_meter\":0.0,\"gate_id\":\"\"},\"customer_email\":\"\",\"customer_name\":\"\",\"customer_phone\":\"\",\"driver_email\":\"\",\"driver_name\":\"\",\"driver_phone\":\"\",\"cancel_reason_id\":0,\"cancel_reason_description\":\"\",\"booking_creation_time\":{\"seconds\":0,\"nanos\":0},\"total_customer_discount\":0.0,\"gopay_customer_discount\":0.0,\"voucher_customer_discount\":0.0,\"pickup_time\":{\"seconds\":0,\"nanos\":0},\"driver_paid_in_cash\":0.0,\"driver_paid_in_credit\":0.0,\"vehicle_type\":\"UNKNOWN\",\"customer_total_fare_without_surge\":0,\"customer_dynamic_surge_enabled\":false,\"driver_total_fare_without_surge\":0,\"driver_dynamic_surge_enabled\":false,\"meta_array\":[],\"profile_data\":null,\"event_properties\":null,\"key_values\":null,\"cash_amount\":0.0,\"int_array_field\":[],\"metadata\":[],\"payment_option_metadata\":{\"masked_card\":\"\",\"network\":\"\"},\"test_enums\":[],\"routes\":[],\"customer_price\":0.0,\"boolean_array_field\":[],\"double_array_field\":[],\"float_array_field\":[],\"long_array_field\":[]}"; + String expectedJsonPayload = "{\"service_type\":\"UNKNOWN\",\"order_number\":\"\",\"order_url\":\"\",\"status\":\"UNKNOWN\",\"event_timestamp\":{\"seconds\":0,\"nanos\":0},\"customer_id\":\"\",\"customer_url\":\"\",\"driver_id\":\"\",\"driver_url\":\"\",\"activity_source\":\"\",\"service_area_id\":\"\",\"amount_paid_by_cash\":0.0,\"driver_pickup_location\":{\"name\":\"\",\"address\":\"\",\"latitude\":0.0,\"longitude\":0.0,\"type\":\"\",\"note\":\"\",\"place_id\":\"\",\"accuracy_meter\":0.0,\"gate_id\":\"\"},\"driver_dropoff_location\":{\"name\":\"\",\"address\":\"\",\"latitude\":0.0,\"longitude\":0.0,\"type\":\"\",\"note\":\"\",\"place_id\":\"\",\"accuracy_meter\":0.0,\"gate_id\":\"\"},\"customer_email\":\"\",\"customer_name\":\"\",\"customer_phone\":\"\",\"driver_email\":\"\",\"driver_name\":\"\",\"driver_phone\":\"\",\"cancel_reason_id\":0,\"cancel_reason_description\":\"\",\"booking_creation_time\":{\"seconds\":0,\"nanos\":0},\"total_customer_discount\":0.0,\"gopay_customer_discount\":0.0,\"voucher_customer_discount\":0.0,\"pickup_time\":{\"seconds\":0,\"nanos\":0},\"driver_paid_in_cash\":0.0,\"driver_paid_in_credit\":0.0,\"vehicle_type\":\"UNKNOWN\",\"customer_total_fare_without_surge\":0,\"customer_dynamic_surge_enabled\":false,\"driver_total_fare_without_surge\":0,\"driver_dynamic_surge_enabled\":false,\"meta_array\":[],\"profile_data\":null,\"event_properties\":null,\"key_values\":null,\"cash_amount\":0.0,\"int_array_field\":[],\"metadata\":[],\"payment_option_metadata\":{\"masked_card\":\"\",\"network\":\"\"},\"test_enums\":[],\"routes\":[],\"customer_price\":0.0,\"boolean_array_field\":[],\"double_array_field\":[],\"float_array_field\":[],\"long_array_field\":[],\"tag\":null,\"tags\":null,\"labels\":null}"; String actualJsonPayload = (String) jsonPayloadFunction.getResult(commonRowManager); assertEquals(expectedJsonPayload, actualJsonPayload); } diff --git a/version.txt b/version.txt index 34a83616b..26acbf080 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.12.1 +0.12.2