Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public interface TypeHandler {
*/
boolean canHandle();

// ---------- Flink -> Proto ----------
/**
* Transform to protobuf message builder.
*
Expand All @@ -34,6 +35,8 @@ public interface TypeHandler {
*/
Object transformFromPostProcessor(Object field);


// ---------- Proto -> Flink ----------
/**
* Transform from protobuf message.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package com.gotocompany.dagger.common.serde.typehandler;

import com.google.protobuf.Descriptors;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.EnumHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.GoogleProtobufComplexMessageHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.MapHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.MessageHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.StructMessageHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.TimestampHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

Expand Down Expand Up @@ -64,6 +65,7 @@ private static List<TypeHandler> getSpecificHandlers(Descriptors.FieldDescriptor
new MapHandler(fieldDescriptor),
new TimestampHandler(fieldDescriptor),
new EnumHandler(fieldDescriptor),
new GoogleProtobufComplexMessageHandler(fieldDescriptor),
new StructMessageHandler(fieldDescriptor),
new RepeatedStructMessageHandler(fieldDescriptor),
new RepeatedPrimitiveHandler(fieldDescriptor),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.gotocompany.dagger.common.serde.typehandler.complex;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.gotocompany.dagger.common.core.FieldDescriptorCache;
import com.gotocompany.dagger.common.serde.typehandler.TypeHandler;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.parquet.example.data.simple.SimpleGroup;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* A TypeHandler to handle some of the complex Google Protobuf message types
* that are dynamic and recursive in nature.
* <p>
* <a href="https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/struct.proto">github-link</a>
* <p>
* Struct is primarily used to represent JSON object types.
* Value can represent any primitive, a Struct, or an array type.
* ListValue represents a JSON array type.
* NullValue is an enum used to represent null.
* <p>
* This implementation converts these message types to Protobuf's byte-array
* representation. While outputting the data, the byte array is converted back
* to the original structure using the associated field descriptor.
*/
public class GoogleProtobufComplexMessageHandler implements TypeHandler {

private static final Set<String> RECOGNIZED_COMPLEX_TYPES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
"google.protobuf.Struct",
"google.protobuf.Value",
"google.protobuf.ListValue",
"google.protobuf.NullValue"
)));

private final Descriptors.FieldDescriptor fieldDescriptor;

public GoogleProtobufComplexMessageHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}

@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE
&& RECOGNIZED_COMPLEX_TYPES.contains(fieldDescriptor.getMessageType().getFullName());
}

@Override
public Object transformFromProto(Object field) {
if (field == null) {
return null;
}

if (field instanceof DynamicMessage) {
DynamicMessage msg = (DynamicMessage) field;
if (msg.getAllFields().isEmpty()) {
return null;
}
return msg.toByteArray();
}
return null;
}

@Override
public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) {
return transformFromProto(field);
}

@Override
public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) {
if (!canHandle() || field == null) {
return builder;
}

try {
DynamicMessage parsed = DynamicMessage.parseFrom(fieldDescriptor.getMessageType(), (byte[]) field);
builder.setField(fieldDescriptor, parsed);
return builder;
} catch (Exception e) {
throw new RuntimeException("Failed to parse protobuf bytes for field: " + fieldDescriptor.getFullName(), e);
}
}

@Override
public Object transformFromPostProcessor(Object field) {
return field;
}

@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
return null;
}

@Override
public Object transformToJson(Object field) {
return null;
}

@Override
public TypeInformation getTypeInformation() {
return Types.PRIMITIVE_ARRAY(Types.BYTE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void shouldReturnOriginalFieldIndex() {
@Test
public void shouldReturnOriginalFieldCount() {
FieldDescriptorCache fieldDescriptorCache = new FieldDescriptorCache(TestBookingLogMessage.getDescriptor());
assertEquals(49, fieldDescriptorCache.getOriginalFieldCount(TestBookingLogMessage.getDescriptor()));
assertEquals(52, fieldDescriptorCache.getOriginalFieldCount(TestBookingLogMessage.getDescriptor()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void shouldAddExtraFieldsToRow() {
Row row = protoDeserializer.deserialize(new ConsumerRecord<>("test-topic", 0, 0, null, protoBytes));

int size = row.getArity();
assertEquals(51, size);
assertEquals(54, size);
assertTrue("Didn't add field at the penultimate index", (Boolean) row.getField(size - 2));
assertEquals(1595548800000L, ((java.sql.Timestamp) row.getField(size - 1)).getTime());
}
Expand Down Expand Up @@ -290,7 +290,7 @@ public void shouldAddExtraFieldsToRowWhenStencilAutoRefreshEnabled() {
Row row = protoDeserializer.deserialize(new ConsumerRecord<>("test-topic", 0, 0, null, protoBytes));

int size = row.getArity();
assertEquals(51, size);
assertEquals(54, size);
assertTrue("Didn't add field at the penultimate index", (Boolean) row.getField(size - 2));
assertEquals(1595548800000L, ((java.sql.Timestamp) row.getField(size - 1)).getTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.gotocompany.dagger.consumer.TestBookingLogMessage;
import com.gotocompany.dagger.consumer.TestNestedRepeatedMessage;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -130,16 +131,16 @@ public void shouldProcessArrayForStringData() {
@Test
public void shouldGiveAllNamesAndTypesIncludingStructFields() {
ProtoType clevertapMessageProtoType = new ProtoType(TestBookingLogMessage.class.getName(), "rowtime", stencilClientOrchestrator);
assertEquals(51, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldNames().length);
assertEquals(51, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldTypes().length);
assertEquals(54, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldNames().length);
assertEquals(54, ((RowTypeInfo) clevertapMessageProtoType.getRowType()).getFieldTypes().length);
}

@Test
public void shouldReturnRowTypeForStructFields() {
ProtoType protoType = new ProtoType(TestBookingLogMessage.class.getName(), "rowtime", stencilClientOrchestrator);
assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[35]);
assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[36]);
assertEquals(ROW(), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[37]);
assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[35]);
assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[36]);
assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) protoType.getRowType()).getFieldTypes()[37]);
assertEquals("profile_data", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[35]);
assertEquals("event_properties", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[36]);
assertEquals("key_values", ((RowTypeInfo) protoType.getRowType()).getFieldNames()[37]);
Expand All @@ -155,7 +156,7 @@ public void shouldGiveAllNamesAndTypesIncludingPrimitiveArrayFields() {
public void shouldGiveNameAndTypeForRepeatingStructType() {
ProtoType testNestedRepeatedMessage = new ProtoType(TestNestedRepeatedMessage.class.getName(), "rowtime", stencilClientOrchestrator);
assertEquals("metadata", ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldNames()[4]);
assertEquals(OBJECT_ARRAY(ROW()), ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldTypes()[4]);
assertEquals(Types.PRIMITIVE_ARRAY(Types.BYTE), ((RowTypeInfo) testNestedRepeatedMessage.getRowType()).getFieldTypes()[4]);
}

private int bookingLogFieldIndex(String propertyName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,18 @@ public void shouldCreateRowForInputMap() {
inputMap.put("created_at", "2016-01-18T08:55:26.16Z");
Row row = RowFactory.createRow(inputMap, descriptor);

Row expectedRow = new Row(49);
Row expectedRow = new Row(52);
expectedRow.setField(5, "144614");
expectedRow.setField(6, "https://www.abcd.com/1234");
assertEquals(expectedRow, row);

}

@Test
public void shouldReturnAEmptyRowOfSizeEqualToNoOfFieldsInDescriptorForInputMap() {
Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor();
Map<String, Object> inputMap = new HashMap<>();
Row row = RowFactory.createRow(inputMap, descriptor);
assertEquals(new Row(49), row);
assertEquals(new Row(52), row);
}

@Test
Expand All @@ -74,7 +73,7 @@ public void shouldCreateRowWithPassedFieldsForInputMap() {
public void shouldReturnEmptyRowIfNullPassedAsMapForInputMap() {
Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor();
Row row = RowFactory.createRow(null, descriptor);
assertEquals(new Row(49), row);
assertEquals(new Row(52), row);
}

@Test
Expand All @@ -83,7 +82,7 @@ public void shouldCreateRowForDynamicMessage() throws InvalidProtocolBufferExcep
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(TestBookingLogMessage.getDescriptor(), customerLogMessage.toByteArray());
Row row = RowFactory.createRow(dynamicMessage);
assertNotNull(row);
assertEquals(49, row.getArity());
assertEquals(52, row.getArity());
}

@Test
Expand Down Expand Up @@ -135,7 +134,7 @@ public void shouldCreateRowUsingCacheForDynamicMessage() throws InvalidProtocolB

Row row = RowFactory.createRow(dynamicMessage, fieldDescriptorCache);
assertNotNull(row);
assertEquals(49, row.getArity());
assertEquals(52, row.getArity());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.gotocompany.dagger.common.serde.typehandler.complex.EnumHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.GoogleProtobufComplexMessageHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.MapHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.MessageHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.StructMessageHandler;
import com.gotocompany.dagger.common.serde.typehandler.complex.TimestampHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler;
import com.gotocompany.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler;
import com.gotocompany.dagger.consumer.TestBookingLogMessage;
import com.gotocompany.dagger.consumer.TestFeedbackLogMessage;
import com.gotocompany.dagger.consumer.TestGrpcResponse;
Expand Down Expand Up @@ -107,14 +106,14 @@ public void shouldReturnRepeatedEnumHandlerIfRepeatedEnumFieldDescriptorPassed()
public void shouldReturnRepeatedStructHandlerIfRepeatedStructFieldDescriptorPassed() {
Descriptors.FieldDescriptor repeatedStructFieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("metadata");
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(repeatedStructFieldDescriptor);
assertEquals(RepeatedStructMessageHandler.class, typeHandler.getClass());
assertEquals(GoogleProtobufComplexMessageHandler.class, typeHandler.getClass());
}

@Test
public void shouldReturnStructHandlerIfStructFieldDescriptorPassed() {
Descriptors.FieldDescriptor structFieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("profile_data");
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(structFieldDescriptor);
assertEquals(StructMessageHandler.class, typeHandler.getClass());
assertEquals(GoogleProtobufComplexMessageHandler.class, typeHandler.getClass());
}

@Test
Expand Down
Loading