Skip to content
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.4.0
-----
* Add support for tuples and tuples with collections or UDTs in the Bulk writer (CASSANALYTICS-51)
* Setup CI Pipeline with GitHub Actions (CASSANALYTICS-106)
* Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126)
* Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,55 @@ public boolean containsUdt(String fieldName)
return columnsWithUdts.contains(fieldName);
}

/**
 * Determines whether the given CQL type is a tuple, looking through a frozen wrapper if present.
 *
 * @param type the CQL type to check
 * @return true if the (possibly unwrapped) type is a tuple
 */
public static boolean isTupleType(CqlField.CqlType type)
{
    CqlField.CqlType unwrapped = unwrapIfFrozen(type);
    return unwrapped instanceof CqlField.CqlTuple;
}

/**
 * Checks if a collection type (possibly wrapped in frozen) contains tuples,
 * either directly as its element type or nested at any depth inside further collections.
 *
 * @param type the CQL type to check
 * @return true if the type is a collection containing tuples
 */
public static boolean containsTuples(CqlField.CqlType type)
{
    CqlField.CqlType unwrapped = unwrapIfFrozen(type);

    if (unwrapped instanceof CqlField.CqlList || unwrapped instanceof CqlField.CqlSet)
    {
        CqlField.CqlCollection collection = (CqlField.CqlCollection) unwrapped;
        return isTupleOrContainsTuples(collection.type());
    }

    if (unwrapped instanceof CqlField.CqlMap)
    {
        CqlField.CqlMap map = (CqlField.CqlMap) unwrapped;
        // A map contains tuples if either its key type or its value type does
        return isTupleOrContainsTuples(map.keyType()) || isTupleOrContainsTuples(map.valueType());
    }

    // Not a collection type, so it cannot "contain" tuples (a bare tuple is handled by isTupleType)
    return false;
}

/**
 * Returns true when the type is itself a tuple, or is a collection that contains tuples.
 * Factors out the predicate that was previously repeated for each element/key/value type.
 *
 * @param type the CQL type to check
 * @return true if the type is, or contains, a tuple
 */
private static boolean isTupleOrContainsTuples(CqlField.CqlType type)
{
    return isTupleType(type) || containsTuples(type);
}

/**
 * Strips a frozen wrapper from the given type when one is present.
 *
 * @param type the type to unwrap
 * @return the inner type when {@code type} is frozen; otherwise {@code type} itself
 */
public static CqlField.CqlType unwrapIfFrozen(CqlField.CqlType type)
{
    return type instanceof CqlField.CqlFrozen
           ? ((CqlField.CqlFrozen) type).inner()
           : type;
}

@Override
public int hashCode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,19 +379,47 @@ private void writeRow(Tuple2<DecoratedKey, Object[]> keyAndRowData,
private Map<String, Object> getBindValuesForColumns(Map<String, Object> map, String[] columnNames, Object[] values)
{
Preconditions.checkArgument(values.length == columnNames.length,
"Number of values does not match the number of columns " + values.length + ", " + columnNames.length);
"Number of values does not match the number of columns " + values.length + ", " + columnNames.length);

for (int i = 0; i < columnNames.length; i++)
{
if (cqlTable.containsUdt(columnNames[i]))
String columnName = columnNames[i];
Object columnValue = values[i];

// convert all BridgeUDTValue types in the column to UDTValue
if (cqlTable.containsUdt(columnName))
{
map.put(columnNames[i], maybeConvertUdt(values[i]));
columnValue = maybeConvertUdt(columnValue);
}
else

// Convert tuples to TupleValue for CQLSSTableWriter
// - Direct tuple columns: Object[] → TupleValue
// - Collections with tuples: List<Object[]> → List<TupleValue> (via collection's convertForCqlWriter)
CqlField field = cqlTable.getField(columnName);
if (field != null && columnValue != null)
{
map.put(columnNames[i], values[i]);
CqlField.CqlType fieldType = field.type();

// Check if this is a direct tuple
if (CqlTable.isTupleType(fieldType))
{
CqlField.CqlTuple cqlTuple = (CqlField.CqlTuple) CqlTable.unwrapIfFrozen(fieldType);
columnValue = cqlTuple.convertForCqlWriter(columnValue, writerContext.bridge().getVersion(), false);
}
// Check if this is a collection that contains tuples
else if (CqlTable.containsTuples(fieldType))
{
CqlField.CqlType collectionType = CqlTable.unwrapIfFrozen(fieldType);
if (collectionType instanceof CqlField.CqlCollection)
{
columnValue = ((CqlField.CqlCollection) collectionType).convertForCqlWriter(columnValue, writerContext.bridge().getVersion(), false);
}
}
}

map.put(columnName, columnValue);
}

return map;
}

Expand All @@ -414,6 +442,19 @@ private Object maybeConvertUdt(Object value)
return resultList;
}

// Tuples come here as Object[]
if (value instanceof Object[])
{
Object[] valueArray = (Object[]) value;
Object[] resultArray = new Object[valueArray.length];
for (int i = 0; i < valueArray.length; i++)
{
resultArray[i] = maybeConvertUdt(valueArray[i]);
}

return resultArray;
}

if (value instanceof Set && !((Set<?>) value).isEmpty())
{
Set<Object> resultSet = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ public static Converter<?> getConverter(CqlField.CqlType cqlType)
case SET:
return new SetConverter<>((CqlField.CqlCollection) cqlType);
case TUPLE:
if (cqlType.internalType() == CqlField.CqlType.InternalType.Tuple)
{
assert cqlType instanceof CqlField.CqlTuple;
return new TupleConverter((CqlField.CqlTuple) cqlType);
}
LOGGER.warn("Unable to match type={}. Defaulting to NoOp Converter", cqlName);
return NO_OP_CONVERTER;
default:
if (cqlType.internalType() == CqlField.CqlType.InternalType.Udt)
Expand Down Expand Up @@ -879,4 +885,51 @@ private Map<String, Object> makeUdtMap(GenericRowWithSchema row)
return result;
}
}

/**
 * Converts a Spark SQL {@code Row} representing a CQL tuple into an {@code Object[]},
 * recursively converting each element with the converter that matches its declared CQL type.
 */
public static class TupleConverter extends NullableConverter<Object[]>
{
    // One converter per tuple field, in declaration order; tuple fields may each be a different type
    private final List<Converter<?>> converters;

    TupleConverter(CqlField.CqlTuple cqlTuple)
    {
        // Each field of the tuple can be of a different type, hence prepare a list of converters
        this.converters = new ArrayList<>(cqlTuple.types().size());
        for (CqlField.CqlType type : cqlTuple.types())
        {
            converters.add(getConverter(type));
        }
    }

    /**
     * Converts a non-null Spark-side tuple value.
     *
     * @param object the Spark-side value; must be a {@code org.apache.spark.sql.Row}
     * @return the converted tuple elements
     * @throws IllegalArgumentException if {@code object} is not a Row, or the Row's arity
     *                                  does not match the tuple's declared field count
     */
    @Override
    public Object[] convertInternal(Object object)
    {
        if (object instanceof org.apache.spark.sql.Row)
        {
            // Handle generic Row type (may come from nested structures)
            return makeTupleFromRow((org.apache.spark.sql.Row) object);
        }

        // IllegalArgumentException is more specific than the former bare RuntimeException,
        // and remains a RuntimeException subclass so existing callers are unaffected
        throw new IllegalArgumentException("Unsupported conversion for Tuple from " + object.getClass().getTypeName());
    }

    @Override
    public String toString()
    {
        return "Tuple";
    }

    private Object[] makeTupleFromRow(org.apache.spark.sql.Row row)
    {
        // Fail with a clear message instead of an IndexOutOfBoundsException on arity mismatch
        if (row.size() != converters.size())
        {
            throw new IllegalArgumentException("Tuple arity mismatch: expected " + converters.size()
                                               + " fields but row has " + row.size());
        }

        Object[] result = new Object[row.size()];
        for (int i = 0; i < row.size(); i++)
        {
            Object val = row.isNullAt(i) ? null : row.get(i);
            // Recursively convert inner values (handles nested UDTs, Tuples, Collections)
            result[i] = val == null ? null : converters.get(i).convert(val);
        }
        return result;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public static CqlField.CqlCollection mockListCqlType(String collectionCqlType)
return mockCollectionCqlType(LIST, mockCqlType(collectionCqlType));
}

/**
 * Builds a mocked list CQL type whose element type is the given (already mocked) CQL type.
 * Overload of the String-named variant for callers that already hold a {@code CqlField.CqlType}.
 *
 * @param collectionType the element type of the list
 * @return a mocked list collection type wrapping {@code collectionType}
 */
@NotNull
public static CqlField.CqlCollection mockListCqlType(CqlField.CqlType collectionType)
{
    return mockCollectionCqlType(LIST, collectionType);
}

@NotNull
public static CqlField.CqlCollection mockCollectionCqlType(String cqlName, CqlField.CqlType collectionType)
{
Expand All @@ -111,6 +117,16 @@ public static CqlField.CqlCollection mockCollectionCqlType(String cqlName, CqlFi
return mock;
}

/**
 * Builds a Mockito mock of a tuple CQL type with the given field types.
 *
 * @param types the CQL types of the tuple's fields, in order
 * @return a mocked tuple type reporting the given field types
 */
@NotNull
public static CqlField.CqlTuple mockTupleCqlType(List<CqlField.CqlType> types)
{
    CqlField.CqlTuple tuple = mock(CqlField.CqlTuple.class);
    when(tuple.types()).thenReturn(types);
    when(tuple.internalType()).thenReturn(CqlField.CqlType.InternalType.Tuple);
    when(tuple.name()).thenReturn(SqlToCqlTypeConverter.TUPLE);
    return tuple;
}

@NotNull
public static CqlField.CqlType mockMapCqlType(String keyCqlName, String valueCqlName)
{
Expand Down
Loading
Loading