Streaming sample only transfers the asset every fifth modification #417
Description
Bug Report
Describe the Bug
While trying out the streaming http-to-http sample, after the negotiation has concluded successfully and the transfer has started, files/assets are only transferred once the extension has accumulated 5 modifications.
Expected Behavior
After starting the transfer, as soon as a file is created or an existing file is modified, the file should be transferred to the selected data destination.
Observed Behavior
After starting the transfer, a single file creation or modification triggers no asset transfer. At the fifth modification (file creation counts as 2 modifications, I assume because the file is first created empty and the content is then written), 5 file transfers happen, one for each modification, and only the newest version of each file is transferred.
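The assumption that a file creation counts as two events can be checked with a small standalone JDK WatchService sketch, independent of EDC (all names here are illustrative): on most platforms, creating a file with content reports an ENTRY_CREATE for the empty file followed by an ENTRY_MODIFY when the content is written.

```java
import java.nio.file.*;
import java.util.ArrayList;
import java.util.List;

public class WatchEventsDemo {
    // Registers a watcher on a fresh temp directory, creates a file with
    // content, and returns the event kinds the watcher reports.
    public static List<String> eventsForFileCreation() throws Exception {
        Path dir = Files.createTempDirectory("watch-demo");
        List<String> kinds = new ArrayList<>();
        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
            dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE,
                    StandardWatchEventKinds.ENTRY_MODIFY);
            Files.writeString(dir.resolve("file_asset.txt"), "content");
            Thread.sleep(1000); // give the OS time to deliver the events
            WatchKey key = watcher.poll();
            while (key != null) {
                for (WatchEvent<?> event : key.pollEvents()) {
                    kinds.add(event.kind().name());
                }
                key.reset();
                key = watcher.poll();
            }
        }
        return kinds;
    }

    public static void main(String[] args) throws Exception {
        // Typically prints something like [ENTRY_CREATE, ENTRY_MODIFY]
        System.out.println(eventsForFileCreation());
    }
}
```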
Steps to Reproduce
Steps to reproduce the behavior:
- Start a Connector with the streaming extension loaded
- Create a streaming asset, policy, contract, and negotiate then start the transfer
- Modify an existing file in the specified folder or create a new one
- No file transfer is executed
- Perform 5 modifications, to the same file or different ones
- At the fifth modification, 5 file transfers happen
Context Information
Tried with EDC Connector 0.10.0 on WSL (Windows 10) and on Ubuntu, in both cases inside Docker containers, adding the extension from the samples as well as a slightly customized one with the same file structure (extension, factory, and data source).
Detailed Description
I was interested in tracking a single file instead of a folder and implemented this modified method:
```java
@Override
public StreamResult<Stream<Part>> openPartStream() {
    try {
        Stream<Part> stream = openRecordsStream(watchService)
                .filter(Objects::nonNull)
                .flatMap(it -> it.pollEvents().stream())
                .filter(it -> {
                    // Use the context (Path) of the event and compare with our file's name.
                    String eventFileName = it.context().toString();
                    String sourceFileName = sourceFile.toPath().getFileName().toString();
                    monitor.debug("Processing event for file " + eventFileName + " (filter: " + sourceFileName + "): " + eventFileName.equals(sourceFileName));
                    return eventFileName.equals(sourceFileName);
                })
                .map(it -> it.context().toString())
                .map(it -> Paths.get(sourceFolder.toURI()).resolve(it))
                .map(StreamingPart::new);
        return StreamResult.success(stream);
    } catch (Exception e) {
        monitor.warning("Failed to open part stream: " + e.getMessage(), e);
        return StreamResult.failure(new StreamFailure(List.of(e.getMessage()), StreamFailure.Reason.GENERAL_ERROR));
    }
}
```

I added a debug line for each modification, and the line does show up in the log each time I change a file, but the transfer still only happens once 5 cumulative modifications have accumulated:
connector_provider | DEBUG 2025-06-18T10:34:48.093194226 TransferProcess: ID 347a5688-016c-4984-965f-86966d1e6243. send transfer start to http://local_proxy:4443/connector_consumer/protocol
connector_provider | DEBUG 2025-06-18T10:34:48.098634968 [TransferProcessManagerImpl] TransferProcess 347a5688-016c-4984-965f-86966d1e6243 is now in state STARTED
connector_provider | DEBUG 2025-06-18T10:34:48.696326236 DataFlow: ID 347a5688-016c-4984-965f-86966d1e6243. start data flow
connector_provider | DEBUG 2025-06-18T10:34:48.696560269 [File monitoring streaming data source factory] Creating FileMonitoringStreaming data source...
connector_provider | DEBUG 2025-06-18T10:34:48.696652043 [File monitoring streaming data source factory] File /sourcefolder/file_asset.txt is valid.
connector_provider | DEBUG 2025-06-18T10:34:48.700191929 Transferring from FileStreaming to HttpData.
connector_provider | DEBUG 2025-06-18T10:35:04.382790103 [FileMonitoringStreamingDataSource] Processing event for file file_asset.txt (filter: file_asset.txt): true
connector_provider | DEBUG 2025-06-18T10:35:10.59523144 [FileMonitoringStreamingDataSource] Processing event for file file_asset.txt (filter: file_asset.txt): true
connector_provider | DEBUG 2025-06-18T10:35:14.318823737 [FileMonitoringStreamingDataSource] Processing event for file file_asset.txt (filter: file_asset.txt): true
connector_provider | DEBUG 2025-06-18T10:35:19.493022572 [FileMonitoringStreamingDataSource] Processing event for file file_asset.txt (filter: file_asset.txt): true
connector_provider | DEBUG 2025-06-18T10:35:24.086583221 [FileMonitoringStreamingDataSource] Processing event for file file_asset.txt (filter: file_asset.txt): true
It's possible that I missed a configuration option that controls this behaviour, but it looks to me like the Connector caches the events internally until a threshold (5, in this case) is reached. Am I missing something to control this, or did I misunderstand the intended usage of the streaming sample?