Skip to content

Commit

Permalink
[BugFix] fix query timeout behavior when using query queue (backport #…
Browse files Browse the repository at this point in the history
…53677) (#53726)

Co-authored-by: Murphy <[email protected]>
  • Loading branch information
mergify[bot] and murphyatwork authored Dec 11, 2024
1 parent 68cccf3 commit d808a56
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 7 deletions.
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

package com.starrocks.qe;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -574,6 +575,12 @@ public void setStartTime() {
returnRows = 0;
}

/**
 * Test-only overload that pins the recorded start time to a caller-supplied
 * instant instead of taking it from the clock, and resets the returned-row
 * counter to zero.
 *
 * @param start the instant to record as the start time
 */
@VisibleForTesting
public void setStartTime(Instant start) {
    // The two assignments are independent; the row counter is cleared alongside the new start time.
    this.returnRows = 0;
    this.startTime = start;
}

/**
 * Adds the given number of rows to the running {@code returnRows} total.
 *
 * @param returnRows the number of rows to add to the current total
 */
public void updateReturnRows(int returnRows) {
// The parameter shadows the field; "this." selects the field being accumulated.
this.returnRows += returnRows;
}
Expand Down
15 changes: 10 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/qe/QueryQueueManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ public void maybeWait(ConnectContext context, DefaultCoordinator coord) throws U
MetricRepo.COUNTER_QUERY_QUEUE_TOTAL.increase(1L);
ResourceGroupMetricMgr.increaseQueuedQuery(context, 1L);

long timeoutMs = slotRequirement.getExpiredPendingTimeMs();
long deadlineEpochMs = slotRequirement.getExpiredPendingTimeMs();
LogicalSlot allocatedSlot = null;
while (allocatedSlot == null) {
// Check timeout.
long currentMs = System.currentTimeMillis();
if (currentMs >= timeoutMs) {
if (slotRequirement.isPendingTimeout()) {
MetricRepo.COUNTER_QUERY_QUEUE_TIMEOUT.increase(1L);
slotProvider.cancelSlotRequirement(slotRequirement);
String errMsg = String.format(PENDING_TIMEOUT_ERROR_MSG_FORMAT,
Expand All @@ -84,7 +84,7 @@ public void maybeWait(ConnectContext context, DefaultCoordinator coord) throws U

// Wait for slot allocated.
try {
allocatedSlot = slotFuture.get(timeoutMs - currentMs, TimeUnit.MILLISECONDS);
allocatedSlot = slotFuture.get(deadlineEpochMs - currentMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
LOG.warn("[Slot] failed to allocate resource to query [slot={}]", slotRequirement, e);
if (e.getCause() instanceof RecoverableException) {
Expand All @@ -94,7 +94,12 @@ public void maybeWait(ConnectContext context, DefaultCoordinator coord) throws U
} catch (TimeoutException e) {
// Check timeout in the next loop.
} catch (CancellationException e) {
throw new UserException("Cancelled");
// There are two threads checking timeout, one is current thread, the other is CheckTimer.
// So this thread can be cancelled by CheckTimer
if (slotRequirement.isPendingTimeout()) {
continue;
}
throw new UserException("Cancelled", e);
}
}
} finally {
Expand All @@ -117,7 +122,7 @@ private LogicalSlot createSlot(ConnectContext context, DefaultCoordinator coord)
TWorkGroup group = coord.getJobSpec().getResourceGroup();
long groupId = group == null ? LogicalSlot.ABSENT_GROUP_ID : group.getId();

long nowMs = System.currentTimeMillis();
long nowMs = context.getStartTime();
long queryTimeoutSecond = coord.getJobSpec().getQueryOptions().getQuery_timeout();
long expiredPendingTimeMs =
nowMs + Math.min(GlobalVariable.getQueryQueuePendingTimeoutSecond(), queryTimeoutSecond) * 1000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ public long getExpiredAllocatedTimeMs() {
return expiredAllocatedTimeMs;
}

public boolean isPendingExpired(long nowMs) {
return nowMs >= expiredPendingTimeMs;
public boolean isPendingTimeout() {
return System.currentTimeMillis() >= expiredPendingTimeMs;
}

public boolean isAllocatedExpired(long nowMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,15 @@
import org.junit.Ignore;
import org.junit.Test;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -95,6 +98,7 @@ public class QueryQueueManagerTest extends SchedulerTestBase {
private static final int ABSENT_MAX_CPU_CORES = -1;

private final QueryQueueManager manager = QueryQueueManager.getInstance();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

private final Map<Long, ResourceGroup> mockedGroups = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -146,6 +150,8 @@ public SlotManager getSlotManager() {
};

MetricRepo.COUNTER_QUERY_QUEUE_PENDING.increase(-MetricRepo.COUNTER_QUERY_QUEUE_PENDING.getValue());

connectContext.setStartTime();
}

@After
Expand Down Expand Up @@ -612,6 +618,31 @@ public TReleaseSlotResponse releaseSlot(TReleaseSlotRequest request) throws TExc
Assert.assertThrows("pending timeout", UserException.class, () -> manager.maybeWait(connectContext, coord));
mockFrontendService(new MockFrontendServiceClient());
}
{
// 2.4 timeout by CheckTimer
Instant fakeStart = Instant.now().minusSeconds(3);
connectContext.setStartTime(fakeStart);
DefaultCoordinator coord =
getSchedulerWithQueryId("select /*+SET_VAR(query_timeout=5)*/ count(1) from lineitem");

// cancel the execution to simulate timeout
new MockUp<LogicalSlot>() {
private boolean first = true;

@Mock
public boolean isPendingTimeout() {
boolean res = !first;
first = false;
return res;
}
};
scheduler.schedule(() -> {
coord.cancel("simulate timeout");
}, 1, TimeUnit.SECONDS);

Assert.assertThrows("pending timeout", UserException.class,
() -> manager.maybeWait(connectContext, coord));
}

// 3. Finish the first `concurrencyLimit` non-group queries.
runningCoords.forEach(DefaultCoordinator::onFinished);
Expand Down

0 comments on commit d808a56

Please sign in to comment.