From 165e29fc52b20eb43b83cd51170a1941d2e73e9e Mon Sep 17 00:00:00 2001 From: patelh Date: Wed, 26 Feb 2020 23:03:58 -0800 Subject: [PATCH] Log zk retry, reduce default to 10 --- app/kafka/manager/base/CuratorAwareActor.scala | 12 +++++++++++- app/kafka/manager/model/model.scala | 4 ++-- build.sbt | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/app/kafka/manager/base/CuratorAwareActor.scala b/app/kafka/manager/base/CuratorAwareActor.scala index 6cbaabc7e..4afb4e092 100644 --- a/app/kafka/manager/base/CuratorAwareActor.scala +++ b/app/kafka/manager/base/CuratorAwareActor.scala @@ -5,12 +5,22 @@ package kafka.manager.base +import akka.actor.ActorLogging import kafka.manager.model.CuratorConfig +import org.apache.curator.RetrySleeper import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import org.apache.curator.retry.BoundedExponentialBackoffRetry import scala.util.Try +class LoggingRetryPolicy(curatorConfig: CuratorConfig, actorLogging: ActorLogging + ) extends BoundedExponentialBackoffRetry(curatorConfig.baseSleepTimeMs + , curatorConfig.maxSleepTimeMs, curatorConfig.zkMaxRetry) { + override def allowRetry(retryCount: Int, elapsedTimeMs: Long, sleeper: RetrySleeper): Boolean = { + actorLogging.log.info(s"retryCount=$retryCount maxRetries=${curatorConfig.zkMaxRetry} zkConnect=${curatorConfig.zkConnect}") + super.allowRetry(retryCount, elapsedTimeMs, sleeper) + } +} trait CuratorAwareActor extends BaseActor { @@ -23,7 +33,7 @@ trait CuratorAwareActor extends BaseActor { protected def getCurator(config: CuratorConfig) : CuratorFramework = { val curator: CuratorFramework = CuratorFrameworkFactory.newClient( config.zkConnect, - new BoundedExponentialBackoffRetry(config.baseSleepTimeMs, config.maxSleepTimeMs, config.zkMaxRetry)) + new LoggingRetryPolicy(config, this)) curator } diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index b7189bb0e..a7dd4a7c1 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -16,7 +16,7 @@ import scalaz.Validation.FlatMap._ /** * @author hiral */ -case class CuratorConfig(zkConnect: String, zkMaxRetry: Int = 100, baseSleepTimeMs : Int = 100, maxSleepTimeMs: Int = 1000) +case class CuratorConfig(zkConnect: String, zkMaxRetry: Int = 10, baseSleepTimeMs : Int = 100, maxSleepTimeMs: Int = 1000) sealed trait KafkaVersion case object Kafka_0_8_1_1 extends KafkaVersion { @@ -185,7 +185,7 @@ object ClusterConfig { def apply(name: String , version : String , zkHosts: String - , zkMaxRetry: Int = 100 + , zkMaxRetry: Int = 10 , jmxEnabled: Boolean , jmxUser: Option[String] , jmxPass: Option[String] diff --git a/build.sbt b/build.sbt index 74c2f48bb..c7873b62d 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ name := """cmak""" /* For packaging purposes, -SNAPSHOT MUST contain a digit */ -version := "3.0.0.1" +version := "3.0.0.2" scalaVersion := "2.12.10"