Skip to content

Commit

Permalink
broadcast replState in SqlJob
Browse files Browse the repository at this point in the history
  • Loading branch information
jianzhenwu committed Jun 18, 2023
1 parent 5cbbdf9 commit 322af8e
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 4 deletions.
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/livy/JobContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,11 @@ public interface JobContext {
* @throws Exception If SparkSession does not exist
*/
<E> E sparkSession() throws Exception;

/**
* Broadcast repl-state to clients.
* @param sessionState {@link org.apache.livy.sessions.SessionState}
*
*/
void broadcastReplState(String sessionState);
}
2 changes: 1 addition & 1 deletion rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public ReplCompleteRequest() {
}
}

protected static class ReplState {
public static class ReplState {

public final String state;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.livy.JobContext;
import org.apache.livy.rsc.BaseProtocol;
import org.apache.livy.rsc.RSCConf;
import org.apache.livy.rsc.Utils;
import org.apache.livy.sessions.SessionState;

class JobContextImpl implements JobContext {

Expand Down Expand Up @@ -150,4 +152,9 @@ public void addFile(String path) {
public void addJarOrPyFile(String path) throws Exception {
driver.addJarOrPyFile(path);
}

@Override
public void broadcastReplState(String sessionState) {
driver.broadcast(new BaseProtocol.ReplState(SessionState.apply(sessionState).state()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ <T> void jobFinished(String jobId, T result, Throwable error) {

void jobStarted(String jobId) {
broadcast(new JobStarted(jobId));
broadcast(new ReplState(SessionState.Busy$.MODULE$.state()));
}

public void idleIfActiveJobsEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public SqlJob(
@Override
public Void call(JobContext ctx) throws Exception {
ctx.sc().setJobGroup(statementId, statement);
ctx.broadcastReplState("busy");
try {
executeSql(ctx);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -204,12 +205,13 @@ public void testSessionState() throws Exception {
waitFor(new RegisterSessionJob(s1));
await().atMost(10, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> rscClient.getReplStateChangedHistoryInTest()
.equals(Arrays.asList("busy", "idle")));
.equals(Collections.singletonList("idle")));

waitFor(newSqlJob(s1, st1, "select 1"));
await().atMost(10, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> rscClient.getReplStateChangedHistoryInTest()
.equals(Arrays.asList("busy", "idle", "busy", "idle")));
.equals(Arrays.asList("idle", "busy", "idle")));

// Tear down the session.
waitFor(new UnregisterSessionJob(s1));
rscClient.setTest(false);
Expand Down

0 comments on commit 322af8e

Please sign in to comment.