Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] fix query timeout behavior when using query queue (backport #53677) #53726

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

package com.starrocks.qe;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -573,6 +574,12 @@ public void setStartTime() {
returnRows = 0;
}

@VisibleForTesting
public void setStartTime(Instant start) {
startTime = start;
returnRows = 0;
}

public void updateReturnRows(int returnRows) {
this.returnRows += returnRows;
}
Expand Down
15 changes: 10 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/qe/QueryQueueManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ public void maybeWait(ConnectContext context, DefaultCoordinator coord) throws U
MetricRepo.COUNTER_QUERY_QUEUE_TOTAL.increase(1L);
ResourceGroupMetricMgr.increaseQueuedQuery(context, 1L);

long timeoutMs = slotRequirement.getExpiredPendingTimeMs();
long deadlineEpochMs = slotRequirement.getExpiredPendingTimeMs();
LogicalSlot allocatedSlot = null;
while (allocatedSlot == null) {
// Check timeout.
long currentMs = System.currentTimeMillis();
if (currentMs >= timeoutMs) {
if (slotRequirement.isPendingTimeout()) {
MetricRepo.COUNTER_QUERY_QUEUE_TIMEOUT.increase(1L);
slotProvider.cancelSlotRequirement(slotRequirement);
String errMsg = String.format(PENDING_TIMEOUT_ERROR_MSG_FORMAT,
Expand All @@ -84,7 +84,7 @@ public void maybeWait(ConnectContext context, DefaultCoordinator coord) throws U

// Wait for slot allocated.
try {
allocatedSlot = slotFuture.get(timeoutMs - currentMs, TimeUnit.MILLISECONDS);
allocatedSlot = slotFuture.get(deadlineEpochMs - currentMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
LOG.warn("[Slot] failed to allocate resource to query [slot={}]", slotRequirement, e);
if (e.getCause() instanceof RecoverableException) {
Expand All @@ -94,7 +94,12 @@ public void maybeWait(ConnectContext context, DefaultCoordinator coord) throws U
} catch (TimeoutException e) {
// Check timeout in the next loop.
} catch (CancellationException e) {
throw new UserException("Cancelled");
// There are two threads checking timeout, one is current thread, the other is CheckTimer.
// So this thread can get be cancelled by CheckTimer
if (slotRequirement.isPendingTimeout()) {
continue;
}
throw new UserException("Cancelled", e);
}
}
} finally {
Expand All @@ -117,7 +122,7 @@ private LogicalSlot createSlot(ConnectContext context, DefaultCoordinator coord)
TWorkGroup group = coord.getJobSpec().getResourceGroup();
long groupId = group == null ? LogicalSlot.ABSENT_GROUP_ID : group.getId();

long nowMs = System.currentTimeMillis();
long nowMs = context.getStartTime();
long queryTimeoutSecond = coord.getJobSpec().getQueryOptions().getQuery_timeout();
long expiredPendingTimeMs =
nowMs + Math.min(GlobalVariable.getQueryQueuePendingTimeoutSecond(), queryTimeoutSecond) * 1000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ public long getExpiredAllocatedTimeMs() {
return expiredAllocatedTimeMs;
}

public boolean isPendingExpired(long nowMs) {
return nowMs >= expiredPendingTimeMs;
public boolean isPendingTimeout() {
return System.currentTimeMillis() >= expiredPendingTimeMs;
}

public boolean isAllocatedExpired(long nowMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,15 @@
import org.junit.Ignore;
import org.junit.Test;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -95,6 +98,7 @@ public class QueryQueueManagerTest extends SchedulerTestBase {
private static final int ABSENT_MAX_CPU_CORES = -1;

private final QueryQueueManager manager = QueryQueueManager.getInstance();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

private final Map<Long, ResourceGroup> mockedGroups = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -146,6 +150,8 @@ public SlotManager getSlotManager() {
};

MetricRepo.COUNTER_QUERY_QUEUE_PENDING.increase(-MetricRepo.COUNTER_QUERY_QUEUE_PENDING.getValue());

connectContext.setStartTime();
}

@After
Expand Down Expand Up @@ -612,6 +618,31 @@ public TReleaseSlotResponse releaseSlot(TReleaseSlotRequest request) throws TExc
Assert.assertThrows("pending timeout", UserException.class, () -> manager.maybeWait(connectContext, coord));
mockFrontendService(new MockFrontendServiceClient());
}
{
// 2.4 timeout by CheckTimer
Instant fakeStart = Instant.now().minusSeconds(3);
connectContext.setStartTime(fakeStart);
DefaultCoordinator coord =
getSchedulerWithQueryId("select /*+SET_VAR(query_timeout=5)*/ count(1) from lineitem");

// cancel the execution to simulate timeout
new MockUp<LogicalSlot>() {
private boolean first = true;

@Mock
public boolean isPendingTimeout() {
boolean res = !first;
first = false;
return res;
}
};
scheduler.schedule(() -> {
coord.cancel("simulate timeout");
}, 1, TimeUnit.SECONDS);

Assert.assertThrows("pending timeout", UserException.class,
() -> manager.maybeWait(connectContext, coord));
}

// 3. Finish the first `concurrencyLimit` non-group queries.
runningCoords.forEach(DefaultCoordinator::onFinished);
Expand Down
Loading