Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LIVY-971] Support to get session variables when using JDBC to connect to Livy thrift server. #390

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.livy.test

import java.sql.Date

import org.apache.livy.sessions.SessionState
import org.apache.livy.test.framework.BaseThriftIntegrationTestSuite

class JdbcIT extends BaseThriftIntegrationTestSuite {
Expand Down Expand Up @@ -61,6 +62,14 @@ class JdbcIT extends BaseThriftIntegrationTestSuite {
assert(resultSet.getString(3) == "{1:\"a\",2:\"b\"}")
assert(!resultSet.next())
}

checkQuery(c, "DESC LIVY SESSION") { resultSet =>
resultSet.next()
assert(resultSet.getString("id").toInt >= 0)
assert(resultSet.getString("appId").startsWith("application_"))
assert(resultSet.getString("state").nonEmpty)
assert(resultSet.getString("logs").contains(resultSet.getString("appId")))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class SparkYarnApp private[utils] (
// We cannot kill the YARN app without the app id.
// There's a chance the YARN app hasn't been submitted during a livy-server failure.
// We don't want a stuck session that can't be deleted. Emit a warning and move on.
case _: TimeoutException | _: InterruptedException =>
case _: TimeoutException | _: InterruptedException | _: IllegalStateException =>
warn("Deleting a session while its YARN application is not found.")
yarnAppMonitorThread.interrupt()
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.livy.thriftserver

import java.util
import java.util.{Map => JMap}
import java.util.{Locale, Map => JMap}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable
Expand Down Expand Up @@ -106,6 +106,14 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
op
}

def newDescLivySessionOperation(sessionHandle: SessionHandle,
statement: String, sessionManager: LivyThriftSessionManager): Operation = {
val op = new DescLivySessionOperation(sessionHandle, sessionManager)
addOperation(op, sessionHandle)
debug(s"Create DescLivySessionOperation for $statement with session=$sessionHandle")
op
}

def getOperationLogRowSet(
opHandle: OperationHandle,
orientation: FetchOrientation,
Expand Down Expand Up @@ -136,9 +144,15 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
confOverlay: util.Map[String, String],
runAsync: Boolean,
queryTimeout: Long): OperationHandle = {
executeOperation(sessionHandle, {
newExecuteStatementOperation(sessionHandle, statement, confOverlay, runAsync, queryTimeout)
})
info(s"execute statement $statement")
val descLivySessionRegex = "^(DESC|DESCRIBE)\\s+LIVY\\s+SESSION$".r
val operationCreator = statement.trim.toUpperCase(Locale.ENGLISH) match {
case descLivySessionRegex(_*) =>
newDescLivySessionOperation(sessionHandle, statement, livyThriftSessionManager)
case _ =>
newExecuteStatementOperation(sessionHandle, statement, confOverlay, runAsync, queryTimeout)
}
executeOperation(sessionHandle, operationCreator)
}

@throws[HiveSQLException]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,15 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
delegationToken: String): SessionHandle = {
val sessionHandle = new SessionHandle(protocol)
incrementConnections(username, ipAddress, SessionInfo.getForwardedAddresses)
val nextId = server.livySessionManager.nextId()
sessionInfo.put(sessionHandle,
new SessionInfo(username, ipAddress, SessionInfo.getForwardedAddresses, protocol))
new SessionInfo(nextId, username, ipAddress, SessionInfo.getForwardedAddresses, protocol))
val (initStatements, createInteractiveRequest, sessionId) =
LivyThriftSessionManager.processSessionConf(sessionConf, supportUseDatabase)
val createLivySession = () => {
createInteractiveRequest.kind = Spark
val newSession = InteractiveSession.create(
server.livySessionManager.nextId(),
nextId,
createInteractiveRequest.name,
username,
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.livy.Logging

case class SessionInfo(username: String,
case class SessionInfo(
sessionId: Int,
username: String,
ipAddress: String,
forwardedAddresses: util.List[String],
protocolVersion: TProtocolVersion) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.thriftserver.operation

import org.apache.hive.service.cli.{FetchOrientation, OperationState, OperationType, SessionHandle}

import org.apache.livy.thriftserver.LivyThriftSessionManager
import org.apache.livy.thriftserver.serde.ThriftResultSet
import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}

class DescLivySessionOperation(sessionHandle: SessionHandle,
sessionManager: LivyThriftSessionManager)
extends Operation(sessionHandle, OperationType.EXECUTE_STATEMENT) {

private var hasNext: Boolean = true

override protected def runInternal(): Unit = {
setState(OperationState.PENDING)
setHasResultSet(true) // avoid no resultset for async run
setState(OperationState.RUNNING)
setState(OperationState.FINISHED)
}

override def cancel(stateAfterCancel: OperationState): Unit = {
setState(OperationState.CANCELED)
}

override def close(): Unit = {
setState(OperationState.CLOSED)
}

override def getResultSetSchema: Schema = {
assertState(Seq(OperationState.FINISHED))
DescLivySessionOperation.SCHEMA
}

override def getNextRowSet(orientation: FetchOrientation,
maxRows: Long): ThriftResultSet = {
validateFetchOrientation(orientation)
assertState(Seq(OperationState.FINISHED))
setHasResultSet(true)

val sessionVar = ThriftResultSet(this.getResultSetSchema,
sessionManager.getSessionInfo(sessionHandle).protocolVersion)

val session = try {
sessionManager.getLivySession(sessionHandle)
} catch {
case e: Exception =>
val sessionInfo = sessionManager.getSessionInfo(sessionHandle)
if (sessionManager.server.livySessionManager.get(sessionInfo.sessionId).isDefined) {
sessionManager.server.livySessionManager.get(sessionInfo.sessionId).get
} else {
error(s"Can't find session which id is ${sessionInfo.sessionId} in sessionManager.")
throw e
}
}

if (hasNext) {
sessionVar.addRow(
Array(
session.id.toString,
session.appId.orNull,
session.state.state,
session.logLines().mkString("\n")
)
)
hasNext = false
}
sessionVar
}
}

object DescLivySessionOperation {
val SCHEMA: Schema = Schema(
Field("id", BasicDataType("string"), "Livy session id."),
Field("appId", BasicDataType("string"), "Spark application id."),
Field("state", BasicDataType("string"), "Livy session state"),
Field("logs", BasicDataType("string"), "Spark application logs.")
)
}