Retry the writing of messages on transient network errors #668

Merged (1 commit) on Jun 20, 2021

@efaif (Contributor) commented May 19, 2021

After upgrading from 0.3 to 0.4 (v0.4.16), we noticed that when a broker in our Kafka cluster goes down, in-flight writes to its partitions fail with network connection errors and are never retried.
These writes fail even though a retry would have succeeded, because the other healthy brokers in the cluster take over leadership of the partitions.

I think that although network connection errors are not considered temporary, they should be treated as transient in this scenario, since the writer usually works against a cluster of brokers that tolerates broker failures (within the limits of the configured replication factor).

In this PR I’ve added a condition to the check that breaks out of the retry loop: the loop now exits only if the received error is also not a transient network error.

I’ve reproduced the issue locally and validated that this change indeed fixes it.

I would love to hear your thoughts.

@achille-roussel self-assigned this May 21, 2021

@achille-roussel (Contributor) commented May 21, 2021

@efaif thanks for the fix.

We were under the impression that the network layer already wraps the syscall errors, so isTemporary should match them (see https://golang.org/src/net/net.go?s=16228:16259#L515).

Would you be able to share logs that show which errors you were getting in your program?

@efaif (Contributor, Author) commented May 21, 2021

@achille-roussel thanks for the response.

Sure. Below are the program logs: the writer’s error logs (tagged producer.kafkaWriter) and the errors received by the writer’s Completion function (tagged producer).
I’ve also logged the Op value of each received net.OpError.

...
...
...
2021-05-21T22:08:48.17       INFO    producer        Start producing...
2021-05-21T22:08:57.51       ERROR   producer.kafkaWriter    error writing messages to test-topic (partition 1): [6] Not Leader For Partition: the client attempted to send messages to a replica that is not the leader for some partition, the client's metadata are likely out of date   {"topic": "test-topic"}
2021-05-21T22:08:57.55       ERROR   producer.kafkaWriter    error writing messages to test-topic (partition 1): [6] Not Leader For Partition: the client attempted to send messages to a replica that is not the leader for some partition, the client's metadata are likely out of date   {"topic": "test-topic"}
2021-05-21T22:08:57.59       ERROR   producer.kafkaWriter    error writing messages to test-topic (partition 1): kafka.(*Client).Produce: unexpected EOF        {"topic": "test-topic"}
2021-05-21T22:08:57.59       ERROR   producer        Failed to produce messages batch        {"error": "kafka.(*Client).Produce: unexpected EOF", "topic": "test-topic"}
2021-05-21T22:08:57.63       ERROR   producer.kafkaWriter    error writing messages to test-topic (partition 1): kafka.(*Client).Produce: write tcp 192.168.0.133:54773->192.168.0.133:9092: write: broken pipe {"topic": "test-topic"}
2021-05-21T22:08:57.63       ERROR   producer        Failed to produce messages batch        {"error": "kafka.(*Client).Produce: write tcp 192.168.0.133:54773->192.168.0.133:9092: write: broken pipe", "topic": "test-topic"}
2021-05-21T22:08:57.63       ERROR   producer        Got OpError.Op = 'write'  
2021-05-21T22:08:57.63       ERROR   producer.kafkaWriter    error writing messages to test-topic (partition 1): kafka.(*Client).Produce: read tcp 192.168.0.133:54771->192.168.0.133:9092: read: connection reset by peer {"topic": "test-topic"}
2021-05-21T22:08:57.63       ERROR   producer        Failed to produce messages batch        {"error": "kafka.(*Client).Produce: read tcp 192.168.0.133:54771->192.168.0.133:9092: read: connection reset by peer", "topic": "test-topic"}
2021-05-21T22:08:57.63       ERROR   producer        Got OpError.Op = 'read' 
2021-05-21T22:08:57.64       ERROR   producer.kafkaWriter    error writing messages to test-topic (partition 1): kafka.(*Client).Produce: dial tcp 192.168.0.133:9092: connect: connection refused      {"topic": "test-topic"}
2021-05-21T22:08:57.64       ERROR   producer        Failed to produce messages batch        {"error": "kafka.(*Client).Produce: dial tcp 192.168.0.133:9092: connect: connection refused", "topic": "test-topic"}
2021-05-21T22:08:57.64       ERROR   producer        Got OpError.Op = 'dial' 
2021-05-21T22:08:57.65       ERROR   producer.kafkaWriter    error writing messages to test-topic (partition 1): kafka.(*Client).Produce: dial tcp 192.168.0.133:9092: connect: connection refused      {"topic": "test-topic"}
2021-05-21T22:08:57.65       ERROR   producer        Failed to produce messages batch        {"error": "kafka.(*Client).Produce: dial tcp 192.168.0.133:9092: connect: connection refused", "topic": "test-topic"}
2021-05-21T22:08:57.65       ERROR   producer        Got OpError.Op = 'dial' 
2021-05-21T22:08:57.67       ERROR   producer.kafkaWriter    error writing messages to test-topic (partition 1): kafka.(*Client).Produce: dial tcp 192.168.0.133:9092: connect: connection refused      {"topic": "test-topic"}
2021-05-21T22:08:57.67       ERROR   producer        Failed to produce messages batch        {"error": "kafka.(*Client).Produce: dial tcp 192.168.0.133:9092: connect: connection refused", "topic": "test-topic"}
2021-05-21T22:08:57.67       ERROR   producer        Got OpError.Op = 'dial' 

You can see that the writes that failed with a connection error were not retried and that the Completion function was invoked immediately.
The net.OpError’s Op is not “accept” for any of these connection errors, so they were not treated as temporary.

@efaif (Contributor, Author) commented Jun 1, 2021

@achille-roussel is there anything I can do to help progress the PR?

@achille-roussel (Contributor) commented Jun 4, 2021

Thanks for providing the extra context.

I guess what I was wondering is whether we could simply test for io.ErrUnexpectedEOF, since it seemed like the other syscall errors should already be covered by the isTemporary check.

No concerns with merging the change otherwise.

@efaif (Contributor, Author) commented Jun 5, 2021

@achille-roussel thanks for the response.

After more thorough debugging, I found that the syscall connection errors are not covered by the isTemporary check and are not treated as temporary by the network layer:

When a net.OpError that wraps an os.SyscallError (for example ECONNREFUSED or ECONNRESET) is received, the isTemporary check calls net.OpError's Temporary method.
net.OpError's Temporary treats a connection error as temporary only when its Op is "accept":
https://golang.org/src/net/net.go?s=16559:16575#L515

// Treat ECONNRESET and ECONNABORTED as temporary errors when
// they come from calling accept. See issue 6163.
if e.Op == "accept" && isConnError(e.Err) {
	return true
}

This doesn't apply in our case, where the Op is "write", "read" or "dial", as can be seen in the logs posted in a previous comment.
Therefore, net.OpError's Temporary falls through to the Temporary method of the error wrapped by the os.SyscallError:
https://golang.org/src/net/net.go?s=16708:16721#L515

if ne, ok := e.Err.(*os.SyscallError); ok {
	t, ok := ne.Err.(temporary)
	return ok && t.Temporary()
}

That ends up in syscall.Errno's Temporary:
https://golang.org/src/syscall/syscall_unix.go?s=3288:3390#L138

func (e Errno) Temporary() bool {
	return e == EINTR || e == EMFILE || e == ENFILE || e.Timeout()
}

This calls syscall.Errno's Timeout:
https://golang.org/src/syscall/syscall_unix.go?s=3388:3482#L142

func (e Errno) Timeout() bool {
	return e == EAGAIN || e == EWOULDBLOCK || e == ETIMEDOUT
}

As you can see, none of ECONNREFUSED, ECONNRESET, or EPIPE is treated as temporary in this case.

@dmarkhas (Contributor) commented Jun 9, 2021

We're also seeing "Unexpected EOF" write failures when a broker goes down. Can we get this merged please?

@dmarkhas (Contributor) commented Jun 20, 2021

@achille-roussel hey, how can we expedite this?

@achille-roussel (Contributor) left a comment

Changes are looking good, thanks for the contribution!

@achille-roussel merged commit 7113876 into segmentio:master Jun 20, 2021
11 checks passed

@dmarkhas (Contributor) commented Jun 30, 2021

Hey, can you tag this release please so we can use it? :)
