package kafka | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"io" | |
"math" | |
"sort" | |
"strconv" | |
"sync" | |
"sync/atomic" | |
"time" | |
) | |
const ( | |
LastOffset int64 = -1 // The most recent offset available for a partition. | |
FirstOffset int64 = -2 // The least recent offset available for a partition. | |
) | |
const ( | |
// defaultCommitRetries holds the number of commit attempts to make
// before giving up.
defaultCommitRetries = 3 | |
) | |
const ( | |
// defaultFetchMinBytes of 1 byte means that fetch requests are answered as | |
// soon as a single byte of data is available or the fetch request times out | |
// waiting for data to arrive. | |
defaultFetchMinBytes = 1 | |
) | |
var ( | |
errOnlyAvailableWithGroup = errors.New("unavailable when GroupID is not set") | |
errNotAvailableWithGroup = errors.New("unavailable when GroupID is set") | |
) | |
const ( | |
// defaultReadBackoffMin/Max set the boundaries for how long the reader waits
// before polling for new messages.
defaultReadBackoffMin = 100 * time.Millisecond | |
defaultReadBackoffMax = 1 * time.Second | |
) | |
// Reader provides a high-level API for consuming messages from kafka.
//
// A Reader automatically manages reconnections to a kafka server, and
// blocking methods have context support for asynchronous cancellations.
//
// Note that it is important to call `Close()` on a `Reader` when a process exits.
// The kafka server needs a graceful disconnect to stop it from continuing to
// attempt to send messages to the connected clients. A program will not call
// `Close()` on its own if it is terminated with SIGINT (ctrl-c at the shell) or
// SIGTERM (as docker stop or a kubernetes restart does). This can result in a
// delay when a new reader on the same topic connects (e.g. new process started
// or new container running). Use a `signal.Notify` handler to close the reader on
// process shutdown.
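//
// A minimal sketch of such a handler (the broker address, topic, and group id
// below are placeholders, not values defined by this package):
//
//	r := kafka.NewReader(kafka.ReaderConfig{
//		Brokers: []string{"localhost:9092"},
//		Topic:   "my-topic",
//		GroupID: "my-group",
//	})
//	sigs := make(chan os.Signal, 1)
//	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
//	go func() {
//		<-sigs
//		r.Close() // gracefully disconnect on SIGINT/SIGTERM
//	}()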
type Reader struct { | |
// immutable fields of the reader | |
config ReaderConfig | |
// communication channels between the parent reader and its subreaders | |
msgs chan readerMessage | |
// mutable fields of the reader (synchronized on the mutex) | |
mutex sync.Mutex | |
join sync.WaitGroup | |
cancel context.CancelFunc | |
stop context.CancelFunc | |
done chan struct{} | |
commits chan commitRequest | |
version int64 // version holds the generation of the spawned readers | |
offset int64 | |
lag int64 | |
closed bool | |
// Without a group subscription (when Reader.config.GroupID == ""), | |
// when errors occur, the Reader gets a synthetic readerMessage with | |
// a non-nil err set. With group subscriptions however, when an error | |
// occurs in Reader.run, there's no reader running (sic, cf. reader vs. | |
// Reader) and there's no way to let the high-level methods like | |
// FetchMessage know that an error indeed occurred. If an error in run | |
// occurs, it will be non-block-sent to this unbuffered channel, where | |
// the high-level methods can select{} on it and notify the caller. | |
runError chan error | |
// once guards the one-time start of the read-lag goroutine (see activateReadLag).
once uint32
stctx context.Context
// reader stats are all made of atomic values, no need for synchronization. | |
// Use a pointer to ensure 64-bit alignment of the values. | |
stats *readerStats | |
} | |
// useConsumerGroup indicates whether the Reader is part of a consumer group. | |
func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" } | |
func (r *Reader) getTopics() []string { | |
if len(r.config.GroupTopics) > 0 { | |
return r.config.GroupTopics[:] | |
} | |
return []string{r.config.Topic} | |
} | |
// useSyncCommits indicates whether the Reader is configured to perform sync or | |
// async commits. | |
func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 } | |
func (r *Reader) unsubscribe() { | |
r.cancel() | |
r.join.Wait() | |
// it would be interesting to drain the r.msgs channel at this point since | |
// it will contain buffered messages for partitions that may not be | |
// re-assigned to this reader in the next consumer group generation. | |
// however, draining the channel could race with the client calling | |
// ReadMessage, which could result in messages delivered and/or committed | |
// with gaps in the offset. for now, we will err on the side of caution and | |
// potentially have those messages be reprocessed in the next generation by | |
// another consumer to avoid such a race. | |
} | |
func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) { | |
offsets := make(map[topicPartition]int64) | |
for topic, assignments := range allAssignments { | |
for _, assignment := range assignments { | |
key := topicPartition{ | |
topic: topic, | |
partition: int32(assignment.ID), | |
} | |
offsets[key] = assignment.Offset | |
} | |
} | |
r.mutex.Lock() | |
r.start(offsets) | |
r.mutex.Unlock() | |
r.withLogger(func(l Logger) { | |
l.Printf("subscribed to topics and partitions: %+v", offsets) | |
}) | |
} | |
func (r *Reader) waitThrottleTime(throttleTimeMS int32) { | |
if throttleTimeMS == 0 { | |
return | |
} | |
t := time.NewTimer(time.Duration(throttleTimeMS) * time.Millisecond) | |
defer t.Stop() | |
select { | |
case <-r.stctx.Done(): | |
return | |
case <-t.C: | |
} | |
} | |
// commitOffsetsWithRetry attempts to commit the specified offsets and retries | |
// up to the specified number of times | |
func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash, retries int) (err error) { | |
const ( | |
backoffDelayMin = 100 * time.Millisecond | |
backoffDelayMax = 5 * time.Second | |
) | |
for attempt := 0; attempt < retries; attempt++ { | |
if attempt != 0 { | |
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) { | |
return | |
} | |
} | |
if err = gen.CommitOffsets(offsetStash); err == nil { | |
return | |
} | |
} | |
return // err will not be nil | |
} | |
// offsetStash holds offsets by topic => partition => offset | |
type offsetStash map[string]map[int]int64 | |
// merge updates the offsetStash with the offsets from the provided messages | |
func (o offsetStash) merge(commits []commit) { | |
for _, c := range commits { | |
offsetsByPartition, ok := o[c.topic] | |
if !ok { | |
offsetsByPartition = map[int]int64{} | |
o[c.topic] = offsetsByPartition | |
} | |
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset { | |
offsetsByPartition[c.partition] = c.offset | |
} | |
} | |
} | |
// reset clears the contents of the offsetStash | |
func (o offsetStash) reset() { | |
for key := range o { | |
delete(o, key) | |
} | |
} | |
// commitLoopImmediate handles each commit synchronously | |
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) { | |
offsets := offsetStash{} | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
case req := <-r.commits: | |
offsets.merge(req.commits) | |
req.errch <- r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries) | |
offsets.reset() | |
} | |
} | |
} | |
// commitLoopInterval handles each commit asynchronously with a period defined | |
// by ReaderConfig.CommitInterval | |
func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) { | |
ticker := time.NewTicker(r.config.CommitInterval) | |
defer ticker.Stop() | |
// the offset stash should not survive rebalances b/c the consumer may | |
// receive new assignments. | |
offsets := offsetStash{} | |
commit := func() { | |
if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil { | |
r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) }) | |
} else { | |
offsets.reset() | |
} | |
} | |
for { | |
select { | |
case <-ctx.Done(): | |
// drain the commit channel in order to prepare the final commit. | |
for hasCommits := true; hasCommits; { | |
select { | |
case req := <-r.commits: | |
offsets.merge(req.commits) | |
default: | |
hasCommits = false | |
} | |
} | |
commit() | |
return | |
case <-ticker.C: | |
commit() | |
case req := <-r.commits: | |
offsets.merge(req.commits) | |
} | |
} | |
} | |
// commitLoop processes commits off the commit chan | |
func (r *Reader) commitLoop(ctx context.Context, gen *Generation) { | |
r.withLogger(func(l Logger) { | |
l.Printf("started commit for group %s\n", r.config.GroupID) | |
}) | |
defer r.withLogger(func(l Logger) { | |
l.Printf("stopped commit for group %s\n", r.config.GroupID) | |
}) | |
if r.config.CommitInterval == 0 { | |
r.commitLoopImmediate(ctx, gen) | |
} else { | |
r.commitLoopInterval(ctx, gen) | |
} | |
} | |
// run provides the main consumer group management loop. Each iteration performs the | |
// handshake to join the Reader to the consumer group. | |
// | |
// This function is responsible for closing the consumer group upon exit. | |
func (r *Reader) run(cg *ConsumerGroup) { | |
defer close(r.done) | |
defer cg.Close() | |
r.withLogger(func(l Logger) { | |
l.Printf("entering loop for consumer group, %v\n", r.config.GroupID) | |
}) | |
for { | |
// Limit the number of attempts at waiting for the next | |
// consumer generation. | |
var err error | |
var gen *Generation | |
for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ { | |
gen, err = cg.Next(r.stctx) | |
if err == nil { | |
break | |
} | |
if err == r.stctx.Err() { | |
return | |
} | |
r.stats.errors.observe(1) | |
r.withErrorLogger(func(l Logger) { | |
l.Printf(err.Error()) | |
}) | |
// Continue with next attempt... | |
} | |
if err != nil { | |
// All attempts have failed. | |
select { | |
case r.runError <- err: | |
// If somebody's receiving on the runError, let | |
// them know the error occurred. | |
default: | |
// Otherwise, don't block to allow healing. | |
} | |
continue | |
} | |
r.stats.rebalances.observe(1) | |
r.subscribe(gen.Assignments) | |
gen.Start(func(ctx context.Context) { | |
r.commitLoop(ctx, gen) | |
}) | |
gen.Start(func(ctx context.Context) { | |
// wait for the generation to end and then unsubscribe. | |
select { | |
case <-ctx.Done(): | |
// continue to next generation | |
case <-r.stctx.Done(): | |
// this will be the last loop because the reader is closed. | |
} | |
r.unsubscribe() | |
}) | |
} | |
} | |
// ReaderConfig is a configuration object used to create new instances of | |
// Reader. | |
type ReaderConfig struct { | |
// The list of broker addresses used to connect to the kafka cluster. | |
Brokers []string | |
// GroupID holds the optional consumer group id. If GroupID is specified, then
// Partition should NOT be specified (leave it at its zero value).
GroupID string | |
// GroupTopics allows specifying multiple topics, but can only be used in | |
// combination with GroupID, as it is a consumer-group feature. As such, if | |
// GroupID is set, then either Topic or GroupTopics must be defined. | |
GroupTopics []string | |
// The topic to read messages from. | |
Topic string | |
// Partition to read messages from. Either Partition or GroupID may | |
// be assigned, but not both | |
Partition int | |
// A dialer used to open connections to the kafka server. This field is
// optional; if nil, the default dialer is used instead.
Dialer *Dialer | |
// The capacity of the internal message queue, defaults to 100 if none is | |
// set. | |
QueueCapacity int | |
// MinBytes indicates to the broker the minimum batch size that the consumer | |
// will accept. Setting a high minimum when consuming from a low-volume topic | |
// may result in delayed delivery when the broker does not have enough data to | |
// satisfy the defined minimum. | |
// | |
// Default: 1 | |
MinBytes int | |
// MaxBytes indicates to the broker the maximum batch size that the consumer | |
// will accept. The broker will truncate a message to satisfy this maximum, so | |
// choose a value that is high enough for your largest message size. | |
// | |
// Default: 1MB | |
MaxBytes int | |
// Maximum amount of time to wait for new data to come when fetching batches | |
// of messages from kafka. | |
// | |
// Default: 10s | |
MaxWait time.Duration | |
// ReadLagInterval sets the frequency at which the reader lag is updated. | |
// Setting this field to a negative value disables lag reporting. | |
ReadLagInterval time.Duration | |
// GroupBalancers is the priority-ordered list of client-side consumer group | |
// balancing strategies that will be offered to the coordinator. The first | |
// strategy that all group members support will be chosen by the leader. | |
// | |
// Default: [Range, RoundRobin] | |
// | |
// Only used when GroupID is set | |
GroupBalancers []GroupBalancer | |
// HeartbeatInterval sets the optional frequency at which the reader sends the consumer | |
// group heartbeat update. | |
// | |
// Default: 3s | |
// | |
// Only used when GroupID is set | |
HeartbeatInterval time.Duration | |
// CommitInterval indicates the interval at which offsets are committed to | |
// the broker. If 0, commits will be handled synchronously. | |
// | |
// Default: 0 | |
// | |
// Only used when GroupID is set | |
CommitInterval time.Duration | |
// PartitionWatchInterval indicates how often a reader checks for partition changes. | |
// If a reader sees a partition change (such as a partition add) it will rebalance the group | |
// picking up new partitions. | |
// | |
// Default: 5s | |
// | |
// Only used when GroupID is set and WatchPartitionChanges is set. | |
PartitionWatchInterval time.Duration | |
// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
// polling the brokers and rebalancing if any partition changes happen to the topic.
WatchPartitionChanges bool | |
// SessionTimeout optionally sets the length of time that may pass without a heartbeat | |
// before the coordinator considers the consumer dead and initiates a rebalance. | |
// | |
// Default: 30s | |
// | |
// Only used when GroupID is set | |
SessionTimeout time.Duration | |
// RebalanceTimeout optionally sets the length of time the coordinator will wait | |
// for members to join as part of a rebalance. For kafka servers under higher | |
// load, it may be useful to set this value higher. | |
// | |
// Default: 30s | |
// | |
// Only used when GroupID is set | |
RebalanceTimeout time.Duration | |
// JoinGroupBackoff optionally sets the length of time to wait between re-joining | |
// the consumer group after an error. | |
// | |
// Default: 5s | |
JoinGroupBackoff time.Duration | |
// RetentionTime optionally sets the length of time the consumer group will be saved | |
// by the broker | |
// | |
// Default: 24h | |
// | |
// Only used when GroupID is set | |
RetentionTime time.Duration | |
// StartOffset determines from whence the consumer group should begin | |
// consuming when it finds a partition without a committed offset. If | |
// non-zero, it must be set to one of FirstOffset or LastOffset. | |
// | |
// Default: FirstOffset | |
// | |
// Only used when GroupID is set | |
StartOffset int64 | |
// ReadBackoffMin optionally sets the smallest amount of time the reader will wait before
// polling for new messages
// | |
// Default: 100ms | |
ReadBackoffMin time.Duration | |
// ReadBackoffMax optionally sets the maximum amount of time the reader will wait before
// polling for new messages
// | |
// Default: 1s | |
ReadBackoffMax time.Duration | |
// If not nil, specifies a logger used to report internal changes within the | |
// reader. | |
Logger Logger | |
// ErrorLogger is the logger used to report errors. If nil, the reader falls | |
// back to using Logger instead. | |
ErrorLogger Logger | |
// IsolationLevel controls the visibility of transactional records. | |
// ReadUncommitted makes all records visible. With ReadCommitted only | |
// non-transactional and committed records are visible. | |
IsolationLevel IsolationLevel | |
// Limit of how many attempts will be made before delivering the error. | |
// | |
// The default is to try 3 times. | |
MaxAttempts int | |
} | |
// Validate method validates ReaderConfig properties. | |
func (config *ReaderConfig) Validate() error { | |
if len(config.Brokers) == 0 { | |
return errors.New("cannot create a new kafka reader with an empty list of broker addresses") | |
} | |
if config.Partition < 0 || config.Partition >= math.MaxInt32 {
return fmt.Errorf("partition number out of bounds: %d", config.Partition)
}
if config.MinBytes < 0 {
return fmt.Errorf("invalid negative minimum batch size (min = %d)", config.MinBytes)
}
if config.MaxBytes < 0 {
return fmt.Errorf("invalid negative maximum batch size (max = %d)", config.MaxBytes)
}
if config.GroupID != "" { | |
if config.Partition != 0 { | |
return errors.New("either Partition or GroupID may be specified, but not both") | |
} | |
if len(config.Topic) == 0 && len(config.GroupTopics) == 0 { | |
return errors.New("either Topic or GroupTopics must be specified with GroupID") | |
} | |
} else if len(config.Topic) == 0 { | |
return errors.New("cannot create a new kafka reader with an empty topic") | |
} | |
if config.MinBytes > config.MaxBytes {
return fmt.Errorf("minimum batch size greater than the maximum (min = %d, max = %d)", config.MinBytes, config.MaxBytes)
}
if config.ReadBackoffMax < 0 {
return fmt.Errorf("ReadBackoffMax out of bounds: %d", config.ReadBackoffMax)
}
if config.ReadBackoffMin < 0 {
return fmt.Errorf("ReadBackoffMin out of bounds: %d", config.ReadBackoffMin)
}
return nil | |
} | |
// ReaderStats is a data structure returned by a call to Reader.Stats that exposes | |
// details about the behavior of the reader. | |
type ReaderStats struct { | |
Dials int64 `metric:"kafka.reader.dial.count" type:"counter"` | |
Fetches int64 `metric:"kafka.reader.fetch.count" type:"counter"` | |
Messages int64 `metric:"kafka.reader.message.count" type:"counter"` | |
Bytes int64 `metric:"kafka.reader.message.bytes" type:"counter"` | |
Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"` | |
Timeouts int64 `metric:"kafka.reader.timeout.count" type:"counter"` | |
Errors int64 `metric:"kafka.reader.error.count" type:"counter"` | |
DialTime DurationStats `metric:"kafka.reader.dial.seconds"` | |
ReadTime DurationStats `metric:"kafka.reader.read.seconds"` | |
WaitTime DurationStats `metric:"kafka.reader.wait.seconds"` | |
FetchSize SummaryStats `metric:"kafka.reader.fetch.size"` | |
FetchBytes SummaryStats `metric:"kafka.reader.fetch.bytes"` | |
Offset int64 `metric:"kafka.reader.offset" type:"gauge"` | |
Lag int64 `metric:"kafka.reader.lag" type:"gauge"` | |
MinBytes int64 `metric:"kafka.reader.fetch_bytes.min" type:"gauge"` | |
MaxBytes int64 `metric:"kafka.reader.fetch_bytes.max" type:"gauge"` | |
MaxWait time.Duration `metric:"kafka.reader.fetch_wait.max" type:"gauge"` | |
QueueLength int64 `metric:"kafka.reader.queue.length" type:"gauge"` | |
QueueCapacity int64 `metric:"kafka.reader.queue.capacity" type:"gauge"` | |
ClientID string `tag:"client_id"` | |
Topic string `tag:"topic"` | |
Partition string `tag:"partition"` | |
// The original `Fetches` field had a typo where the metric name was called | |
// "kafak..." instead of "kafka...", in order to offer time to fix monitors | |
// that may be relying on this mistake we are temporarily introducing this | |
// field. | |
DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"` | |
} | |
// readerStats is a struct that contains statistics on a reader. | |
type readerStats struct { | |
dials counter | |
fetches counter | |
messages counter | |
bytes counter | |
rebalances counter | |
timeouts counter | |
errors counter | |
dialTime summary | |
readTime summary | |
waitTime summary | |
fetchSize summary | |
fetchBytes summary | |
offset gauge | |
lag gauge | |
partition string | |
} | |
// NewReader creates and returns a new Reader configured with config. | |
// The offset is initialized to FirstOffset. | |
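//
// For example, a sketch of a plain partition reader (the broker address and
// topic name below are placeholders):
//
//	r := kafka.NewReader(kafka.ReaderConfig{
//		Brokers:   []string{"localhost:9092"},
//		Topic:     "my-topic",
//		Partition: 0,
//		MinBytes:  1,    // respond as soon as any data is available
//		MaxBytes:  10e6, // cap fetches at roughly 10MB
//	})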
func NewReader(config ReaderConfig) *Reader { | |
if err := config.Validate(); err != nil { | |
panic(err) | |
} | |
if config.GroupID != "" { | |
if len(config.GroupBalancers) == 0 { | |
config.GroupBalancers = []GroupBalancer{ | |
RangeGroupBalancer{}, | |
RoundRobinGroupBalancer{}, | |
} | |
} | |
} | |
if config.Dialer == nil { | |
config.Dialer = DefaultDialer | |
} | |
if config.MaxBytes == 0 { | |
config.MaxBytes = 1e6 // 1 MB | |
} | |
if config.MinBytes == 0 { | |
config.MinBytes = defaultFetchMinBytes | |
} | |
if config.MaxWait == 0 { | |
config.MaxWait = 10 * time.Second | |
} | |
if config.ReadLagInterval == 0 { | |
config.ReadLagInterval = 1 * time.Minute | |
} | |
if config.ReadBackoffMin == 0 { | |
config.ReadBackoffMin = defaultReadBackoffMin | |
} | |
if config.ReadBackoffMax == 0 { | |
config.ReadBackoffMax = defaultReadBackoffMax | |
} | |
if config.ReadBackoffMax < config.ReadBackoffMin { | |
panic(fmt.Errorf("ReadBackoffMax %d smaller than ReadBackoffMin %d", config.ReadBackoffMax, config.ReadBackoffMin)) | |
} | |
if config.QueueCapacity == 0 { | |
config.QueueCapacity = 100 | |
} | |
if config.MaxAttempts == 0 { | |
config.MaxAttempts = 3 | |
} | |
// when configured as a consumer group, stats should report a partition of -1
readerStatsPartition := config.Partition | |
if config.GroupID != "" { | |
readerStatsPartition = -1 | |
} | |
// when configured as a consumer group, start version at 1 to ensure that only
// the rebalance function will start readers
version := int64(0) | |
if config.GroupID != "" { | |
version = 1 | |
} | |
stctx, stop := context.WithCancel(context.Background()) | |
r := &Reader{ | |
config: config, | |
msgs: make(chan readerMessage, config.QueueCapacity), | |
cancel: func() {}, | |
commits: make(chan commitRequest, config.QueueCapacity), | |
stop: stop, | |
offset: FirstOffset, | |
stctx: stctx, | |
stats: &readerStats{ | |
dialTime: makeSummary(), | |
readTime: makeSummary(), | |
waitTime: makeSummary(), | |
fetchSize: makeSummary(), | |
fetchBytes: makeSummary(), | |
// Generate the string representation of the partition number only | |
// once when the reader is created. | |
partition: strconv.Itoa(readerStatsPartition), | |
}, | |
version: version, | |
} | |
if r.useConsumerGroup() { | |
r.done = make(chan struct{}) | |
r.runError = make(chan error) | |
cg, err := NewConsumerGroup(ConsumerGroupConfig{ | |
ID: r.config.GroupID, | |
Brokers: r.config.Brokers, | |
Dialer: r.config.Dialer, | |
Topics: r.getTopics(), | |
GroupBalancers: r.config.GroupBalancers, | |
HeartbeatInterval: r.config.HeartbeatInterval, | |
PartitionWatchInterval: r.config.PartitionWatchInterval, | |
WatchPartitionChanges: r.config.WatchPartitionChanges, | |
SessionTimeout: r.config.SessionTimeout, | |
RebalanceTimeout: r.config.RebalanceTimeout, | |
JoinGroupBackoff: r.config.JoinGroupBackoff, | |
RetentionTime: r.config.RetentionTime, | |
StartOffset: r.config.StartOffset, | |
Logger: r.config.Logger, | |
ErrorLogger: r.config.ErrorLogger, | |
}) | |
if err != nil { | |
panic(err) | |
} | |
go r.run(cg) | |
} | |
return r | |
} | |
// Config returns the reader's configuration. | |
func (r *Reader) Config() ReaderConfig { | |
return r.config | |
} | |
// Close closes the stream, preventing the program from reading any more | |
// messages from it. | |
func (r *Reader) Close() error { | |
atomic.StoreUint32(&r.once, 1) | |
r.mutex.Lock() | |
closed := r.closed | |
r.closed = true | |
r.mutex.Unlock() | |
r.cancel() | |
r.stop() | |
r.join.Wait() | |
if r.done != nil { | |
<-r.done | |
} | |
if !closed { | |
close(r.msgs) | |
} | |
return nil | |
} | |
// ReadMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// If consumer groups are used, ReadMessage will automatically commit the
// offset when called. Note that this could result in an offset being committed
// before the message is fully processed.
//
// If more fine-grained control of when offsets are committed is required, it
// is recommended to use FetchMessage with CommitMessages instead.
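//
// A sketch of that fetch/commit pattern (process is a placeholder for
// application logic):
//
//	for {
//		m, err := r.FetchMessage(ctx)
//		if err != nil {
//			break
//		}
//		process(m) // placeholder: handle the message before committing
//		if err := r.CommitMessages(ctx, m); err != nil {
//			break
//		}
//	}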
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) { | |
m, err := r.FetchMessage(ctx) | |
if err != nil { | |
return Message{}, err | |
} | |
if r.useConsumerGroup() { | |
if err := r.CommitMessages(ctx, m); err != nil { | |
return Message{}, err | |
} | |
} | |
return m, nil | |
} | |
// FetchMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// FetchMessage does not commit offsets automatically when using consumer groups.
// Use CommitMessages to commit the offset.
func (r *Reader) FetchMessage(ctx context.Context) (Message, error) { | |
r.activateReadLag() | |
for { | |
r.mutex.Lock() | |
if !r.closed && r.version == 0 { | |
r.start(r.getTopicPartitionOffset()) | |
} | |
version := r.version | |
r.mutex.Unlock() | |
select { | |
case <-ctx.Done(): | |
return Message{}, ctx.Err() | |
case err := <-r.runError: | |
return Message{}, err | |
case m, ok := <-r.msgs: | |
if !ok { | |
return Message{}, io.EOF | |
} | |
if m.version >= version { | |
r.mutex.Lock() | |
switch { | |
case m.error != nil: | |
case version == r.version: | |
r.offset = m.message.Offset + 1 | |
r.lag = m.watermark - r.offset | |
} | |
r.mutex.Unlock() | |
switch m.error { | |
case nil: | |
case io.EOF: | |
// io.EOF is used as a marker to indicate that the stream | |
// has been closed, in case it was received from the inner | |
// reader we don't want to confuse the program and replace | |
// the error with io.ErrUnexpectedEOF. | |
m.error = io.ErrUnexpectedEOF | |
} | |
return m.message, m.error | |
} | |
} | |
} | |
} | |
// CommitMessages commits the list of messages passed as argument. The program | |
// may pass a context to asynchronously cancel the commit operation when it was | |
// configured to be blocking. | |
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error { | |
if !r.useConsumerGroup() { | |
return errOnlyAvailableWithGroup | |
} | |
var errch <-chan error | |
var creq = commitRequest{ | |
commits: makeCommits(msgs...), | |
} | |
if r.useSyncCommits() { | |
ch := make(chan error, 1) | |
errch, creq.errch = ch, ch | |
} | |
select { | |
case r.commits <- creq: | |
case <-ctx.Done(): | |
return ctx.Err() | |
case <-r.stctx.Done(): | |
// This context is used to ensure we don't allow commits after the | |
// reader was closed. | |
return io.ErrClosedPipe | |
} | |
if !r.useSyncCommits() { | |
return nil | |
} | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
case err := <-errch: | |
return err | |
} | |
} | |
// ReadLag returns the current lag of the reader by fetching the last offset of | |
// the topic and partition and computing the difference between that value and | |
// the offset of the last message returned by ReadMessage. | |
// | |
// This method is intended to be used in cases where a program may be unable to
// call ReadMessage to update the value returned by Lag, but still needs an
// up-to-date estimate of how far behind the reader is, for example when the
// consumer is not ready to process the next message.
// | |
// The function returns a lag of zero when the reader's current offset is | |
// negative. | |
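//
// A brief sketch of that use (the threshold is an arbitrary placeholder):
//
//	lag, err := r.ReadLag(ctx)
//	if err == nil && lag > 1000 {
//		// the reader is falling behind; act accordingly
//	}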
func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) { | |
if r.useConsumerGroup() { | |
return 0, errNotAvailableWithGroup | |
} | |
type offsets struct { | |
first int64 | |
last int64 | |
} | |
offch := make(chan offsets, 1) | |
errch := make(chan error, 1) | |
go func() { | |
var off offsets | |
var err error | |
for _, broker := range r.config.Brokers { | |
var conn *Conn | |
if conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition); err != nil { | |
continue | |
} | |
deadline, _ := ctx.Deadline() | |
conn.SetDeadline(deadline) | |
off.first, off.last, err = conn.ReadOffsets() | |
conn.Close() | |
if err == nil { | |
break | |
} | |
} | |
if err != nil { | |
errch <- err | |
} else { | |
offch <- off | |
} | |
}() | |
select { | |
case off := <-offch: | |
switch cur := r.Offset(); { | |
case cur == FirstOffset: | |
lag = off.last - off.first | |
case cur == LastOffset: | |
lag = 0 | |
default: | |
lag = off.last - cur | |
} | |
case err = <-errch: | |
case <-ctx.Done(): | |
err = ctx.Err() | |
} | |
return | |
} | |
// Offset returns the current absolute offset of the reader, or -1 | |
// if r is backed by a consumer group. | |
func (r *Reader) Offset() int64 { | |
if r.useConsumerGroup() { | |
return -1 | |
} | |
r.mutex.Lock() | |
offset := r.offset | |
r.mutex.Unlock() | |
r.withLogger(func(log Logger) { | |
log.Printf("looking up offset of kafka reader for partition %d of %s: %d", r.config.Partition, r.config.Topic, offset) | |
}) | |
return offset | |
} | |
// Lag returns the lag of the last message returned by ReadMessage, or -1 | |
// if r is backed by a consumer group. | |
func (r *Reader) Lag() int64 { | |
if r.useConsumerGroup() { | |
return -1 | |
} | |
r.mutex.Lock() | |
lag := r.lag | |
r.mutex.Unlock() | |
return lag | |
} | |
// SetOffset changes the offset from which the next batch of messages will be | |
// read. The method fails with io.ErrClosedPipe if the reader has already been closed. | |
// | |
// From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first | |
// or last available offset in the partition. Please note while -1 and -2 were accepted | |
// to indicate the first or last offset in previous versions, the meanings of the numbers | |
// were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol | |
// specification. | |
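//
// For example (a sketch; only valid when the reader is not part of a consumer group):
//
//	r.SetOffset(kafka.LastOffset) // only consume messages produced from now on
//	r.SetOffset(42)               // or seek to an absolute offset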
func (r *Reader) SetOffset(offset int64) error { | |
if r.useConsumerGroup() { | |
return errNotAvailableWithGroup | |
} | |
var err error | |
r.mutex.Lock() | |
if r.closed { | |
err = io.ErrClosedPipe | |
} else if offset != r.offset { | |
r.withLogger(func(log Logger) { | |
log.Printf("setting the offset of the kafka reader for partition %d of %s from %d to %d", | |
r.config.Partition, r.config.Topic, r.offset, offset) | |
}) | |
r.offset = offset | |
if r.version != 0 { | |
r.start(r.getTopicPartitionOffset()) | |
} | |
r.activateReadLag() | |
} | |
r.mutex.Unlock() | |
return err | |
} | |
// SetOffsetAt changes the offset from which the next batch of messages will be
// read given the timestamp t.
//
// The method fails if it is unable to connect to the partition leader, unable to
// read the offset for the given timestamp, or if the reader has been closed.
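//
// For example (a sketch), to resume reading from roughly one hour ago:
//
//	err := r.SetOffsetAt(ctx, time.Now().Add(-time.Hour))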
func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error { | |
r.mutex.Lock() | |
if r.closed { | |
r.mutex.Unlock() | |
return io.ErrClosedPipe | |
} | |
r.mutex.Unlock() | |
for _, broker := range r.config.Brokers { | |
conn, err := r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition) | |
if err != nil { | |
continue | |
} | |
deadline, _ := ctx.Deadline() | |
conn.SetDeadline(deadline) | |
offset, err := conn.ReadOffset(t) | |
conn.Close() | |
if err != nil { | |
return err | |
} | |
return r.SetOffset(offset) | |
} | |
return fmt.Errorf("error setting offset for timestamp %+v", t) | |
} | |
// Stats returns a snapshot of the reader stats since the last time the method | |
// was called, or since the reader was created if it is called for the first | |
// time. | |
// | |
// A typical use of this method is to spawn a goroutine that will periodically | |
// call Stats on a kafka reader and report the metrics to a stats collection | |
// system. | |
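//
// A sketch of that pattern (the interval and the report function are
// placeholders for whichever stats collection system is in use):
//
//	go func() {
//		for range time.Tick(10 * time.Second) {
//			report(r.Stats())
//		}
//	}()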
func (r *Reader) Stats() ReaderStats { | |
stats := ReaderStats{ | |
Dials: r.stats.dials.snapshot(), | |
Fetches: r.stats.fetches.snapshot(), | |
Messages: r.stats.messages.snapshot(), | |
Bytes: r.stats.bytes.snapshot(), | |
Rebalances: r.stats.rebalances.snapshot(), | |
Timeouts: r.stats.timeouts.snapshot(), | |
Errors: r.stats.errors.snapshot(), | |
DialTime: r.stats.dialTime.snapshotDuration(), | |
ReadTime: r.stats.readTime.snapshotDuration(), | |
WaitTime: r.stats.waitTime.snapshotDuration(), | |
FetchSize: r.stats.fetchSize.snapshot(), | |
FetchBytes: r.stats.fetchBytes.snapshot(), | |
Offset: r.stats.offset.snapshot(), | |
Lag: r.stats.lag.snapshot(), | |
MinBytes: int64(r.config.MinBytes), | |
MaxBytes: int64(r.config.MaxBytes), | |
MaxWait: r.config.MaxWait, | |
QueueLength: int64(len(r.msgs)), | |
QueueCapacity: int64(cap(r.msgs)), | |
ClientID: r.config.Dialer.ClientID, | |
Topic: r.config.Topic, | |
Partition: r.stats.partition, | |
} | |
// TODO: remove when we get rid of the deprecated field. | |
stats.DeprecatedFetchesWithTypo = stats.Fetches | |
return stats | |
} | |
func (r *Reader) getTopicPartitionOffset() map[topicPartition]int64 { | |
key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)} | |
return map[topicPartition]int64{key: r.offset} | |
} | |
func (r *Reader) withLogger(do func(Logger)) { | |
if r.config.Logger != nil { | |
do(r.config.Logger) | |
} | |
} | |
func (r *Reader) withErrorLogger(do func(Logger)) { | |
if r.config.ErrorLogger != nil { | |
do(r.config.ErrorLogger) | |
} else { | |
r.withLogger(do) | |
} | |
} | |
func (r *Reader) activateReadLag() { | |
if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) { | |
// read lag will only be calculated when not using consumer groups | |
// todo discuss how capturing read lag should interact with rebalancing | |
if !r.useConsumerGroup() { | |
go r.readLag(r.stctx) | |
} | |
} | |
} | |
func (r *Reader) readLag(ctx context.Context) { | |
ticker := time.NewTicker(r.config.ReadLagInterval) | |
defer ticker.Stop() | |
for { | |
timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2) | |
lag, err := r.ReadLag(timeout) | |
cancel() | |
if err != nil { | |
r.stats.errors.observe(1) | |
r.withErrorLogger(func(log Logger) { | |
log.Printf("kafka reader failed to read lag of partition %d of %s: %s", r.config.Partition, r.config.Topic, err) | |
}) | |
} else { | |
r.stats.lag.observe(lag) | |
} | |
select { | |
case <-ticker.C: | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} | |
func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { | |
if r.closed { | |
// don't start child reader if parent Reader is closed | |
return | |
} | |
ctx, cancel := context.WithCancel(context.Background()) | |
r.cancel() // always cancel the previous reader | |
r.cancel = cancel | |
r.version++ | |
r.join.Add(len(offsetsByPartition)) | |
for key, offset := range offsetsByPartition { | |
go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) { | |
defer join.Done() | |
(&reader{ | |
dialer: r.config.Dialer, | |
logger: r.config.Logger, | |
errorLogger: r.config.ErrorLogger, | |
brokers: r.config.Brokers, | |
topic: key.topic, | |
partition: int(key.partition), | |
minBytes: r.config.MinBytes, | |
maxBytes: r.config.MaxBytes, | |
maxWait: r.config.MaxWait, | |
backoffDelayMin: r.config.ReadBackoffMin, | |
backoffDelayMax: r.config.ReadBackoffMax, | |
version: r.version, | |
msgs: r.msgs, | |
stats: r.stats, | |
isolationLevel: r.config.IsolationLevel, | |
maxAttempts: r.config.MaxAttempts, | |
}).run(ctx, offset) | |
}(ctx, key, offset, &r.join) | |
} | |
} | |
// A reader reads messages from kafka and produces them on its channels; it's
// used as a way to asynchronously fetch messages while the main program reads
// them using the high-level Reader API.
type reader struct { | |
dialer *Dialer | |
logger Logger | |
errorLogger Logger | |
brokers []string | |
topic string | |
partition int | |
minBytes int | |
maxBytes int | |
maxWait time.Duration | |
backoffDelayMin time.Duration | |
backoffDelayMax time.Duration | |
version int64 | |
msgs chan<- readerMessage | |
stats *readerStats | |
isolationLevel IsolationLevel | |
maxAttempts int | |
} | |
type readerMessage struct { | |
version int64 | |
message Message | |
watermark int64 | |
error error | |
} | |
func (r *reader) run(ctx context.Context, offset int64) { | |
// This is the reader's main loop: it only ends if the context is canceled
// and otherwise keeps attempting to read messages.
//
// Retrying indefinitely has the nice side effect of preventing Read calls
// on the parent reader from blocking if the connection to the kafka server
// fails; the reader keeps reporting errors on the error channel, which are
// then surfaced to the program.
// If the reader wasn't retrying then the program would block indefinitely
// on a Read call after reading the first error.
for attempt := 0; true; attempt++ { | |
if attempt != 0 { | |
if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) { | |
return | |
} | |
} | |
r.withLogger(func(log Logger) { | |
log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, offset) | |
}) | |
conn, start, err := r.initialize(ctx, offset) | |
switch err { | |
case nil: | |
case OffsetOutOfRange: | |
// This would happen if the requested offset is past the last
// offset on the partition leader. In that case we're just going
// to retry later hoping that enough data has been produced.
r.withErrorLogger(func(log Logger) { | |
log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, OffsetOutOfRange) | |
}) | |
continue | |
default: | |
// Perform a configured number of attempts before
// reporting the first errors; this helps mitigate
// situations where the kafka server is temporarily
// unavailable.
if attempt >= r.maxAttempts { | |
r.sendError(ctx, err) | |
} else { | |
r.stats.errors.observe(1) | |
r.withErrorLogger(func(log Logger) { | |
log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err) | |
}) | |
} | |
continue | |
} | |
// Resetting the attempt counter ensures that if a failure occurs after | |
// a successful initialization we don't keep increasing the backoff | |
// timeout. | |
attempt = 0 | |
// Now we're sure to have an absolute offset number; should anything happen
// to the connection, we know we'll want to restart from this offset.
offset = start | |
errcount := 0 | |
readLoop: | |
for { | |
if !sleep(ctx, backoff(errcount, r.backoffDelayMin, r.backoffDelayMax)) { | |
conn.Close() | |
return | |
} | |
switch offset, err = r.read(ctx, offset, conn); err { | |
case nil: | |
errcount = 0 | |
case io.EOF: | |
// done with this batch of messages...carry on. note that this
// block relies on the batch repackaging real io.EOF errors as
// io.ErrUnexpectedEOF. otherwise, we would end up swallowing real
// errors here.
errcount = 0 | |
case UnknownTopicOrPartition: | |
r.withErrorLogger(func(log Logger) { | |
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, offset, r.brokers) | |
}) | |
conn.Close() | |
// The next call to .initialize will re-establish a connection to the proper | |
// topic/partition broker combo. | |
r.stats.rebalances.observe(1) | |
break readLoop | |
case NotLeaderForPartition: | |
r.withErrorLogger(func(log Logger) { | |
log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, offset) | |
}) | |
conn.Close() | |
// The next call to .initialize will re-establish a connection to the proper | |
// partition leader. | |
r.stats.rebalances.observe(1) | |
break readLoop | |
case RequestTimedOut: | |
// Timeout on the kafka side, this can be safely retried. | |
errcount = 0 | |
r.withLogger(func(log Logger) { | |
log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, offset) | |
}) | |
r.stats.timeouts.observe(1) | |
continue | |
case OffsetOutOfRange: | |
first, last, err := r.readOffsets(conn) | |
if err != nil { | |
r.withErrorLogger(func(log Logger) { | |
log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err) | |
}) | |
conn.Close() | |
break readLoop | |
} | |
switch { | |
case offset < first: | |
r.withErrorLogger(func(log Logger) { | |
log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, offset, first, first-offset) | |
}) | |
offset, errcount = first, 0 | |
continue // retry immediately so we don't keep falling behind due to the backoff | |
case offset < last: | |
errcount = 0 | |
continue // more messages have already become available, retry immediately | |
default: | |
// We may be reading past the last offset, will retry later. | |
r.withErrorLogger(func(log Logger) { | |
log.Printf("the kafka reader is reading passed the last offset for partition %d of %s at offset %d", r.partition, r.topic, offset) | |
}) | |
} | |
case context.Canceled: | |
// Another reader has taken over, we can safely quit. | |
conn.Close() | |
return | |
case errUnknownCodec: | |
// The compression codec is either unsupported or has not been | |
// imported. This is a fatal error b/c the reader cannot | |
// proceed. | |
r.sendError(ctx, err) | |
break readLoop | |
default: | |
if _, ok := err.(Error); ok { | |
r.sendError(ctx, err) | |
} else { | |
r.withErrorLogger(func(log Logger) { | |
log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, offset, err) | |
}) | |
r.stats.errors.observe(1) | |
conn.Close() | |
break readLoop | |
} | |
} | |
errcount++ | |
} | |
} | |
} | |
func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) { | |
for i := 0; i != len(r.brokers) && conn == nil; i++ { | |
var broker = r.brokers[i] | |
var first, last int64 | |
t0 := time.Now() | |
conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition) | |
t1 := time.Now() | |
r.stats.dials.observe(1) | |
r.stats.dialTime.observeDuration(t1.Sub(t0)) | |
if err != nil { | |
continue | |
} | |
if first, last, err = r.readOffsets(conn); err != nil { | |
conn.Close() | |
conn = nil | |
break | |
} | |
switch { | |
case offset == FirstOffset: | |
offset = first | |
case offset == LastOffset: | |
offset = last | |
case offset < first: | |
offset = first | |
} | |
r.withLogger(func(log Logger) { | |
log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, offset) | |
}) | |
if start, err = conn.Seek(offset, SeekAbsolute); err != nil { | |
conn.Close() | |
conn = nil | |
break | |
} | |
conn.SetDeadline(time.Time{}) | |
} | |
return | |
} | |
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) { | |
r.stats.fetches.observe(1) | |
r.stats.offset.observe(offset) | |
t0 := time.Now() | |
conn.SetReadDeadline(t0.Add(r.maxWait)) | |
batch := conn.ReadBatchWith(ReadBatchConfig{ | |
MinBytes: r.minBytes, | |
MaxBytes: r.maxBytes, | |
IsolationLevel: r.isolationLevel, | |
}) | |
highWaterMark := batch.HighWaterMark() | |
t1 := time.Now() | |
r.stats.waitTime.observeDuration(t1.Sub(t0)) | |
var msg Message | |
var err error | |
var size int64 | |
var bytes int64 | |
const safetyTimeout = 10 * time.Second | |
deadline := time.Now().Add(safetyTimeout) | |
conn.SetReadDeadline(deadline) | |
for { | |
if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) { | |
deadline = now.Add(safetyTimeout) | |
conn.SetReadDeadline(deadline) | |
} | |
if msg, err = batch.ReadMessage(); err != nil { | |
batch.Close() | |
break | |
} | |
n := int64(len(msg.Key) + len(msg.Value)) | |
r.stats.messages.observe(1) | |
r.stats.bytes.observe(n) | |
if err = r.sendMessage(ctx, msg, highWaterMark); err != nil { | |
batch.Close() | |
break | |
} | |
offset = msg.Offset + 1 | |
r.stats.offset.observe(offset) | |
r.stats.lag.observe(highWaterMark - offset) | |
size++ | |
bytes += n | |
} | |
conn.SetReadDeadline(time.Time{}) | |
t2 := time.Now() | |
r.stats.readTime.observeDuration(t2.Sub(t1)) | |
r.stats.fetchSize.observe(size) | |
r.stats.fetchBytes.observe(bytes) | |
return offset, err | |
} | |
func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) { | |
conn.SetDeadline(time.Now().Add(10 * time.Second)) | |
return conn.ReadOffsets() | |
} | |
func (r *reader) sendMessage(ctx context.Context, msg Message, watermark int64) error { | |
select { | |
case r.msgs <- readerMessage{version: r.version, message: msg, watermark: watermark}: | |
return nil | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
} | |
func (r *reader) sendError(ctx context.Context, err error) error { | |
select { | |
case r.msgs <- readerMessage{version: r.version, error: err}: | |
return nil | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
} | |
func (r *reader) withLogger(do func(Logger)) { | |
if r.logger != nil { | |
do(r.logger) | |
} | |
} | |
func (r *reader) withErrorLogger(do func(Logger)) { | |
if r.errorLogger != nil { | |
do(r.errorLogger) | |
} else { | |
r.withLogger(do) | |
} | |
} | |
// extractTopics returns the unique list of topics represented by the set of | |
// provided members | |
func extractTopics(members []GroupMember) []string { | |
var visited = map[string]struct{}{} | |
var topics []string | |
for _, member := range members { | |
for _, topic := range member.Topics { | |
if _, seen := visited[topic]; seen { | |
continue | |
} | |
topics = append(topics, topic) | |
visited[topic] = struct{}{} | |
} | |
} | |
sort.Strings(topics) | |
return topics | |
} |