Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proper way to save position after Canal OnRow event #816

Open
agorman opened this issue Aug 30, 2023 · 4 comments
Open

Proper way to save position after Canal OnRow event #816

agorman opened this issue Aug 30, 2023 · 4 comments

Comments

@agorman
Copy link

agorman commented Aug 30, 2023

What is the recommended way to save position to the database after each OnRow event?

The reason I'd like to save the position is so I can restart the program and have it start where it left off. I'm guessing this is a common use case and others are already doing this.

I've tried OnPosSynced but because I'm saving the position to the same database it causes it to fire endlessly.

My current implementation is to use e.Header.LogPos something like below. It appears to work but I'm worried there are cases I'm not considering.

func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
    // process event

    pos := c.SyncedPosition()
    pos.Pos = e.Header.LogPos

    h.savePosToDatabase(pos)
}

Any issues with the current implementation or better ways to go about this?

@lance6716
Copy link
Collaborator

lance6716 commented Aug 30, 2023

I'm not a canel user (we directly use BinlogSyncer) but that's not much difference. Considering DML, I think it's XIDEvent rather than RowsEvent that can be used to (re)start a binlog replication. So OnRow does not provide the position to caller.

@cameron-p-m
Copy link
Contributor

Initial position GTIDSet can be retrieved from:

SELECT @@GLOBAL.GTID_EXECUTED

Then start with:

func (c *Canal) StartFromGTID(set mysql.GTIDSet) error {

Maintain the GTIDSet in memory and then keep adding the GTID next from the Event handler to it:
https://github.com/go-mysql-org/go-mysql/blob/cfe601218a91b198994d7aad75ef09bcd96dbd2b/canal/handler.go#L17C2-L17C2

To resume from the place you stopped, call StartFromGTID again next time.

@agorman
Copy link
Author

agorman commented Sep 3, 2023

@cameron-p-m thanks, I don't have GTID_MODE=ON so I can't use GTIDSet. I believe it would have the same issue there where every time I save the GTIDSet back to the database it would fire the event.

@lance6716 it appears that e.Header.LogPos from the OnRow event works well for my needs.

Thanks!

@tonyhal
Copy link

tonyhal commented Dec 4, 2024

GTIDSet 的最终位置可以从以下位置搜索:

SELECT @@GLOBAL.GTID_EXECUTED

然后开始:

func (c *Canal) StartFromGTID(set mysql.GTIDSet) error {

在内存中维护 GTIDSet,然后继续将事件处理程序中的 GTID 添加到其中: https://github.com/go-mysql-org/go-mysql/blob/cfe601218a91b198994d7aad75ef09bcd96dbd2b/canal/handler.go#L17C2-L17C2

要从停止的地方继续,请在下次再次调用StartFromGTID。

这里有个问题 大批量插入 或者更新 他会让你记录 最后面的 GTID点 程序异常之后 你会从最新点开始读取 会丢失数据

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants