[Feature] Flink Source throw the fetch timestamp %s is larger than the max timestamp %s. #274

Open
1 task done
loserwang1024 opened this issue Dec 26, 2024 · 0 comments · May be fixed by #284
Labels
feature New feature or request

loserwang1024 (Collaborator) commented Dec 26, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

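Currently, if a table bucket has not been updated for some time (for example, two days), its max timestamp stays at that earlier value. If I then configure the source to start from the current timestamp, the exception "the fetch timestamp %s is larger than the max timestamp %s" is thrown.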

Solution

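Do what the Kafka source [1] and Paimon [2] do: when the requested timestamp is later than any existing data, fall back to a usable position (for Kafka, the latest offset) instead of throwing.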

kafka:

// First get the current end offsets of the partitions. This is going to be used
// in case we cannot find a suitable offsets based on the timestamp, i.e. the message
// meeting the requirement of the timestamp have not been produced to Kafka yet, in
// this case, we just use the latest offset.
// We need to get the latest offsets before querying offsets by time to ensure that
// no message is going to be missed.

paimon:

/**
 * Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
 * returned if all snapshots are equal to or later than the timestamp mills.
 */
public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
}
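
To make the proposed behavior concrete, here is a minimal sketch of the Kafka-style fallback. It is not the connector's actual code: the class `TimestampStartOffsetSketch`, the method `resolveStartOffset`, and the in-memory timestamp-to-offset index are all hypothetical stand-ins for whatever lookup the source performs against a bucket. The only point is that a fetch timestamp beyond the max timestamp resolves to the latest end offset rather than raising an exception.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch; none of these names come from the project.
final class TimestampStartOffsetSketch {

    /**
     * Returns the offset of the first record whose timestamp is >= fetchTimestamp.
     * If no such record exists (the fetch timestamp is larger than the max
     * timestamp in the bucket), falls back to latestEndOffset instead of throwing.
     *
     * As in the Kafka comment quoted above, latestEndOffset is assumed to have been
     * captured BEFORE the timestamp lookup so that no record is skipped.
     */
    static long resolveStartOffset(
            NavigableMap<Long, Long> timestampToOffset,
            long latestEndOffset,
            long fetchTimestamp) {
        // ceilingEntry returns the entry with the smallest key >= fetchTimestamp, or null.
        Map.Entry<Long, Long> entry = timestampToOffset.ceilingEntry(fetchTimestamp);
        return entry != null ? entry.getValue() : latestEndOffset;
    }

    public static void main(String[] args) {
        // Assumed toy index: record timestamp (ms) -> offset.
        NavigableMap<Long, Long> index = new TreeMap<>();
        index.put(1_000L, 0L);
        index.put(2_000L, 5L);
        long latestEndOffset = 9L;

        // Timestamp inside the existing range: start at the matching record.
        System.out.println(resolveStartOffset(index, latestEndOffset, 1_500L)); // prints 5
        // Timestamp beyond the max timestamp: start from the latest offset.
        System.out.println(resolveStartOffset(index, latestEndOffset, 3_000L)); // prints 9
    }
}
```

A Paimon-style variant would instead pick the latest entry earlier than the timestamp; either way, a stale bucket no longer fails a job that starts from the current time.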

[1] https://github.com/apache/flink-connector-kafka/blob/f6a077a9dd8d1d5e43fc545cc9baab227d8438a0/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java#L51
[2] https://github.com/apache/paimon/blob/ba4576657edf1dde60ac321423d4fa6032169119/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java#L306
