[connector] Support a scan.startup.timestamp larger than the max timestamp of a bucket #284

Merged 2 commits on Jan 18, 2025. Changes shown below are from 1 commit.
@@ -32,6 +32,8 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
@@ -84,7 +86,14 @@ protected static void beforeAll() {
FlinkTestBase.beforeAll();

String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv =
StreamExecutionEnvironment.getExecutionEnvironment(
new Configuration()
.set(
RestartStrategyOptions.RESTART_STRATEGY,
RestartStrategyOptions.RestartStrategyType
.NO_RESTART_STRATEGY
.getMainValue()));
Review comment (Member): Why do we have to configure this?
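A plausible rationale (an assumption; the thread does not answer the question): under Flink's default restart strategy, the query this test intentionally fails with InvalidTimestampException would be retried rather than surfacing the exception to the assertion, so restarts are disabled. A minimal standalone sketch of the same setting, using only classes already imported in this diff:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestartStrategyOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /** Sketch: a test environment that never restarts failed jobs, so an expected failure propagates immediately. */
    public final class NoRestartEnv {
        static StreamExecutionEnvironment create() {
            Configuration conf = new Configuration();
            conf.set(
                    RestartStrategyOptions.RESTART_STRATEGY,
                    RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY
                            .getMainValue());
            // an environment built from this configuration will not retry failed jobs
            return StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }
    }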

// create table environment
tEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
// create catalog using sql
@@ -631,6 +640,62 @@ void testReadPrimaryKeyPartitionedTable() throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
}

@Test
void testReadTimestampGreaterThanMaxTimestamp() throws Exception {
tEnv.executeSql("create table timestamp_table (a int, b varchar) ");
TablePath tablePath = TablePath.of(DEFAULT_DB, "timestamp_table");

// write the first batch of records
List<InternalRow> rows =
Arrays.asList(
row(DATA1_ROW_TYPE, new Object[] {1, "v1"}),
row(DATA1_ROW_TYPE, new Object[] {2, "v2"}),
row(DATA1_ROW_TYPE, new Object[] {3, "v3"}));

writeRows(tablePath, rows, true);
Thread.sleep(100);
// pick a startup timestamp between the first and the second batch of writes.
long currentTimeMillis = System.currentTimeMillis();

// a startup timestamp larger than the current time must be rejected.
assertThatThrownBy(
() ->
tEnv.executeSql(
String.format(
"select * from timestamp_table /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%s') */ ",
currentTimeMillis
+ Duration.ofMinutes(5).toMillis()))
.await())
.hasStackTraceContaining(
String.format(
"the fetch timestamp %s is larger than the current timestamp",
currentTimeMillis + Duration.ofMinutes(5).toMillis()));

try (org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql(
String.format(
"select * from timestamp_table /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%s') */ ",
currentTimeMillis))
.collect()) {
Thread.sleep(100);
// write the second batch of records.
rows =
Arrays.asList(
row(DATA1_ROW_TYPE, new Object[] {4, "v4"}),
row(DATA1_ROW_TYPE, new Object[] {5, "v5"}),
row(DATA1_ROW_TYPE, new Object[] {6, "v6"}));
writeRows(tablePath, rows, true);
List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]", "+I[6, v6]");
int expectRecords = expected.size();
List<String> actual = new ArrayList<>(expectRecords);
for (int i = 0; i < expectRecords; i++) {
String row = rowIter.next().toString();
actual.add(row);
}
assertThat(actual).containsExactlyElementsOf(expected);
}
}

// -------------------------------------------------------------------------------------
// Fluss lookup source tests
// -------------------------------------------------------------------------------------
ListOffsetsResultForBucket.java
@@ -62,4 +62,16 @@ public boolean equals(Object o) {
ListOffsetsResultForBucket that = (ListOffsetsResultForBucket) o;
return offset == that.offset;
}

@Override
public String toString() {
return "ListOffsetsResultForBucket{"
+ "offset="
+ offset
+ ", tableBucket="
+ tableBucket
+ ", error="
+ error
+ '}';
}
}
ResultForBucket.java
@@ -31,7 +31,7 @@
@Internal
public abstract class ResultForBucket {
protected final TableBucket tableBucket;
private final ApiError error;
protected final ApiError error;

public ResultForBucket(TableBucket tableBucket) {
this(tableBucket, ApiError.NONE);
Replica.java
@@ -89,6 +89,7 @@
import com.alibaba.fluss.utils.CloseableRegistry;
import com.alibaba.fluss.utils.FlussPaths;
import com.alibaba.fluss.utils.IOUtils;
import com.alibaba.fluss.utils.clock.Clock;
import com.alibaba.fluss.utils.types.Tuple2;

import org.slf4j.Logger;
@@ -174,6 +175,7 @@ public final class Replica {
private final int tieredLogLocalSegments;
private final AtomicReference<Integer> leaderReplicaIdOpt = new AtomicReference<>();
private final ReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();
private final Clock clock;

/**
* storing the remote follower replicas' state, used to update leader's highWatermark and
@@ -213,7 +215,8 @@ public Replica(
ServerMetadataCache metadataCache,
FatalErrorHandler fatalErrorHandler,
BucketMetricGroup bucketMetricGroup,
TableDescriptor tableDescriptor)
TableDescriptor tableDescriptor,
Clock clock)
throws Exception {
this.physicalPath = physicalPath;
this.tableBucket = tableBucket;
Expand Down Expand Up @@ -241,6 +244,7 @@ public Replica(
this.closeableRegistry = new CloseableRegistry();

this.logTablet = createLog(lazyHighWatermarkCheckpoint);
this.clock = clock;
registerMetrics();
}
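Replacing direct System.currentTimeMillis() calls with the injected Clock stored above makes the replica's time-dependent checks testable. A rough sketch of the pattern, assuming a ManualClock-style test double (Fluss's own com.alibaba.fluss.utils.clock classes may differ in detail):

    // Sketch of the clock abstraction this refactor relies on. The interface
    // mirrors the clock.milliseconds() calls in this diff; ManualClock is an
    // assumed test helper, not necessarily the one shipped with Fluss.
    interface Clock {
        long milliseconds();
    }

    final class ManualClock implements Clock {
        private long nowMs;

        ManualClock(long startMs) {
            this.nowMs = startMs;
        }

        @Override
        public long milliseconds() {
            return nowMs;
        }

        void advance(long deltaMs) {
            nowMs += deltaMs; // tests move time forward explicitly, no sleeping
        }
    }

A test can then construct a Replica with a ManualClock and exercise the fetch-timestamp validation below without real waits.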

@@ -357,7 +361,7 @@ public void makeLeader(NotifyLeaderAndIsrData data) throws IOException {

coordinatorEpoch = data.getCoordinatorEpoch();

long currentTimeMs = System.currentTimeMillis();
long currentTimeMs = clock.milliseconds();
// Updating the assignment and ISR state is safe if the bucket epoch is
// larger or equal to the current bucket epoch.
updateAssignmentAndIsr(data.getReplicas(), true, data.getIsr());
@@ -563,7 +567,7 @@ private void mayFlushKv(long newHighWatermark) {
*/
private Optional<CompletedSnapshot> initKvTablet() {
checkNotNull(kvManager);
long startTime = System.currentTimeMillis();
long startTime = clock.milliseconds();
LOG.info("Start to init kv tablet for {} of table {}.", tableBucket, physicalPath);

// todo: we may need to handle the following cases:
@@ -618,7 +622,7 @@ private Optional<CompletedSnapshot> initKvTablet() {
tableBucket, physicalPath),
e);
}
long endTime = System.currentTimeMillis();
long endTime = clock.milliseconds();
LOG.info(
"Init kv tablet for {} of {} finish, cost {} ms.",
physicalPath,
@@ -632,7 +636,7 @@ private void downloadKvSnapshots(CompletedSnapshot completedSnapshot, Path kvTabletDir) {
Path kvDbPath = kvTabletDir.resolve(RocksDBKvBuilder.DB_INSTANCE_DIR_STRING);
KvSnapshotDownloadSpec downloadSpec =
new KvSnapshotDownloadSpec(completedSnapshot.getKvSnapshotHandle(), kvDbPath);
long start = System.currentTimeMillis();
long start = clock.milliseconds();
LOG.info("Start to download kv snapshot {} to directory {}.", completedSnapshot, kvDbPath);
KvSnapshotDataDownloader kvSnapshotDataDownloader =
snapshotContext.getSnapshotDataDownloader();
@@ -641,7 +645,7 @@ private void downloadKvSnapshots(CompletedSnapshot completedSnapshot, Path kvTabletDir) {
} catch (Exception e) {
throw new IOException("Fail to download kv snapshot.", e);
}
long end = System.currentTimeMillis();
long end = clock.milliseconds();
LOG.info(
"Download kv snapshot {} to directory {} finish, cost {} ms.",
completedSnapshot,
@@ -664,7 +668,7 @@ private Optional<CompletedSnapshot> getLatestSnapshot(TableBucket tableBucket) {
}

private void recoverKvTablet(long startRecoverLogOffset) {
long start = System.currentTimeMillis();
long start = clock.milliseconds();
checkNotNull(kvTablet, "kv tablet should not be null.");
try {
KvRecoverHelper.KvRecoverContext recoverContext =
@@ -689,7 +693,7 @@ private void recoverKvTablet(long startRecoverLogOffset) {
tableBucket, physicalPath),
e);
}
long end = System.currentTimeMillis();
long end = clock.milliseconds();
LOG.info(
"Recover kv tablet for {} of table {} from log offset {} finish, cost {} ms.",
tableBucket,
@@ -800,8 +804,7 @@ public LogAppendInfo appendRecordsToLeader(MemoryLogRecords memoryLogRecords, in

// we may need to increment high watermark if isr could be down to 1 or the
// replica count is 1.
boolean hwIncreased =
maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());
boolean hwIncreased = maybeIncrementLeaderHW(logTablet, clock.milliseconds());

if (hwIncreased) {
tryCompleteDelayedOperations();
@@ -835,7 +838,7 @@ public LogAppendInfo putRecordsToLeader(
kv, "KvTablet for the replica to put kv records shouldn't be null.");
LogAppendInfo logAppendInfo = kv.putAsLeader(kvRecords, targetColumns, schema);
// we may need to increment high watermark.
maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());
maybeIncrementLeaderHW(logTablet, clock.milliseconds());
return logAppendInfo;
});
}
@@ -848,7 +851,7 @@ public LogReadInfo fetchRecords(FetchParams fetchParams) throws IOException {
physicalPath));
}
if (fetchParams.isFromFollower()) {
long followerFetchTimeMs = System.currentTimeMillis();
long followerFetchTimeMs = clock.milliseconds();
LogReadInfo logReadInfo =
inReadLock(
leaderIsrUpdateLock,
@@ -1204,12 +1207,7 @@ public long getOffset(RemoteLogManager remoteLogManager, ListOffsetsParam listOffsetsParam)
return logTablet.localLogStartOffset();
}
} else if (offsetType == ListOffsetsParam.LATEST_OFFSET_TYPE) {
// the request is come from client.
if (listOffsetsParam.getFollowerServerId() < 0) {
return logTablet.getHighWatermark();
} else {
return logTablet.localLogEndOffset();
}
return getLatestOffset(listOffsetsParam.getFollowerServerId());
} else {
throw new IllegalArgumentException(
"Invalid list offset type: " + offsetType);
@@ -1227,15 +1225,19 @@ private long getOffsetByTimestamp(
}

long fetchTimestamp = startTimestampOpt.getAsLong();
// 1. if the fetch timestamp is larger than the local max timestamp, we will
// throw an invalidTimestamp exception
// 1. If the fetch timestamp is larger than the current timestamp, we throw an
// InvalidTimestampException. If the fetch timestamp is larger than the local max
// timestamp but not larger than the current timestamp, we return the latest offset.
long localMaxTimestamp = logTablet.localMaxTimestamp();
if (fetchTimestamp > localMaxTimestamp) {
long currentTimestamp = clock.milliseconds();
if (fetchTimestamp > localMaxTimestamp && fetchTimestamp <= currentTimestamp) {
return getLatestOffset(listOffsetsParam.getFollowerServerId());
} else if (fetchTimestamp > currentTimestamp) {
throw new InvalidTimestampException(
String.format(
"Get offset error for table bucket %s, "
+ "the fetch timestamp %s is larger than the max timestamp %s",
tableBucket, fetchTimestamp, localMaxTimestamp));
+ "the fetch timestamp %s is larger than the current timestamp %s",
tableBucket, fetchTimestamp, currentTimestamp));
}

// 2. we will try to find offset from remote storage.
@@ -1248,6 +1250,15 @@
return logTablet.lookupOffsetForTimestamp(fetchTimestamp);
}

private long getLatestOffset(int followerServerId) {
// a negative follower id means the request comes from a client.
if (followerServerId < 0) {
return logTablet.getHighWatermark();
} else {
return logTablet.localLogEndOffset();
}
}
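Taken together, the pieces of this hunk resolve a fetch timestamp as follows; a simplified sketch (names follow the diff, and the remote-storage lookup of step 2 is elided):

    // Sketch: timestamp-to-offset resolution after this change.
    private long resolveOffsetForTimestamp(long fetchTimestamp, int followerServerId) {
        long localMaxTimestamp = logTablet.localMaxTimestamp();
        long currentTimestamp = clock.milliseconds();
        if (fetchTimestamp > currentTimestamp) {
            // timestamps in the future are rejected outright
            throw new InvalidTimestampException(
                    "the fetch timestamp " + fetchTimestamp
                            + " is larger than the current timestamp " + currentTimestamp);
        }
        if (fetchTimestamp > localMaxTimestamp) {
            // newer than anything written so far, but not in the future:
            // start from the latest offset (high watermark for clients,
            // local log end offset for followers)
            return getLatestOffset(followerServerId);
        }
        // otherwise look the timestamp up in the log (remote storage elided)
        return logTablet.lookupOffsetForTimestamp(fetchTimestamp);
    }

This is why the new test above sees only the second batch: its startup timestamp is newer than the first batch's max timestamp, so the scan starts at the then-latest offset and picks up records written afterwards.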

/**
* Truncate the local log of this bucket to the specified offset and checkpoint the recovery
* point to this offset.
@@ -1526,7 +1537,7 @@ private boolean handleAdjustIsrUpdate(

// We may need to increment high watermark since ISR could be down to 1.
try {
return maybeIncrementLeaderHW(logTablet, System.currentTimeMillis());
return maybeIncrementLeaderHW(logTablet, clock.milliseconds());
} catch (IOException e) {
LOG.error("Failed to increment leader HW", e);
return false;
@@ -1625,7 +1636,7 @@ private List<Integer> getOutOfSyncFollowerReplicas(long maxLagTime) {
if (!currentState.isInflight()) {
Set<Integer> candidateReplicas = new HashSet<>(currentState.isr());
candidateReplicas.remove(localTabletServerId);
long currentTimeMillis = System.currentTimeMillis();
long currentTimeMillis = clock.milliseconds();
long leaderEndOffset = logTablet.localLogEndOffset();
for (int replicaId : candidateReplicas) {
if (isFollowerOutOfSync(
ReplicaManager.java
@@ -99,6 +99,7 @@
import com.alibaba.fluss.utils.FileUtils;
import com.alibaba.fluss.utils.FlussPaths;
import com.alibaba.fluss.utils.Preconditions;
import com.alibaba.fluss.utils.clock.Clock;
import com.alibaba.fluss.utils.concurrent.Scheduler;

import org.slf4j.Logger;
@@ -181,6 +182,8 @@ public class ReplicaManager {
// for metrics
private final TabletServerMetricGroup serverMetricGroup;

private final Clock clock;

public ReplicaManager(
Configuration conf,
Scheduler scheduler,
@@ -193,7 +196,8 @@ public ReplicaManager(
CoordinatorGateway coordinatorGateway,
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
FatalErrorHandler fatalErrorHandler,
TabletServerMetricGroup serverMetricGroup)
TabletServerMetricGroup serverMetricGroup,
Clock clock)
throws IOException {
this(
conf,
@@ -208,7 +212,8 @@ public ReplicaManager(
completedKvSnapshotCommitter,
fatalErrorHandler,
serverMetricGroup,
new RemoteLogManager(conf, zkClient, coordinatorGateway));
new RemoteLogManager(conf, zkClient, coordinatorGateway),
clock);
}

@VisibleForTesting
@@ -225,7 +230,8 @@ public ReplicaManager(
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
FatalErrorHandler fatalErrorHandler,
TabletServerMetricGroup serverMetricGroup,
RemoteLogManager remoteLogManager)
RemoteLogManager remoteLogManager,
Clock clock)
throws IOException {
this.conf = conf;
this.zkClient = zkClient;
@@ -262,6 +268,7 @@ public ReplicaManager(
zkClient, completedKvSnapshotCommitter, kvSnapshotResource, conf);
this.remoteLogManager = remoteLogManager;
this.serverMetricGroup = serverMetricGroup;
this.clock = clock;
registerMetrics();
}

@@ -1446,7 +1453,8 @@ protected Optional<Replica> maybeCreateReplica(NotifyLeaderAndIsrData data) {
metadataCache,
fatalErrorHandler,
bucketMetricGroup,
getTableDescriptor(tablePath, zkClient, schema));
getTableDescriptor(tablePath, zkClient, schema),
clock);
allReplicas.put(tb, new OnlineReplica(replica));
replicaOpt = Optional.of(replica);
} else if (hostedReplica instanceof OnlineReplica) {
TabletServer.java
@@ -156,8 +156,8 @@ protected void startServices() throws Exception {
this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
scheduler.startup();

this.logManager =
LogManager.create(conf, zkClient, scheduler, SystemClock.getInstance());
SystemClock systemClock = SystemClock.getInstance();
this.logManager = LogManager.create(conf, zkClient, scheduler, systemClock);
logManager.startup();

this.kvManager = KvManager.create(conf, zkClient, logManager);
@@ -188,7 +188,8 @@ protected void startServices() throws Exception {
coordinatorGateway,
DefaultCompletedKvSnapshotCommitter.create(rpcClient, metadataCache),
this,
tabletServerMetricGroup);
tabletServerMetricGroup,
systemClock);
replicaManager.startup();

this.tabletService =