Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can not flush when MaxBufferedRecords reached #726

Closed
evanzhang87 opened this issue May 10, 2024 · 0 comments · Fixed by #736
Closed

Can not flush when MaxBufferedRecords reached #726

evanzhang87 opened this issue May 10, 2024 · 0 comments · Fixed by #736
Labels

Comments

@evanzhang87
Copy link

#591
According this, I think when records will be flushed when linger is hit or batch is full.

	seeds := strings.Split(brokers, ",")
	cl, err := kgo.NewClient(
		kgo.SeedBrokers(seeds...),
		kgo.ProducerLinger(time.Second*10),
		kgo.MaxBufferedRecords(5),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
	ctx := context.Background()

	record := &kgo.Record{Topic: "cbd_yzhang_test", Value: []byte("this is a message")}
	wg := sync.WaitGroup{}
	for i := 0; i < 11; i++ {
		fmt.Println("send msg", time.Now())
		wg.Add(1)
		cl.Produce(ctx, record, func(r *kgo.Record, err error) {
			if err != nil {
				fmt.Println(err)
			}
			wg.Done()
		})
	}
	wg.Wait()

But it always flush per 10s, when I reach buffer size, it will just block and do not flush.

send msg 2024-05-10 14:49:39.630297 +0800 CST m=+0.001410626
send msg 2024-05-10 14:49:39.630429 +0800 CST m=+0.001543085
send msg 2024-05-10 14:49:39.630432 +0800 CST m=+0.001545793
send msg 2024-05-10 14:49:39.630433 +0800 CST m=+0.001547251
send msg 2024-05-10 14:49:39.630435 +0800 CST m=+0.001548543
send msg 2024-05-10 14:49:39.630436 +0800 CST m=+0.001549710
send msg 2024-05-10 14:49:49.722552 +0800 CST m=+10.094001585
send msg 2024-05-10 14:49:49.722603 +0800 CST m=+10.094051835
send msg 2024-05-10 14:49:49.722609 +0800 CST m=+10.094058335
send msg 2024-05-10 14:49:49.722614 +0800 CST m=+10.094063085
send msg 2024-05-10 14:49:49.722618 +0800 CST m=+10.094067001

twmb added a commit that referenced this issue May 23, 2024
Currently if your linger is long and your max records small, it is
possible to linger even if the client cannot buffer any more records.

Now, once max records or bytes is hit, we wakeup anything lingering and
avoid entering the linger state again -- until we are no longer over
max.

Closes #726.
@twmb twmb added the has pr label May 23, 2024
@twmb twmb closed this as completed in #736 May 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants