Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix java language level issues #457

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions api/src/main/java/org/apache/livy/LivyClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,8 @@ public LivyClientBuilder(boolean loadDefaults) throws IOException {
for (String file : confFiles) {
URL url = classLoader().getResource(file);
if (url != null) {
Reader r = new InputStreamReader(url.openStream(), UTF_8);
try {
try (Reader r = new InputStreamReader(url.openStream(), UTF_8)) {
config.load(r);
} finally {
r.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.livy.client.common;

import java.io.ByteArrayOutputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;

import com.esotericsoftware.kryo.Kryo;
Expand All @@ -41,23 +42,20 @@ public class Serializer {
private final ThreadLocal<Kryo> kryos;

public Serializer(final Class<?>... klasses) {
this.kryos = new ThreadLocal<Kryo>() {
@Override
protected Kryo initialValue() {
Kryo kryo = new Kryo();
int count = 0;
for (Class<?> klass : klasses) {
kryo.register(klass, REG_ID_BASE + count);
count++;
}
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(
new StdInstantiatorStrategy()));
kryo.register(java.lang.invoke.SerializedLambda.class);
kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer());
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
this.kryos = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
int count = 0;
for (Class<?> klass : klasses) {
kryo.register(klass, REG_ID_BASE + count);
count++;
}
};
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(
new StdInstantiatorStrategy()));
kryo.register(SerializedLambda.class);
kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer());
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
});
}

public Object deserialize(ByteBuffer data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,10 @@ class HttpClient implements LivyClient {

// Because we only have one connection to the server, we don't need more than a single
// threaded executor here.
this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "HttpClient-" + sessionId);
t.setDaemon(true);
return t;
}
this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "HttpClient-" + sessionId);
t.setDaemon(true);
return t;
});

this.serializer = new Serializer();
Expand Down Expand Up @@ -146,31 +143,25 @@ public Future<?> addFile(URI uri) {
}

private Future<?> uploadResource(final File file, final String command, final String paramName) {
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command);
return null;
}
Callable<Void> task = () -> {
conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command);
return null;
};
return executor.submit(task);
}

private Future<?> addResource(final String command, final URI resource) {
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
ClientMessage msg = new AddResource(resource.toString());
conn.post(msg, Void.class, "/%d/%s", sessionId, command);
return null;
}
Callable<Void> task = () -> {
ClientMessage msg = new AddResource(resource.toString());
conn.post(msg, Void.class, "/%d/%s", sessionId, command);
return null;
};
return executor.submit(task);
}

private <T> JobHandleImpl<T> sendJob(final String command, Job<T> job) {
final ByteBuffer serializedJob = serializer.serialize(job);
JobHandleImpl<T> handle = new JobHandleImpl<T>(config, conn, sessionId, executor, serializer);
JobHandleImpl<T> handle = new JobHandleImpl<>(config, conn, sessionId, executor, serializer);
handle.start(command, serializedJob);
return handle;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ boolean isSpnegoEnabled() {
// Maps deprecated key to DeprecatedConf with the same key.
// There are no deprecated configs without alternatives currently.
private static final Map<String, DeprecatedConf> deprecatedConfigs
= Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>());
= Collections.unmodifiableMap(new HashMap<>());

protected Map<String, DeprecatedConf> getConfigsWithAlternatives() {
return configsWithAlternatives;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,38 +134,32 @@ protected Throwable error() {
}

void start(final String command, final ByteBuffer serializedJob) {
Runnable task = new Runnable() {
@Override
public void run() {
try {
ClientMessage msg = new SerializedJob(BufferUtils.toByteArray(serializedJob), "spark");
JobStatus status = conn.post(msg, JobStatus.class, "/%d/%s", sessionId, command);

if (isCancelPending) {
sendCancelRequest(status.id);
}

jobId = status.id;
Runnable task = () -> {
try {
ClientMessage msg = new SerializedJob(BufferUtils.toByteArray(serializedJob), "spark");
JobStatus status = conn.post(msg, JobStatus.class, "/%d/%s", sessionId, command);

pollTask = executor.schedule(new JobPollTask(initialPollInterval),
initialPollInterval, TimeUnit.MILLISECONDS);
} catch (Exception e) {
setResult(null, e, State.FAILED);
if (isCancelPending) {
sendCancelRequest(status.id);
}

jobId = status.id;

pollTask = executor.schedule(new JobPollTask(initialPollInterval),
initialPollInterval, TimeUnit.MILLISECONDS);
} catch (Exception e) {
setResult(null, e, State.FAILED);
}
};
executor.submit(task);
}

private void sendCancelRequest(final long id) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
conn.post(null, Void.class, "/%d/jobs/%d/cancel", sessionId, id);
} catch (Exception e) {
setResult(null, e, State.FAILED);
}
executor.submit(() -> {
try {
conn.post(null, Void.class, "/%d/jobs/%d/cancel", sessionId, id);
} catch (Exception e) {
setResult(null, e, State.FAILED);
}
});
}
Expand Down
94 changes: 34 additions & 60 deletions rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,7 @@ public void onFailure(Throwable error) throws Exception {

// Set up a timeout to fail the promise if we don't hear back from the context
// after a configurable timeout.
Runnable timeoutTask = new Runnable() {
@Override
public void run() {
connectTimeout(handler);
}
};
Runnable timeoutTask = () -> connectTimeout(handler);
this.timeout = factory.getServer().getEventLoopGroup().schedule(timeoutTask,
conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
} catch (Exception e) {
Expand Down Expand Up @@ -226,14 +221,11 @@ private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
} else if (conf.getBoolean(CLIENT_IN_PROCESS)) {
// Mostly for testing things quickly. Do not do this in production.
LOG.warn("!!!! Running remote driver in-process. !!!!");
Runnable child = new Runnable() {
@Override
public void run() {
try {
RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
} catch (Exception e) {
throw Utils.propagate(e);
}
Runnable child = () -> {
try {
RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
} catch (Exception e) {
throw Utils.propagate(e);
}
};
return new ChildProcess(conf, promise, child, confFile);
Expand Down Expand Up @@ -286,11 +278,8 @@ private static File writeConfToFile(RSCConf conf) throws IOException {
File sparkDefaults = new File(confDir + File.separator + "spark-defaults.conf");
if (sparkDefaults.isFile()) {
Properties sparkConf = new Properties();
Reader r = new InputStreamReader(new FileInputStream(sparkDefaults), UTF_8);
try {
try (Reader r = new InputStreamReader(new FileInputStream(sparkDefaults), UTF_8)) {
sparkConf.load(r);
} finally {
r.close();
}

for (String key : sparkConf.stringPropertyNames()) {
Expand All @@ -305,11 +294,8 @@ private static File writeConfToFile(RSCConf conf) throws IOException {
Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ, OWNER_WRITE));
//file.deleteOnExit();

Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8);
try {
try (Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8)) {
confView.store(writer, "Livy App Context Configuration");
} finally {
writer.close();
}

return file;
Expand Down Expand Up @@ -352,12 +338,9 @@ private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
LOG.warn("Connection established but promise is already finalized.");
}

ctx.executor().submit(new Runnable() {
@Override
public void run() {
dispose();
ContextLauncher.this.dispose(false);
}
ctx.executor().submit(() -> {
dispose();
ContextLauncher.this.dispose(false);
});
}

Expand Down Expand Up @@ -386,25 +369,22 @@ public ChildProcess(RSCConf conf, Promise<?> promise, final Process childProc, F
this.child = childProc;
this.confFile = confFile;

Runnable monitorTask = new Runnable() {
@Override
public void run() {
try {
RSCClientFactory.childProcesses().incrementAndGet();
int exitCode = child.waitFor();
if (exitCode != 0) {
LOG.warn("Child process exited with code {}.", exitCode);
fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
}
} catch (InterruptedException ie) {
LOG.warn("Waiting thread interrupted, killing child process.");
Thread.interrupted();
child.destroy();
} catch (Exception e) {
LOG.warn("Exception while waiting for child process.", e);
} finally {
RSCClientFactory.childProcesses().decrementAndGet();
Runnable monitorTask = () -> {
try {
RSCClientFactory.childProcesses().incrementAndGet();
int exitCode = child.waitFor();
if (exitCode != 0) {
LOG.warn("Child process exited with code {}.", exitCode);
fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
}
} catch (InterruptedException ie) {
LOG.warn("Waiting thread interrupted, killing child process.");
Thread.interrupted();
child.destroy();
} catch (Exception e) {
LOG.warn("Exception while waiting for child process.", e);
} finally {
RSCClientFactory.childProcesses().decrementAndGet();
}
};
this.monitor = monitor(monitorTask, childId);
Expand Down Expand Up @@ -441,25 +421,19 @@ public void detach() {
}

private Thread monitor(final Runnable task, int childId) {
Runnable wrappedTask = new Runnable() {
@Override
public void run() {
try {
task.run();
} finally {
confFile.delete();
}
Runnable wrappedTask = () -> {
try {
task.run();
} finally {
confFile.delete();
}
};
Thread thread = new Thread(wrappedTask);
thread.setDaemon(true);
thread.setName("ContextLauncher-" + childId);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.warn("Child task threw exception.", e);
fail(e);
}
thread.setUncaughtExceptionHandler((t, e) -> {
LOG.warn("Child task threw exception.", e);
fail(e);
});
thread.start();
return thread;
Expand Down
19 changes: 8 additions & 11 deletions rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ private class ClientProtocol extends BaseProtocol {

<T> JobHandleImpl<T> submit(Job<T> job) {
final String jobId = UUID.randomUUID().toString();
Object msg = new JobRequest<T>(jobId, job);
Object msg = new JobRequest<>(jobId, job);

final Promise<T> promise = eventLoopGroup.next().newPromise();
final JobHandleImpl<T> handle = new JobHandleImpl<T>(RSCClient.this,
final JobHandleImpl<T> handle = new JobHandleImpl<>(RSCClient.this,
promise, jobId);
jobs.put(jobId, handle);

Expand All @@ -358,15 +358,12 @@ public void onFailure(Throwable error) throws Exception {
promise.tryFailure(error);
}
});
promise.addListener(new GenericFutureListener<Promise<T>>() {
@Override
public void operationComplete(Promise<T> p) {
if (jobId != null) {
jobs.remove(jobId);
}
if (p.isCancelled() && !rpc.isDone()) {
rpc.cancel(true);
}
promise.addListener((GenericFutureListener<Promise<T>>) p -> {
if (jobId != null) {
jobs.remove(jobId);
}
if (p.isCancelled() && !rpc.isDone()) {
rpc.cancel(true);
}
});
return handle;
Expand Down
2 changes: 1 addition & 1 deletion rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public String findLocalAddress() throws IOException {
// Maps deprecated key to DeprecatedConf with the same key.
// There are no deprecated configs without alternatives currently.
private static final Map<String, DeprecatedConf> deprecatedConfigs
= Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>());
= Collections.unmodifiableMap(new HashMap<>());

protected Map<String, DeprecatedConf> getConfigsWithAlternatives() {
return configsWithAlternatives;
Expand Down
13 changes: 5 additions & 8 deletions rsc/src/main/java/org/apache/livy/rsc/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,11 @@ public static String stackTraceAsString(Throwable t) {
}

public static <T> void addListener(Future<T> future, final FutureListener<T> lsnr) {
future.addListener(new GenericFutureListener<Future<T>>() {
@Override
public void operationComplete(Future<T> f) throws Exception {
if (f.isSuccess()) {
lsnr.onSuccess(f.get());
} else {
lsnr.onFailure(f.cause());
}
future.addListener((GenericFutureListener<Future<T>>) f -> {
if (f.isSuccess()) {
lsnr.onSuccess(f.get());
} else {
lsnr.onFailure(f.cause());
}
});
}
Expand Down
Loading