KAFKA-20212: Manage offsets in RocksDBStore #21578
eduwercamacaro wants to merge 9 commits into apache:trunk from
Conversation
public void commit(final DBAccessor accessor) throws RocksDBException {
    accessor.flush(oldColumnFamily, newColumnFamily);
}
One of the primary goals of KIP-1035 is to avoid having to explicitly flush RocksDB, because doing so creates unnecessarily small tables on-disk, which then have to be compacted.
We can completely remove this method, but only if we set dbOptions.setAtomicFlush(true) in openDB (anywhere below the configSetter.setConfig line is fine, so that users can't accidentally override it).
Atomic Flush ensures that RocksDB will always flush all Column Families atomically, even when doing automatic flushes in the background (which is what we want).
This ensures that the records in the data column families will always correspond to the offsets in the offsets column family.
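As a sketch of that suggestion (illustrative only: the method shape is an assumption, with names like configSetter and userSpecifiedOptions taken from the surrounding discussion), the flag would be set after the user's config hook runs:

```java
import org.rocksdb.Options;

class AtomicFlushSketch {
    // Illustrative only: shows where setAtomicFlush belongs relative to the
    // user-supplied config setter, not the actual openDB implementation.
    void configureOptions(final Options userSpecifiedOptions) {
        // ... configSetter.setConfig(...) would run above this point ...

        // Set AFTER the user's config setter so it cannot be accidentally
        // overridden. Atomic flush makes RocksDB flush all column families
        // together, so the offsets CF always matches the data CFs on disk.
        userSpecifiedOptions.setAtomicFlush(true);
    }
}
```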
Yes, you are right about that.
Since the code that currently uses RocksDBStore depends on the flush mechanism every 10K records, I wrote this PR to maintain the same behavior, which is flushing memtables on every commit. I think we would need to change the ProcessorStateManager and the other sections of the code that use this mechanism in order to eliminate all of the flush calls from this StateStore.
These code changes don't alter the current behavior; they only add the new CF to the state stores and prepare this store to manage the offsets.
If you prefer, we can implement that in this PR; however, it will complicate the review process because we would need to modify the ProcessorStateManager and, I guess, other parts of the code.
Let me know if there is anything I'm missing from the KIP-1035 vision, and I'll be happy to make the required adjustments.
Ahh yes, I see what you're saying. There's no regression because we're currently flushing on commit. That's fine. I do think we should add the dbOptions.setAtomicFlush(true) in this PR though, as it's intrinsically linked to the offsets management, and it should (currently) have zero effect (because we're already flushing all CFs together), but it definitely will matter in the future, and I'd rather we didn't forget to add it then!
Totally, that's a very important thing to discuss and one of the main goals of this KIP.
I just realized that setAtomicFlush(true) needs all column families to be flushed at the same time with dbAccessor.flush(...). But with the code I have now, I'm flushing the data and offset CF separately. That seems like something I should change.
I just added setAtomicFlush(true) and the necessary adjustments on this commit. Thank you for pointing that out :)
    final byte[] value = longSerde.serializer().serialize(null, offset);
    accessor.put(offsetColumnFamilyHandle, key, value);
}
accessor.flush(offsetColumnFamilyHandle);
We should never explicitly flush memtables.
@Override
@SuppressWarnings("deprecation")
public boolean managesOffsets() {
    return true;
I think we'll need to keep this as false until KIP-892 lands, because until we're able to buffer writes between commits, there's no way to guarantee that the committed offsets reflect the records written to the database.
To elaborate: between commits, new records are written to RocksDB. We can't guarantee when those records will be written to disk by RocksDB (due to background flushes), so if the application crashes between commits, some of the records on disk might be newer than the most recently written offsets; this is a problem even with atomic flush.
I still think we want this code, but we can't actually use it until KIP-892 lands, sadly.
The key issue is that, at present, under EOS we detect crashes and wipe state by looking for the absence of the .checkpoint file. Obviously with offsets stored in the store itself, we can't use this strategy.
I wonder if we can emulate it somehow by having ProcessorStateManager write a DIRTY file during initialization, and only deleting it during a clean shutdown? That should resolve this issue, and enable us to use StateStore managed offsets prior to KIP-892 landing.
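That idea could be sketched like this (hypothetical: the marker file name, its location, and the API are assumptions, not actual ProcessorStateManager code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the DIRTY-marker idea: a marker file is written
// during initialization and removed only on clean shutdown, so its presence
// at the next startup signals a crash and the need to wipe state under EOS.
class DirtyMarker {
    private final Path marker;

    DirtyMarker(final Path stateDir) {
        this.marker = stateDir.resolve(".dirty"); // assumed file name
    }

    // Called during initialization, before processing begins.
    void markDirty() throws IOException {
        if (!Files.exists(marker)) {
            Files.createFile(marker);
        }
    }

    // Checked on startup: true means the previous run did not shut down cleanly.
    boolean wasUncleanShutdown() {
        return Files.exists(marker);
    }

    // Called only on a clean shutdown.
    void markClean() throws IOException {
        Files.deleteIfExists(marker);
    }
}
```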
Honestly, I'm confused here.
I don't think that ProcessorStateManager needs to check if the offsets stored in the store are correct. This is because we would lose both the data and the offsets that were in the memtables if the application crashed before flushing all the records to disk. This means that when the instance starts up again, it will begin with the offsets that were saved successfully on the disk.
To put it another way, if store.managesOffsets() returns false, we could invoke the checkpoint file validation just as it is right now. And, for stores that manage the offsets, PSM could just trust that the offsets are correct, so it doesn't need to throw TaskCorruptedException anymore unless there is an exception when getting the offsets from the store.
> This is because we would lose both the data and the offsets that were in the memtables if the application crashed before flushing all the records to disk. This means that when the instance starts up again, it will begin with the offsets that were saved successfully on the disk.
The problem looks something like this:
1. commit() called due to commit interval. All written records and offsets are persisted to disk; offsets on-disk are in-sync with records on-disk.
2. Records continue being processed, with various writes to the store (put, delete, etc.).
3. RocksDB flushes its memtables automatically. This can't be turned off; it's a fundamental part of the Rocks design. While it flushes the records and offsets together, the offsets are still the ones from the previous commit in step 1. Therefore the records on-disk are now newer than the offsets on-disk.
4. CRASH!
5. During recovery, we read the offsets from the store and find some; we assume the store is not corrupt and proceed normally. Under EOS this is a regression, because we're supposed to wipe the stores if the records on-disk might be newer than the "checkpointed" offsets.

KIP-892 fixes this by staging all writes between commits in a WriteBuffer. When RocksDB flushes its memtables, it doesn't touch these WriteBuffers, so the offsets and records on-disk are always kept in-sync, even when Rocks flushes automatically in the background.
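To make the distinction concrete, here is a toy model of that staging behavior (purely illustrative: KIP-892's actual buffer is a RocksDB construct, not a HashMap, and none of these names are Kafka code):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of KIP-892's approach: writes are staged between commits and only
// reach the underlying store together with the offset, so a background flush
// of the store can never persist records newer than the committed offsets.
class TransactionalStoreSketch {
    private final Map<String, String> store = new HashMap<>();       // stands in for RocksDB
    private final Map<String, String> writeBuffer = new HashMap<>(); // staged writes
    private long committedOffset = -1L;

    void put(final String key, final String value) {
        writeBuffer.put(key, value); // staged; invisible to background flushes
    }

    String get(final String key) {
        // Reads see staged writes first, then the committed store.
        return writeBuffer.getOrDefault(key, store.get(key));
    }

    void commit(final long offset) {
        // Records and offset move to the store in a single step,
        // keeping them in sync on every commit.
        store.putAll(writeBuffer);
        writeBuffer.clear();
        committedOffset = offset;
    }

    long committedOffset() {
        return committedOffset;
    }
}
```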
accessor.commit(dbAccessor, changelogOffsets);
verify(dbAccessor).flush(offsetsCF);
verify(dbAccessor, times(2)).flush(any(ColumnFamilyHandle[].class));
verify(dbAccessor).put(eq(offsetsCF), eq(topicSerializer.serialize(null, tp0.toString())), eq(offsetSerializer.serialize(null, 10L)));
verify(dbAccessor).put(eq(offsetsCF), eq(topicSerializer.serialize(null, tp1.toString())), eq(offsetSerializer.serialize(null, 20L)));
I think a better test here would be to verify that reading offsets using accessor.getCommittedOffset returns the offsets written using accessor.commit. That way, you're not leaning so heavily on mocking the internal implementation details.
I really like this idea, but since dbAccessor is a mock, I don't see how we can avoid mocking so much of the internal implementation details. Maybe I should create a dummy DBAccessor for tests? I'm not sure.
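For illustration, a minimal in-memory stand-in might look like this (hypothetical: the real DBAccessor interface is larger, and these method names are assumptions) so the round trip can be asserted without verifying mock interactions:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the parts of DBAccessor that the
// offsets tests need: a put and a committed-offset lookup keyed by raw bytes.
class InMemoryAccessorSketch {
    private final Map<String, byte[]> offsets = new HashMap<>();

    void put(final byte[] key, final byte[] value) {
        // Keys are wrapped as Strings because byte[] has identity-based equals.
        offsets.put(new String(key, StandardCharsets.UTF_8), value);
    }

    byte[] getCommittedOffset(final byte[] key) {
        return offsets.get(new String(key, StandardCharsets.UTF_8));
    }
}
```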
@Test
public void shouldCommitOffsets() {
    final TopicPartition tp0 = new TopicPartition("topic-0", 0);
    final TopicPartition tp1 = new TopicPartition("topic-1", 0);
    final Map<TopicPartition, Long> offsetsToCommit = Map.of(tp0, 100L, tp1, 200L);
    rocksDBStore = getRocksDBStore();
    rocksDBStore.init(context, rocksDBStore);
    rocksDBStore.commit(offsetsToCommit);
    rocksDBStore.close();
    rocksDBStore.init(context, rocksDBStore);
    assertEquals(100L, rocksDBStore.committedOffset(tp0));
    assertEquals(200L, rocksDBStore.committedOffset(tp1));
    rocksDBStore.close();
}
As per my above comment: I don't think we should actually enable using these offsets until KIP-892 lands. So perhaps here we should have a test that verifies that offsets aren't used/stored?
userSpecifiedOptions.setErrorIfExists(false);
userSpecifiedOptions.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
// Supports offset management: KAFKA-20212
userSpecifiedOptions.setAtomicFlush(true);
This needs to be moved below line 232, so that users can't (accidentally) override it in their config.
I see. Thanks
I believe we should handle this in a way that is similar to how we handle WAL configs. In the case of WAL-related configs, we are logging a warning telling that the config will be ignored.
private void logIgnoreWalOption(final String option) {
    log.warn("WAL is explicitly disabled by Streams in RocksDB. Setting option '{}' will be ignored", option);
}

What do you think?
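Following that pattern, the atomic-flush case might look like the sketch below (hypothetical method name and wording; Kafka uses SLF4J, while this sketch uses java.util.logging to stay self-contained):

```java
import java.util.logging.Logger;

// Sketch of the suggested warning, mirroring logIgnoreWalOption: if a user
// tries to disable atomic flush via their config setter, log that the
// setting is ignored rather than silently overriding it.
class AtomicFlushWarningSketch {
    private static final Logger log =
        Logger.getLogger(AtomicFlushWarningSketch.class.getName());

    // Separated out so the message text can be inspected directly.
    static String ignoreMessage(final String option) {
        return String.format(
            "Atomic flush is explicitly enabled by Streams in RocksDB to keep "
            + "offsets in sync with data. Setting option '%s' will be ignored",
            option);
    }

    static void logIgnoreAtomicFlushOption(final String option) {
        log.warning(ignoreMessage(option));
    }
}
```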
Enables RocksDBStore to manage offsets.
This PR adds a new ColumnFamily for storing offsets committed by Kafka
Streams.
Unit tests have been added to verify the new behavior.