diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java index 12c966be7..7114cf829 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java @@ -105,11 +105,11 @@ public JobBuilder registerSourceWithPreProcessors() { DataStream dataStream = stream.registerSource(executionEnvironment, watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelay)); StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new LastColumnWatermark()); - DataStream rowSingleOutputStreamOperator = streamWatermarkAssigner + DataStream dataStreamWithWaterMark = streamWatermarkAssigner .assignTimeStampAndWatermark(dataStream, watermarkDelay, enablePerPartitionWatermark); TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType()); - StreamInfo streamInfo = new StreamInfo(rowSingleOutputStreamOperator, tableSchema.getFieldNames()); + StreamInfo streamInfo = new StreamInfo(dataStreamWithWaterMark, tableSchema.getFieldNames()); streamInfo = addPreProcessor(streamInfo, tableName); Table table = tableEnvironment.fromDataStream(streamInfo.getDataStream(), getApiExpressions(streamInfo));