added support for deaggregation for aggregated records#62
added support for deaggregation for aggregated records#62psanzay wants to merge 2 commits intotwitchscience:masterfrom
Conversation
|
|
||
| records = output.Records | ||
| records, err = deaggregator.DeaggregateRecords(output.Records) | ||
| if err != nil { |
There was a problem hiding this comment.
Not sure about swallowing this error. What are the possible errors from deaggregator?
There was a problem hiding this comment.
@garethlewin deaggregator throws error in case if it fails to unmarshal the aggregated record.
err := proto.Unmarshal(messageData, aggRecord) if err != nil { return nil, err }
Should we not return the actual payload without deaggregation in case of such error, as it can be possible the records have been aggregated using some custom logic not via amazon's aggregation format, so for those scenarios we should return the records as pushed. It should be up to user to deaggregate them in that case.
|
Hi Sorry I haven't been ignoring this, I'm just at a bit of a analysis paralysis option here. This change would make #49 very difficult (or more accurately #49 makes this more difficult). I am really also not sure how to handle erroneous situations. As I see it there are 3 options, and I dislike all 3: A) On error just send in the entire blob, this means clients now have to anticipate this happening and deal with the situation, which means that they have to be aware of deaggregation. B) On error swallow the record. This means data will be dropped, this seems very bad. C) On error return an error from kinsumer and error. The problem with this is that a checkpoint won't be created (or we are basically back to option B) ) and thus kinsumer will never be able to handle that shard again until the record expires off it. I am wondering what the benefits of implicit deaggregation are here vs having the clients do it on their side (which is what we do at Twitch, but then we use our own aggregation method and not the one that KCL supplies. |
Created pull request template to comply with SOC2.
This PR enables support for deaggregation of the records if the stream has aggregated records.