The Wayback Machine - https://web.archive.org/web/20220409163853/https://github.com/apache/kafka/pull/11829
Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13785: [2/N][emit final] add processor metadata to be committed with offset #11829

Merged
merged 5 commits into from Mar 31, 2022

Conversation

Copy link
Contributor

@lihaosky lihaosky commented Mar 2, 2022

Add processor metadata to be committed with offset to broker. Adding this so that we can store last processed window time gracefully.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lihaosky lihaosky changed the title [RFC][3/N] add processor metadata to be committed with offset [RFC][2/N][emit final] add processor metadata to be committed with offset Mar 3, 2022
Copy link
Member

@mjsax mjsax left a comment

I am wondering if we should make this public API (and add to the KIP?)

Otherwise, we need to case to an internal interface in the implementation? If we think this cast is ok, and we don't want to expand the scope of the KIP, I am also fine with it. Just wondering... We could also expose it as public API later on, but I think we should design it in a way that will allow us to expose it as public API later without major changes?

@@ -1092,13 +1102,55 @@ public boolean commitRequested() {
return commitRequested;
}

static String encodeTimestampAndProcessorMetadata(final TopicPartition partition,
final long partitionTime, final ProcessorMetadata metadata) {
Copy link
Member

@mjsax mjsax Mar 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: formatting/indention

public class ProcessorMetadata {

// Does this need to be thread safe? I think not since there's one per task
private final Map<Bytes, byte[]> globalMetadata;
Copy link
Member

@mjsax mjsax Mar 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I can follow? Why is this a key-value pair? And why are the types <Bytes, byte[]>?

The OffsetAndMetadata interface takes a String metadata argument. That is why we encode the current streamTime we store as metadata as String encoded using base64.

If we assume that there might be multiple processors inside a task, I understand that we need a mapping from processor name to metadata, but the map should be <String,String> (or maybe <String,Long>)?

And we would encode it as concatenation of <streamTime><numberOfEntries><key,value><key,value> with key being the processor name, and value being the encoded timestamp (if we use Long in the map, we convert the long to base64 internally?

Copy link
Contributor Author

@lihaosky lihaosky Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a key-value pair

My original intention is to make it flexible so that even different processor can commit/store multiple key/value pairs. So the key/value could be anything the processor choose. Now I think maybe we could also add a namespace to prevent collision of key from different processor. Namespace can be processor name.

And why are the types <Bytes, byte[]>

Why not <String, Long>? I think Long is not flexible enough. For emit final use case, Long is fine.
Why not <String, String>? I think it's more steps for processor to serialize it to String in some cases. e.g. For Long, processor needs to serialize it to byte[] and then String? So why not pass byte[] directly and serialize them finally to String for OffsetAndMetadata?

Copy link
Member

@mjsax mjsax Mar 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be pre-mature optimization to make it generic. I would stick to just what we need. That is also why I think <String,Long> might just be good enough as we intend to only use it for "emit final".

So why not pass byte[] directly and serialize them finally to String for OffsetAndMetadata?

My point is, that we should avoid to change the format/serialization twice, but only once. Either we accept long and let the runtime do long -> String directly. Or we just use String and the runtime has nothing to do but to concatenate string, and the processor needs to do the long -> String conversion. In both cases it's a single translation step. Introducing byte[] add a second translation step: what would be the advantage of having two steps instead of one?

globalMetadata = new HashMap<>();
}

public static ProcessorMetadata deserialize(final byte[] ProcessorMetadata, final TopicPartition partition) {
Copy link
Member

@mjsax mjsax Mar 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we pass in TopicPartition? Tasks are per partition already?

Copy link
Contributor Author

@lihaosky lihaosky Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. TopicPartition can be dropped.

return null;
}

public void merge(final ProcessorMetadata other) {
Copy link
Member

@mjsax mjsax Mar 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of merge ? It seems it would be sufficient for a processor to just set it's own metadata?

The runtime can actually add the name of the processor internally?

Copy link
Contributor Author

@lihaosky lihaosky Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For globalMetadata, I'm thinking this should be committed to all TopicPartition. We need to merge the metadata from all TopicPartition to get correct globalMetadata in case some commits fail?

Copy link
Member

@mjsax mjsax Mar 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be committed to all TopicPartition

Sound like we would commit it redundantly? Why store the same metadata for multiple partitions?

We need to merge the metadata from all TopicPartition to get correct globalMetadata in case some commits fail?

Not sure if I can follow. If the commit fails, we would fall back to the previous offset including the previous metadata, right?

// TODO: merge with other data
}

public byte[] serialize(final TopicPartition partition) {
Copy link
Member

@mjsax mjsax Mar 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this?

Copy link
Contributor Author

@lihaosky lihaosky Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in StreamTask line 1107 for serializing this object to byte[] before put into OffsetAndMetadata

for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
final TopicPartition partition = entry.getKey();
final OffsetAndMetadata metadata = entry.getValue();

if (metadata != null) {
final long committedTimestamp = decodeTimestamp(metadata.metadata());
final KeyValue<Long, ProcessorMetadata> committedTimestampAndMeta = decodeTimestampAndMeta(metadata.metadata(), partition);
Copy link
Member

@mjsax mjsax Mar 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be better to add a proper POJO maybe StreamsMetadata or something that wraps the streamTime Long plus ProcessorMetadata instead of using KeyValue ? We might add new fields later on what is easier to do for a new POJO.


return KeyValue.pair(timestamp, metadata);
}

long decodeTimestamp(final String encryptedString) {
Copy link
Member

@mjsax mjsax Mar 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this will be removed as it's replace by decodeTimestampAndMeta that can handle old and new format?

Copy link
Contributor Author

@lihaosky lihaosky Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

Copy link
Contributor Author

@lihaosky lihaosky left a comment

I am wondering if we should make this public API (and add to the KIP?)
Otherwise, we need to case to an internal interface in the implementation? If we think this cast is ok, and we don't want to expand the scope of the KIP, I am also fine with it. Just wondering... We could also expose it as public API later on, but I think we should design it in a way that will allow us to expose it as public API later without major changes?

I'm thinking of creating internal interface to reduce the scope of the KIP. Yeah, we should design it in a way for easy public API support

public class ProcessorMetadata {

// Does this need to be thread safe? I think not since there's one per task
private final Map<Bytes, byte[]> globalMetadata;
Copy link
Contributor Author

@lihaosky lihaosky Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a key-value pair

My original intention is to make it flexible so that even different processor can commit/store multiple key/value pairs. So the key/value could be anything the processor choose. Now I think maybe we could also add a namespace to prevent collision of key from different processor. Namespace can be processor name.

And why are the types <Bytes, byte[]>

Why not <String, Long>? I think Long is not flexible enough. For emit final use case, Long is fine.
Why not <String, String>? I think it's more steps for processor to serialize it to String in some cases. e.g. For Long, processor needs to serialize it to byte[] and then String? So why not pass byte[] directly and serialize them finally to String for OffsetAndMetadata?

// TODO: merge with other data
}

public byte[] serialize(final TopicPartition partition) {
Copy link
Contributor Author

@lihaosky lihaosky Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in StreamTask line 1107 for serializing this object to byte[] before put into OffsetAndMetadata

globalMetadata = new HashMap<>();
}

public static ProcessorMetadata deserialize(final byte[] ProcessorMetadata, final TopicPartition partition) {
Copy link
Contributor Author

@lihaosky lihaosky Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. TopicPartition can be dropped.

return null;
}

public void merge(final ProcessorMetadata other) {
Copy link
Contributor Author

@lihaosky lihaosky Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For globalMetadata, I'm thinking this should be committed to all TopicPartition. We need to merge the metadata from all TopicPartition to get correct globalMetadata in case some commits fail?


return KeyValue.pair(timestamp, metadata);
}

long decodeTimestamp(final String encryptedString) {
Copy link
Contributor Author

@lihaosky lihaosky Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

@lihaosky
Copy link
Contributor Author

@lihaosky lihaosky commented Mar 24, 2022

I plan to make changes. Don't review

@lihaosky lihaosky changed the title [RFC][2/N][emit final] add processor metadata to be committed with offset [2/N][emit final] add processor metadata to be committed with offset Mar 29, 2022
}

@Override
public void setProcessorMetadata(final ProcessorMetadata metadata) {
Copy link
Member

@mjsax mjsax Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need both "per key" and "full" getters/setters -- could we limit it to only one of both?

Copy link
Contributor Author

@lihaosky lihaosky Mar 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in restoration to set processor metadata as a whole after deserialization and merging. The per key setter is used by processor. Maybe we can get rid of getProcessorMetadata if it's not used.

final long partitionTime = partitionTimes.get(partition);
committableOffsets.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
committableOffsets.put(partition, new OffsetAndMetadata(offset,
TopicPartitionMetadata.with(partitionTime, processorContext.getProcessorMetadata()).encode()));
Copy link
Member

@mjsax mjsax Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we need to guard this write and make a case decision during rolling upgrade -- it might only be save to use the new format after all instances are upgrade? Or do we consider using the new "emit final" feature a "breaking change" anyway and existing apps can only use it after a reset anyway?

Copy link
Contributor Author

@lihaosky lihaosky Mar 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing windowed aggregation not using "emit final" can't upgrade to it since store format is different. Yes, it's a breaking changing if user wants to use it.

@@ -537,6 +542,7 @@ public void closeDirty() {
public void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
super.updateInputPartitions(topicPartitions, allTopologyNodesToSourceTopics);
partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
processorContext.getProcessorMetadata().setNeedsCommit(true);
Copy link
Member

@mjsax mjsax Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is setting the commit flag enough? Or would we need to commit the current metadata into the new partition(s) before starting to process any data? Atm, it seems we would only commit to the new partitions on the first commit after data was processed. Wondering if this is a race condition we need to close?

Copy link
Contributor Author

@lihaosky lihaosky Mar 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If new partitions are added, I think it's ok to not commit immediately since old partitions still have original metadata and they can be fetched/merged.

If there's partition deletion and system crashed before writing to new partitions, metadata will be gone. But I'm not sure if we need to handle that since user deleted old partitions containing metadata.

Copy link
Member

@mjsax mjsax Mar 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it might be a rare case. If we think it's not worth it and the risk to lose the metadata is tiny (and it's complicated to force a commit right away) I am fine with not closing this race condition.

buffer.put(LATEST_MAGIC_BYTE);
buffer.putLong(partitionTime);
buffer.put(serializedMeta);
return Base64.getEncoder().encodeToString(buffer.array());
Copy link
Member

@mjsax mjsax Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still wondering, why we encode processorMetadata first as byte[] and convert it to String using Base64 here? Why not convert the Map to String directly`? The key is already String, so we would only use Base64 to convert the Long-values to String and concatenate all kv-pairs in some format (eg, CSV?)

Copy link
Contributor Author

@lihaosky lihaosky Mar 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is after converting single partitionTime to String, the length is not deterministic and we need other meta to encode it. Also I don't want to use Base64 multiple times.

@mjsax mjsax added streams kip labels Mar 31, 2022
Copy link
Member

@mjsax mjsax left a comment

LGTM. We just need a Jira for KIP-825 before we can merge this PR.

@lihaosky
Copy link
Contributor Author

@lihaosky lihaosky commented Mar 31, 2022

@lihaosky lihaosky changed the title [2/N][emit final] add processor metadata to be committed with offset KAFKA-13785: [2/N][emit final] add processor metadata to be committed with offset Mar 31, 2022
@mjsax mjsax merged commit 6b2a0bc into apache:trunk Mar 31, 2022
3 of 12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kip streams
Projects
None yet
2 participants