KAFKA-13785: [2/N][emit final] add processor metadata to be committed with offset #11829
Conversation
I am wondering if we should make this public API (and add to the KIP?)
Otherwise, we need to cast to an internal interface in the implementation? If we think this cast is ok, and we don't want to expand the scope of the KIP, I am also fine with it. Just wondering... We could also expose it as public API later on, but I think we should design it in a way that allows us to expose it as public API later without major changes?
@@ -1092,13 +1102,55 @@ public boolean commitRequested() {
        return commitRequested;
    }

    static String encodeTimestampAndProcessorMetadata(final TopicPartition partition,
                                                      final long partitionTime, final ProcessorMetadata metadata) {
nit: formatting/indention
public class ProcessorMetadata {

    // Does this need to be thread safe? I think not since there's one per task
    private final Map<Bytes, byte[]> globalMetadata;
Not sure if I can follow? Why is this a key-value pair? And why are the types <Bytes, byte[]>?

The OffsetAndMetadata interface takes a String metadata argument. That is why we encode the current streamTime we store as metadata as a String, encoded using Base64.

If we assume that there might be multiple processors inside a task, I understand that we need a mapping from processor name to metadata, but the map should be <String, String> (or maybe <String, Long>)?

And we would encode it as a concatenation of <streamTime><numberOfEntries><key,value><key,value> with the key being the processor name, and the value being the encoded timestamp (if we use Long in the map, we convert the long to Base64 internally)?
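For illustration, the concatenated format proposed above could be sketched as follows. This is a rough sketch with hypothetical helper names, using the suggested <String, Long> map, Base64 for the long values, and a comma separator (processor names containing the separator would need escaping; the PR itself does not use this format):

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

class MetadataCodecSketch {
    // Encode <streamTime><numberOfEntries><key,value>... as a single String.
    // Keys are processor names; values are per-processor timestamps.
    static String encode(final long streamTime, final Map<String, Long> perProcessor) {
        final StringBuilder sb = new StringBuilder();
        sb.append(encodeLong(streamTime)).append(',').append(perProcessor.size());
        for (final Map.Entry<String, Long> e : perProcessor.entrySet()) {
            sb.append(',').append(e.getKey()).append(',').append(encodeLong(e.getValue()));
        }
        return sb.toString();
    }

    // Base64-encode a long as a fixed 8-byte value, as suggested for the map values.
    static String encodeLong(final long value) {
        return Base64.getEncoder().encodeToString(
            ByteBuffer.allocate(Long.BYTES).putLong(value).array());
    }

    static long decodeLong(final String encoded) {
        return ByteBuffer.wrap(Base64.getDecoder().decode(encoded)).getLong();
    }
}
```

Since the Base64 alphabet never contains the comma separator, the encoded long values split cleanly on it.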
> Why is this a key-value pair

My original intention was to make it flexible, so that different processors can commit/store multiple key/value pairs. So the key/value could be anything the processor chooses. Now I think maybe we could also add a namespace to prevent key collisions between different processors. The namespace can be the processor name.

> And why are the types <Bytes, byte[]>

Why not <String, Long>? I think Long is not flexible enough. For the emit-final use case, Long is fine.

Why not <String, String>? I think it's more steps for the processor to serialize in some cases. E.g. for Long, the processor needs to serialize it to byte[] and then to String? So why not pass byte[] directly and serialize it to String only at the end, for OffsetAndMetadata?
Might be premature optimization to make it generic. I would stick to just what we need. That is also why I think <String, Long> might just be good enough, as we intend to only use it for "emit final".

> So why not pass byte[] directly and serialize them finally to String for OffsetAndMetadata?

My point is that we should change the format/serialization only once, not twice. Either we accept long and let the runtime do the long -> String conversion directly, or we just use String, and the runtime has nothing to do but concatenate strings while the processor does the long -> String conversion. In both cases it's a single translation step. Introducing byte[] adds a second translation step: what would be the advantage of having two steps instead of one?
        globalMetadata = new HashMap<>();
    }

    public static ProcessorMetadata deserialize(final byte[] ProcessorMetadata, final TopicPartition partition) {
Why do we pass in TopicPartition? Tasks are per partition already?
Yeah. TopicPartition can be dropped.
        return null;
    }

    public void merge(final ProcessorMetadata other) {
What is the purpose of merge? It seems it would be sufficient for a processor to just set its own metadata? The runtime could actually add the name of the processor internally?
For globalMetadata, I'm thinking this should be committed to all TopicPartitions. We need to merge the metadata from all TopicPartitions to get the correct globalMetadata in case some commits fail?
> this should be committed to all TopicPartition

Sounds like we would commit it redundantly? Why store the same metadata for multiple partitions?

> We need to merge the metadata from all TopicPartition to get correct globalMetadata in case some commits fail?

Not sure if I can follow. If the commit fails, we would fall back to the previous offset, including the previous metadata, right?
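For illustration, one way a merge across per-partition committed metadata could resolve conflicts is to keep the newest (largest) timestamp per key. This is a minimal sketch with hypothetical names, assuming <String, Long> entries as discussed above, not the PR's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

class ProcessorMetadataSketch {
    private final Map<String, Long> metadata = new HashMap<>();

    public void put(final String key, final long value) {
        metadata.put(key, value);
    }

    public Long get(final String key) {
        return metadata.get(key);
    }

    // Merge metadata read back from different partitions' committed offsets:
    // on a key conflict, keep the larger (newer) timestamp, so a stale commit
    // on one partition cannot roll the global metadata backwards.
    public void merge(final ProcessorMetadataSketch other) {
        other.metadata.forEach((key, value) -> metadata.merge(key, value, Math::max));
    }
}
```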
        // TODO: merge with other data
    }

    public byte[] serialize(final TopicPartition partition) {
Why do we need this?
It's used in StreamTask line 1107 for serializing this object to byte[] before putting it into OffsetAndMetadata.
for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
    final TopicPartition partition = entry.getKey();
    final OffsetAndMetadata metadata = entry.getValue();

    if (metadata != null) {
        final long committedTimestamp = decodeTimestamp(metadata.metadata());
        final KeyValue<Long, ProcessorMetadata> committedTimestampAndMeta = decodeTimestampAndMeta(metadata.metadata(), partition);
Might be better to add a proper POJO, maybe StreamsMetadata or something, that wraps the streamTime Long plus the ProcessorMetadata, instead of using KeyValue? We might add new fields later on, which is easier to do with a dedicated POJO.
        return KeyValue.pair(timestamp, metadata);
    }

    long decodeTimestamp(final String encryptedString) {
Seems this will be removed, as it's replaced by decodeTimestampAndMeta, which can handle both the old and new formats?
Right.
> I am wondering if we should make this public API (and add to the KIP?)

I'm thinking of creating an internal interface to reduce the scope of the KIP. Yeah, we should design it in a way that allows easy public API support later.
I plan to make changes. Don't review.
    }

    @Override
    public void setProcessorMetadata(final ProcessorMetadata metadata) {
Why do we need both "per key" and "full" getters/setters -- could we limit it to only one of the two?
This is used during restoration to set the processor metadata as a whole, after deserialization and merging. The per-key setter is used by processors. Maybe we can get rid of getProcessorMetadata if it's not used.
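A sketch of the two access patterns discussed here (hypothetical names, not the actual context API): the per-key setter serves individual processors while processing, and the whole-object setter serves restoration after the committed metadata has been deserialized and merged:

```java
import java.util.HashMap;
import java.util.Map;

class ContextMetadataSketch {
    private Map<String, Long> processorMetadata = new HashMap<>();

    // Per-key setter: what an individual processor calls while processing,
    // touching only its own namespaced entry.
    void addProcessorMetadataKeyValue(final String key, final long value) {
        processorMetadata.put(key, value);
    }

    Long processorMetadataForKey(final String key) {
        return processorMetadata.get(key);
    }

    // Whole-object setter: used during restoration to install the metadata
    // recovered from the committed offsets in one step.
    void setProcessorMetadata(final Map<String, Long> metadata) {
        this.processorMetadata = metadata;
    }
}
```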
    final long partitionTime = partitionTimes.get(partition);
    committableOffsets.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
    committableOffsets.put(partition, new OffsetAndMetadata(offset,
        TopicPartitionMetadata.with(partitionTime, processorContext.getProcessorMetadata()).encode()));
It seems we need to guard this write and make a case decision during rolling upgrade -- it might only be safe to use the new format after all instances are upgraded? Or do we consider using the new "emit final" feature a "breaking change" anyway, so existing apps can only use it after a reset anyway?
Existing windowed aggregations not using "emit final" can't upgrade to it since the store format is different. Yes, it's a breaking change if the user wants to use it.
@@ -537,6 +542,7 @@ public void closeDirty() {
    public void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
        super.updateInputPartitions(topicPartitions, allTopologyNodesToSourceTopics);
        partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
        processorContext.getProcessorMetadata().setNeedsCommit(true);
Is setting the commit flag enough? Or would we need to commit the current metadata into the new partition(s) before starting to process any data? Atm, it seems we would only commit to the new partitions on the first commit after data was processed. Wondering if this is a race condition we need to close?
If new partitions are added, I think it's ok not to commit immediately, since the old partitions still have the original metadata and it can be fetched/merged.

If there's a partition deletion and the system crashed before writing to the new partitions, the metadata will be gone. But I'm not sure if we need to handle that, since the user deleted the old partitions containing the metadata.
I agree it might be a rare case. If we think it's not worth it and the risk of losing the metadata is tiny (and it's complicated to force a commit right away), I am fine with not closing this race condition.
    buffer.put(LATEST_MAGIC_BYTE);
    buffer.putLong(partitionTime);
    buffer.put(serializedMeta);
    return Base64.getEncoder().encodeToString(buffer.array());
Still wondering why we encode processorMetadata first as byte[] and convert it to String using Base64 here? Why not convert the Map to String directly? The key is already a String, so we would only use Base64 to convert the Long values to Strings and concatenate all kv-pairs in some format (e.g., CSV)?
The reason is that after converting a single partitionTime to String, the length is not deterministic, so we would need extra metadata to delimit it. Also, I don't want to apply Base64 multiple times.
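To illustrate the point about deterministic lengths: writing a version byte and the fixed-width partitionTime first, then Base64-encoding the whole buffer once, lets the decoder find every field at a fixed offset without any delimiters. This is a sketch with hypothetical names, and the magic-byte value is illustrative, not necessarily the one used in the PR:

```java
import java.nio.ByteBuffer;
import java.util.Base64;

class TopicPartitionMetadataSketch {
    static final byte LATEST_MAGIC_BYTE = 2; // illustrative version marker

    static String encode(final long partitionTime, final byte[] serializedMeta) {
        final ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Long.BYTES + serializedMeta.length);
        buffer.put(LATEST_MAGIC_BYTE);  // version byte, for old/new format detection
        buffer.putLong(partitionTime);  // fixed 8 bytes: deterministic offset for the decoder
        buffer.put(serializedMeta);     // remaining bytes: serialized processor metadata
        return Base64.getEncoder().encodeToString(buffer.array()); // single Base64 pass
    }

    static long decodePartitionTime(final String encoded) {
        final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
        final byte magic = buffer.get();
        if (magic != LATEST_MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown format version: " + magic);
        }
        return buffer.getLong();
    }
}
```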
LGTM. We just need a Jira for KIP-825 before we can merge this PR.
Thanks @mjsax ! Created Jira: https://issues.apache.org/jira/browse/KAFKA-13785
Add processor metadata to be committed with the offset to the broker. Adding this so that we can store the last processed window time gracefully.
Committer Checklist (excluded from commit message)