diff --git a/wechaty-puppet-hostie/src/main/scala/wechaty/hostie/support/ContactRawSupport.scala b/wechaty-puppet-hostie/src/main/scala/wechaty/hostie/support/ContactRawSupport.scala index 1bd5bb66..58d74b3e 100644 --- a/wechaty-puppet-hostie/src/main/scala/wechaty/hostie/support/ContactRawSupport.scala +++ b/wechaty-puppet-hostie/src/main/scala/wechaty/hostie/support/ContactRawSupport.scala @@ -25,50 +25,53 @@ trait ContactRawSupport { * Contact * */ - override def contactAlias(contactId: String): String = { + override def contactAlias(contactId: String): Future[String] = { val request = Contact.ContactAliasRequest.newBuilder() .setId(contactId) .build() - val response = grpcClient.contactAlias(request) - response.getAlias.getValue + asyncCallback(PuppetGrpc.getContactAliasMethod,request){response=> + response.getAlias.getValue + } } - override def contactAlias(contactId: String, alias: String): Unit = { + override def contactAlias(contactId: String, alias: String): Future[Unit] = { val stringValue = StringValue.newBuilder().setValue(alias).build() val request = Contact.ContactAliasRequest.newBuilder() .setId(contactId) .setAlias(stringValue) .build() - grpcClient.contactAlias(request) + asyncCallback(PuppetGrpc.getContactAliasMethod,request){reponse=> } } - override def contactList(): Array[String] = { + override def contactList(): Future[Array[String]] = { val request = Contact.ContactListRequest.newBuilder().build() - - val response = grpcClient.contactList(request) - response.getIdsList.toArray(Array[String]()) + asyncCallback(PuppetGrpc.getContactListMethod,request){response=> + response.getIdsList.toArray(Array[String]()) + } } - override def contactAvatar(contactId: String): ResourceBox = { + override def contactAvatar(contactId: String): Future[ResourceBox] = { val request = Contact.ContactAvatarRequest.newBuilder() .setId(contactId) .build() - val response = grpcClient.contactAvatar(request) - val filebox = response.getFilebox.getValue - val root = Puppet.objectMapper.readTree(filebox) - val boxType = ResourceBoxType.apply(root.get("boxType").asInt()) - boxType match{ - case ResourceBox.ResourceBoxType.Url => - ResourceBox.fromUrl(root.get("remoteUrl").asText()) - case ResourceBox.ResourceBoxType.Base64 => - ResourceBox.fromBase64(root.get("name").asText(),root.get("base64").asText()) - case other => - throw new UnsupportedOperationException(s"other ${other} type not supported!") + asyncCallback(PuppetGrpc.getContactAvatarMethod,request) { response => + val response = grpcClient.contactAvatar(request) + val filebox = response.getFilebox.getValue + val root = Puppet.objectMapper.readTree(filebox) + val boxType = ResourceBoxType.apply(root.get("boxType").asInt()) + boxType match { + case ResourceBox.ResourceBoxType.Url => + ResourceBox.fromUrl(root.get("remoteUrl").asText()) + case ResourceBox.ResourceBoxType.Base64 => + ResourceBox.fromBase64(root.get("name").asText(), root.get("base64").asText()) + case other => + throw new UnsupportedOperationException(s"other ${other} type not supported!") + } } } - override def contactAvatar(contactId: String, file: ResourceBox): ResourceBox = { + override def contactAvatar(contactId: String, file: ResourceBox): Future[ResourceBox] = { val toJsonString = file.toJson() val value = StringValue.newBuilder().setValue(toJsonString) @@ -78,9 +81,9 @@ trait ContactRawSupport { .setFilebox(value) .build() - grpcClient.contactAvatar(request) - - file + asyncCallback(PuppetGrpc.getContactAvatarMethod,request){response=> + file + } } override protected def contactRawPayload(contactID: String): Future[ContactPayload] = { diff --git a/wechaty-puppet-hostie/src/main/scala/wechaty/hostie/support/GrpcSupport.scala b/wechaty-puppet-hostie/src/main/scala/wechaty/hostie/support/GrpcSupport.scala index b6ae417e..e8251354 100644 --- a/wechaty-puppet-hostie/src/main/scala/wechaty/hostie/support/GrpcSupport.scala +++ b/wechaty-puppet-hostie/src/main/scala/wechaty/hostie/support/GrpcSupport.scala @@ -140,7 +140,7 @@ trait GrpcSupport { } type ClientCallback[RespT,T]=RespT => T - protected def asyncCall[ReqT,RespT](call: MethodDescriptor[ReqT, RespT], req: ReqT): Unit = { + protected def asyncCall[ReqT,RespT](call: MethodDescriptor[ReqT, RespT], req: ReqT): Future[RespT]= { asyncCallback(call,req)(resp=> resp) } def asyncCallback[ReqT, RespT,T](callMethod: MethodDescriptor[ReqT, RespT], req: ReqT)(callback:ClientCallback[RespT,T]): Future[T]= { diff --git a/wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/ContactRawSupport.scala b/wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/ContactRawSupport.scala index 59cc05e6..e69c204e 100644 --- a/wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/ContactRawSupport.scala +++ b/wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/ContactRawSupport.scala @@ -6,6 +6,7 @@ import com.google.protobuf.ByteString import com.google.zxing.client.j2se.BufferedImageLuminanceSource import com.google.zxing.common.HybridBinarizer import com.google.zxing.{BinaryBitmap, DecodeHintType, MultiFormatReader} +import com.typesafe.scalalogging.LazyLogging import javax.imageio.ImageIO import wechaty.padplus.PuppetPadplus import wechaty.padplus.grpc.PadPlusServerOuterClass.{ApiType, ResponseType, StreamResponse} @@ -29,23 +30,18 @@ import scala.util.Try * @author Jun Tsai * @since 2020-06-21 */ -trait ContactRawSupport { +trait ContactRawSupport extends LazyLogging{ self: PuppetPadplus => - /** - * - * Contact - * - */ - override def contactAlias(contactId: String): String = ??? + override def contactAlias(contactId: String): Future[String] = ??? - override def contactAlias(contactId: String, alias: String): Unit = ??? + override def contactAlias(contactId: String, alias: String): Future[Nothing] = ??? - override def contactAvatar(contactId: String): ResourceBox = ??? + override def contactAvatar(contactId: String): Future[ResourceBox] = ??? - override def contactAvatar(contactId: String, file: ResourceBox): ResourceBox = ??? + override def contactAvatar(contactId: String, file: ResourceBox): Future[ResourceBox] = ??? - override def contactList(): Array[String] = ??? + override def contactList(): Future[Array[String]] = ??? /** * contact diff --git a/wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/GrpcSupport.scala b/wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/GrpcSupport.scala index afe18ce7..6d82880f 100644 --- a/wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/GrpcSupport.scala +++ b/wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/GrpcSupport.scala @@ -11,7 +11,8 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.model.{CannedAccessControlList, GeneratePresignedUrlRequest, ObjectMetadata, PutObjectRequest} import com.fasterxml.jackson.databind.JsonNode import com.typesafe.scalalogging.LazyLogging -import io.grpc.{ManagedChannel, ManagedChannelBuilder} +import io.grpc.stub.{ClientCalls, StreamObserver} +import io.grpc.{ManagedChannel, ManagedChannelBuilder, MethodDescriptor} import wechaty.padplus.PuppetPadplus import wechaty.padplus.grpc.PadPlusServerGrpc import wechaty.padplus.grpc.PadPlusServerOuterClass._ @@ -36,8 +37,8 @@ trait GrpcSupport { private val HEARTBEAT_COUNTER = new AtomicLong() private val HOSTIE_KEEPALIVE_TIMEOUT = 15 * 1000L private val DEFAULT_WATCHDOG_TIMEOUT = 60L - protected var grpcClient: PadPlusServerGrpc.PadPlusServerBlockingStub= _ - private var eventStream: PadPlusServerGrpc.PadPlusServerStub = _ +// protected var grpcClient: PadPlusServerGrpc.PadPlusServerBlockingStub= _ + private var asyncGrpcClient: PadPlusServerGrpc.PadPlusServerStub = _ protected var channel: ManagedChannel = _ protected implicit val executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global @@ -93,15 +94,15 @@ trait GrpcSupport { private def internalStartGrpc() { logger.info("start grpc client ....") - this.grpcClient = PadPlusServerGrpc.newBlockingStub(channel) +// this.grpcClient = PadPlusServerGrpc.newBlockingStub(channel) + this.asyncGrpcClient = PadPlusServerGrpc.newStub(channel) // startStream() logger.info("start grpc client done") } private[wechaty] def startStream() { - this.eventStream = PadPlusServerGrpc.newStub(channel) val initConfig = InitConfig.newBuilder().setToken(option.token.get).build() - this.eventStream.init(initConfig, this) + this.asyncGrpcClient.init(initConfig, this) } protected def stopGrpc(): Unit = { @@ -110,7 +111,7 @@ trait GrpcSupport { stopStream() //stop grpc client - this.grpcClient.request(RequestObject.newBuilder().setApiType(ApiType.CLOSE).setToken(option.token.get).build()) +// this.grpcClient.request(RequestObject.newBuilder().setApiType(ApiType.CLOSE).setToken(option.token.get).build()) this.channel.shutdownNow() } } @@ -175,18 +176,45 @@ trait GrpcSupport { case t if t =:= typeOf[Nothing] => case _ => CallbackHelper.pushCallbackToPool(traceId,callbackDelegate) } - val response = grpcClient.request(request.build()) - logger.debug(s"request $apiType response $response") - - if(response.getResult != "success"){ - //fail? - logger.error("fail to request grpc,response {}",response) - p.failure(new IllegalAccessException("fail to request ,grpc result:"+response)) + val future = asyncCall(PadPlusServerGrpc.getRequestMethod,request.build()) + future.flatMap{rep=> + if(rep.getResult != "success"){ + p.failure(new IllegalAccessException("fail to request ,grpc result:"+rep)) + } + p.future } - - p.future +// val response = grpcClient.request(request.build()) +// logger.debug(s"request $apiType response $response") +// +// if(response.getResult != "success"){ +// //fail? +// logger.error("fail to request grpc,response {}",response) +// p.failure(new IllegalAccessException("fail to request ,grpc result:"+response)) +// } +// +// p.future + } + type ClientCallback[RespT,T]=RespT => T + protected def asyncCall[ReqT,RespT](call: MethodDescriptor[ReqT, RespT], req: ReqT): Future[RespT]= { + asyncCallback(call,req)(resp => resp) } - private val ACCESS_KEY_ID = "AKIA3PQY2OQG5FEXWMH6" + def asyncCallback[ReqT, RespT,T](callMethod: MethodDescriptor[ReqT, RespT], req: ReqT)(callback:ClientCallback[RespT,T]): Future[T]= { + val call = channel.newCall(callMethod,asyncGrpcClient.getCallOptions) + val promise = Promise[T] + ClientCalls.asyncUnaryCall(call,req,new StreamObserver[RespT] { + override def onNext(value: RespT): Unit = { + val result = callback(value) + promise.success(result) + } + override def onError(t: Throwable): Unit = promise.failure(t) + override def onCompleted(): Unit = { + if(!promise.isCompleted) promise.failure(new IllegalStateException("server completed")) + } + }) + promise.future + } + + private val ACCESS_KEY_ID = "AKIA3PQY2OQG5FEXWMH6" private val BUCKET= "macpro-message-file" private val EXPIRE_TIME= 3600 * 24 * 3 private val PATH= "image-message" diff --git a/wechaty-puppet/src/main/scala/wechaty/puppet/support/ContactSupport.scala b/wechaty-puppet/src/main/scala/wechaty/puppet/support/ContactSupport.scala index 7cde449f..807d8856 100644 --- a/wechaty-puppet/src/main/scala/wechaty/puppet/support/ContactSupport.scala +++ b/wechaty-puppet/src/main/scala/wechaty/puppet/support/ContactSupport.scala @@ -23,14 +23,16 @@ trait ContactSupport { * Contact * */ - def contactAlias(contactId: String): String + protected def contactRawPayload(contactId: String): Future[ContactPayload] + + def contactAlias(contactId: String): Future[String] - def contactAlias(contactId: String, alias: String): Unit + def contactAlias(contactId: String, alias: String): Future[Unit] - def contactAvatar (contactId: String) : ResourceBox - def contactAvatar (contactId: String, file: ResourceBox) : ResourceBox + def contactAvatar (contactId: String) : Future[ResourceBox] + def contactAvatar (contactId: String, file: ResourceBox) : Future[ResourceBox] - def contactList(): Array[String] + def contactList(): Future[Array[String]] def contactPayload(contactId: String): Future[ContactPayload] = { if (Puppet.isBlank(contactId)) { @@ -61,9 +63,5 @@ trait ContactSupport { this.cacheContactPayload.invalidate(contactId) } - /** - * contact - */ - protected def contactRawPayload(contactId: String): Future[ContactPayload] } diff --git a/wechaty/src/main/scala/wechaty/user/Contact.scala b/wechaty/src/main/scala/wechaty/user/Contact.scala index 5ce443a1..08972a26 100644 --- a/wechaty/src/main/scala/wechaty/user/Contact.scala +++ b/wechaty/src/main/scala/wechaty/user/Contact.scala @@ -6,7 +6,7 @@ import wechaty.puppet.ResourceBox import wechaty.puppet.schemas.Contact.{ContactGender, ContactPayload, ContactType} import wechaty.puppet.schemas.Puppet -import scala.concurrent.Await +import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.language.implicitConversions @@ -193,7 +193,7 @@ class Contact(contactId: String)(implicit resolver: PuppetResolver) extends Conv else this.payload.city } - def avatar: ResourceBox = { + def avatar: Future[ResourceBox] = { resolver.puppet.contactAvatar(this.id) }