diff --git a/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt b/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt index 78205b7c5cde..d869e6543e86 100644 --- a/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt +++ b/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt @@ -45,6 +45,7 @@ import javax.net.ssl.SSLSocket import javax.net.ssl.SSLSocketFactory import javax.net.ssl.TrustManager import javax.net.ssl.X509TrustManager +import kotlin.time.Duration.Companion.milliseconds import mockwebserver3.SocketPolicy.DisconnectAfterRequest import mockwebserver3.SocketPolicy.DisconnectAtEnd import mockwebserver3.SocketPolicy.DisconnectAtStart @@ -374,6 +375,12 @@ class MockWebServer : Closeable { @Throws(Exception::class) private fun acceptConnections() { while (true) { + val socketPolicy = dispatcher.peek().socketPolicy + + if (socketPolicy is SocketPolicy.DelayAccept) { + Thread.sleep(100.milliseconds.inWholeMilliseconds) + } + val socket: Socket try { socket = serverSocket!!.accept() @@ -382,7 +389,6 @@ class MockWebServer : Closeable { return } - val socketPolicy = dispatcher.peek().socketPolicy if (socketPolicy === DisconnectAtStart) { dispatchBookkeepingRequest(0, socket) socket.close() diff --git a/mockwebserver/src/main/kotlin/mockwebserver3/SocketPolicy.kt b/mockwebserver/src/main/kotlin/mockwebserver3/SocketPolicy.kt index f74df36c906b..be24ff0f0936 100644 --- a/mockwebserver/src/main/kotlin/mockwebserver3/SocketPolicy.kt +++ b/mockwebserver/src/main/kotlin/mockwebserver3/SocketPolicy.kt @@ -16,6 +16,7 @@ package mockwebserver3 +import kotlin.time.Duration import okhttp3.ExperimentalOkHttpApi /** @@ -59,6 +60,11 @@ sealed interface SocketPolicy { */ object DisconnectAtStart : SocketPolicy + /** + * Delay before accepting on the ServerSocket. + */ + class DelayAccept(val delay: Duration) : SocketPolicy + /** * Close connection after reading the request but before writing the response. Use this to * simulate late connection pool failures. diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt index 275d09f6a0a7..8d222346e88e 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt @@ -25,9 +25,9 @@ import java.util.logging.Level import java.util.logging.LogManager import java.util.logging.LogRecord import java.util.logging.Logger -import kotlin.concurrent.withLock import okhttp3.internal.buildConnectionPool import okhttp3.internal.concurrent.TaskRunner +import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.connection.RealConnectionPool import okhttp3.internal.http2.Http2 import okhttp3.internal.taskRunnerInternal @@ -234,7 +234,7 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback { // a test timeout failure. val waitTime = (entryTime + 1_000_000_000L - System.nanoTime()) if (!queue.idleLatch().await(waitTime, TimeUnit.NANOSECONDS)) { - TaskRunner.INSTANCE.lock.withLock { + TaskRunner.INSTANCE.withLock { TaskRunner.INSTANCE.cancelAll() } fail("Queue still active after 1000 ms") diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt index 88dfd7936742..b90fba17e258 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt @@ -13,6 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") + package okhttp3.internal.concurrent import assertk.assertThat @@ -23,9 +25,9 @@ import java.util.concurrent.BlockingQueue import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.logging.Logger -import kotlin.concurrent.withLock import okhttp3.OkHttpClient import okhttp3.TestUtil.threadFactory +import okhttp3.internal.connection.Locks.withLock /** * Runs a [TaskRunner] in a controlled environment so that everything is sequential and @@ -166,7 +168,7 @@ class TaskFaker : Closeable { fun advanceUntil(newTime: Long) { taskRunner.assertThreadDoesntHoldLock() - taskRunner.lock.withLock { + taskRunner.withLock { check(currentTask == TestThreadSerialTask) nanoTime = newTime yieldUntil(ResumePriority.AfterOtherTasks) @@ -177,7 +179,7 @@ class TaskFaker : Closeable { fun assertNoMoreTasks() { taskRunner.assertThreadDoesntHoldLock() - taskRunner.lock.withLock { + taskRunner.withLock { assertThat(activeThreads).isEqualTo(0) } } @@ -207,7 +209,7 @@ class TaskFaker : Closeable { fun runNextTask() { taskRunner.assertThreadDoesntHoldLock() - taskRunner.lock.withLock { + taskRunner.withLock { val contextSwitchCountBefore = contextSwitchCount yieldUntil(ResumePriority.BeforeOtherTasks) { contextSwitchCount > contextSwitchCountBefore @@ -217,7 +219,7 @@ class TaskFaker : Closeable { /** Sleep until [durationNanos] elapses. For use by the task threads. */ fun sleep(durationNanos: Long) { - taskRunner.lock.withLock { + taskRunner.withLock { val sleepUntil = nanoTime + durationNanos yieldUntil { nanoTime >= sleepUntil } } @@ -229,7 +231,7 @@ class TaskFaker : Closeable { */ fun yield() { taskRunner.assertThreadDoesntHoldLock() - taskRunner.lock.withLock { + taskRunner.withLock { yieldUntil() } } @@ -328,7 +330,7 @@ class TaskFaker : Closeable { runnable.run() require(currentTask == this) { "unexpected current task: $currentTask" } } finally { - taskRunner.lock.withLock { + taskRunner.withLock { activeThreads-- startNextTask() } @@ -354,7 +356,7 @@ class TaskFaker : Closeable { timeout: Long, unit: TimeUnit, ): T? { - taskRunner.lock.withLock { + taskRunner.withLock { val waitUntil = nanoTime + unit.toNanos(timeout) while (true) { val result = poll() @@ -367,7 +369,7 @@ class TaskFaker : Closeable { } override fun put(element: T) { - taskRunner.lock.withLock { + taskRunner.withLock { delegate.put(element) editCount++ } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt index 788fc0f44321..99d5b600b265 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt @@ -18,8 +18,8 @@ package okhttp3.internal.concurrent import java.util.concurrent.CountDownLatch import java.util.concurrent.RejectedExecutionException import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock import okhttp3.internal.assertNotHeld +import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.okHttpName /** @@ -32,7 +32,7 @@ class TaskQueue internal constructor( internal val taskRunner: TaskRunner, internal val name: String, ) { - val lock: ReentrantLock = ReentrantLock() + internal val lock: ReentrantLock = ReentrantLock() internal var shutdown = false @@ -50,7 +50,7 @@ class TaskQueue internal constructor( * currently-executing task unless it is also scheduled for future execution. */ val scheduledTasks: List - get() = taskRunner.lock.withLock { futureTasks.toList() } + get() = taskRunner.withLock { futureTasks.toList() } /** * Schedules [task] for execution in [delayNanos]. A task may only have one future execution @@ -66,7 +66,7 @@ class TaskQueue internal constructor( task: Task, delayNanos: Long = 0L, ) { - taskRunner.lock.withLock { + taskRunner.withLock { if (shutdown) { if (task.cancelable) { taskRunner.logger.taskLog(task, this) { "schedule canceled (queue is shutdown)" } @@ -126,7 +126,7 @@ class TaskQueue internal constructor( /** Returns a latch that reaches 0 when the queue is next idle. */ fun idleLatch(): CountDownLatch { - taskRunner.lock.withLock { + taskRunner.withLock { // If the queue is already idle, that's easy. if (activeTask == null && futureTasks.isEmpty()) { return CountDownLatch(0) @@ -208,7 +208,7 @@ class TaskQueue internal constructor( fun cancelAll() { lock.assertNotHeld() - taskRunner.lock.withLock { + taskRunner.withLock { if (cancelAllAndDecide()) { taskRunner.kickCoordinator(this) } @@ -218,7 +218,7 @@ class TaskQueue internal constructor( fun shutdown() { lock.assertNotHeld() - taskRunner.lock.withLock { + taskRunner.withLock { shutdown = true if (cancelAllAndDecide()) { taskRunner.kickCoordinator(this) diff --git a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt index 6acc7b24e774..2e749e7c8c72 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt @@ -23,10 +23,15 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock import java.util.logging.Logger -import kotlin.concurrent.withLock +import kotlin.time.Duration.Companion.microseconds +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.measureTime import okhttp3.internal.addIfAbsent import okhttp3.internal.assertHeld import okhttp3.internal.concurrent.TaskRunner.Companion.INSTANCE +import okhttp3.internal.connection.Locks +import okhttp3.internal.connection.Locks.newLockCondition +import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.okHttpName import okhttp3.internal.threadFactory @@ -45,8 +50,8 @@ class TaskRunner( val backend: Backend, internal val logger: Logger = TaskRunner.logger, ) { - val lock: ReentrantLock = ReentrantLock() - val condition: Condition = lock.newCondition() + internal val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newLockCondition() private var nextQueueName = 10000 private var coordinatorWaiting = false @@ -63,7 +68,7 @@ class TaskRunner( override fun run() { while (true) { val task = - this@TaskRunner.lock.withLock { + this@TaskRunner.withLock { awaitTaskToRun() } ?: return @@ -75,7 +80,7 @@ class TaskRunner( } finally { // If the task is crashing start another thread to service the queues. if (!completedNormally) { - lock.withLock { + this@TaskRunner.withLock { backend.execute(this@TaskRunner, this) } } @@ -123,7 +128,7 @@ class TaskRunner( try { delayNanos = task.runOnce() } finally { - lock.withLock { + this.withLock { afterRun(task, delayNanos) } currentThread.name = oldName @@ -239,7 +244,7 @@ class TaskRunner( } fun newQueue(): TaskQueue { - val name = lock.withLock { nextQueueName++ } + val name = this.withLock { nextQueueName++ } return TaskQueue(this, "Q$name") } @@ -248,7 +253,7 @@ class TaskRunner( * necessarily track queues that have no tasks scheduled. */ fun activeQueues(): List { - lock.withLock { + this.withLock { return busyQueues + readyQueues } } @@ -295,7 +300,7 @@ class TaskRunner( // keepAliveTime: 60L, TimeUnit.SECONDS, - SynchronousQueue(), + SynchronousQueue(false), threadFactory, ) @@ -327,7 +332,13 @@ class TaskRunner( taskRunner: TaskRunner, runnable: Runnable, ) { - executor.execute(runnable) + val time = measureTime { + executor.execute(runnable) + } + + if (time > 500.microseconds) { + println("executor.execute " + time) + } } fun shutdown() { @@ -340,5 +351,9 @@ class TaskRunner( @JvmField val INSTANCE = TaskRunner(RealBackend(threadFactory("$okHttpName TaskRunner", daemon = true))) + + init { + Locks.lockToWatch = INSTANCE.lock + } } } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt index 625791a5b835..90ecf50a8ab0 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt @@ -26,7 +26,6 @@ import java.security.cert.X509Certificate import java.util.concurrent.TimeUnit import javax.net.ssl.SSLPeerUnverifiedException import javax.net.ssl.SSLSocket -import kotlin.concurrent.withLock import okhttp3.CertificatePinner import okhttp3.ConnectionSpec import okhttp3.Handshake diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt index b09ce9a2d33a..8b998abb19ff 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt @@ -17,11 +17,21 @@ package okhttp3.internal.connection +import java.util.Date +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind import kotlin.contracts.contract +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.measureTimedValue import okhttp3.Dispatcher +import okhttp3.internal.assertHeld +import okhttp3.internal.concurrent.TaskQueue +import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.http2.Http2Connection import okhttp3.internal.http2.Http2Stream import okhttp3.internal.http2.Http2Writer @@ -32,34 +42,117 @@ import okhttp3.internal.http2.Http2Writer internal object Locks { inline fun Dispatcher.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun RealConnection.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun RealCall.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun Http2Connection.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun Http2Stream.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } - inline fun Http2Writer.withLock(action: () -> T): T { + inline fun TaskRunner.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } + return lock.withMonitoredLock(action) + } + + inline fun TaskQueue.withLock(action: () -> T): T { + contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } + return lock.withMonitoredLock(action) + } + inline fun Http2Writer.withLock(action: () -> T): T { // TODO can we assert we don't have the connection lock? - return lock.withLock(action) + contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } + return lock.withMonitoredLock(action) + } + + internal fun ReentrantLock.newLockCondition(): Condition { + val condition = this.newCondition() + return object : Condition by condition { + override fun await() { + assertHeld() + return timeAwait { condition.await() } + } + + override fun await( + time: Long, + unit: TimeUnit?, + ): Boolean { + assertHeld() + return timeAwait { condition.await(time, unit) } + } + + override fun awaitUninterruptibly() { + assertHeld() + return timeAwait { condition.awaitUninterruptibly() } + } + + override fun awaitNanos(nanosTimeout: Long): Long { + assertHeld() + return timeAwait { condition.awaitNanos(nanosTimeout) } + } + + override fun awaitUntil(deadline: Date): Boolean { + assertHeld() + return timeAwait { condition.awaitUntil(deadline) } + } + } + } + + private fun ReentrantLock.timeAwait(function: () -> T): T { + return if (this == lockToWatch) { + measureTimedValue { function() }.also { + val lockDuration = it.duration +// if (lockDuration > 1.milliseconds) { +// println(Thread.currentThread().name + " await " + lockDuration) +// Exception().printStackTrace() + threadLocalAwait.set(threadLocalAwait.get() + lockDuration) +// } + }.value + } else { + function() + } } + + inline fun ReentrantLock.withMonitoredLock(action: () -> T): T { + contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } + return if (this == lockToWatch) { + withLock { + measureTimedValue { + action() + } + }.also { + val awaitDuration = threadLocalAwait.get() + threadLocalAwait.remove() + if (it.duration - awaitDuration > 1.milliseconds) { + println(Thread.currentThread().name + " lock " + it.duration + " " + awaitDuration) +// Exception().printStackTrace() + } + }.value + } else { + withLock(action) + } + } + + @Suppress("NewApi") + val threadLocalAwait = ThreadLocal.withInitial { Duration.ZERO } + + @Volatile + var lockToWatch: ReentrantLock? = null } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt index 432dcc074e7e..89c0bb30bd26 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.locks.ReentrantLock import javax.net.ssl.SSLPeerUnverifiedException import javax.net.ssl.SSLSocket -import kotlin.concurrent.withLock import okhttp3.Address import okhttp3.Connection import okhttp3.ConnectionListener @@ -335,7 +334,7 @@ class RealConnection( return http2Connection.isHealthy(nowNs) } - val idleDurationNs = lock.withLock { nowNs - idleAtNs } + val idleDurationNs = this.withLock { nowNs - idleAtNs } if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) { return socket.isHealthy(source) } @@ -354,7 +353,7 @@ class RealConnection( connection: Http2Connection, settings: Settings, ) { - lock.withLock { + this.withLock { val oldLimit = allocationLimit allocationLimit = settings.getMaxConcurrentStreams() @@ -398,7 +397,7 @@ class RealConnection( e: IOException?, ) { var noNewExchangesEvent = false - lock.withLock { + this.withLock { if (e is StreamResetException) { when { e.errorCode == ErrorCode.REFUSED_STREAM -> { diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt index 7339f4fcf8df..3da1fb82414a 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt @@ -22,11 +22,14 @@ import java.net.Socket import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.measureTime import okhttp3.internal.EMPTY_BYTE_ARRAY import okhttp3.internal.EMPTY_HEADERS import okhttp3.internal.assertThreadDoesntHoldLock import okhttp3.internal.closeQuietly import okhttp3.internal.concurrent.TaskRunner +import okhttp3.internal.connection.Locks.newLockCondition import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.http2.ErrorCode.REFUSED_STREAM import okhttp3.internal.http2.Settings.Companion.DEFAULT_INITIAL_WINDOW_SIZE @@ -56,7 +59,7 @@ import okio.source @Suppress("NAME_SHADOWING") class Http2Connection internal constructor(builder: Builder) : Closeable { internal val lock: ReentrantLock = ReentrantLock() - internal val condition: Condition = lock.newCondition() + internal val condition: Condition = lock.newLockCondition() // Internal state of this connection is guarded by 'lock'. No blocking operations may be // performed while holding this lock! @@ -792,7 +795,13 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { peerSettings = newPeerSettings settingsListenerQueue.execute("$connectionName onSettings") { - listener.onSettings(this@Http2Connection, newPeerSettings) + measureTime { + listener.onSettings(this@Http2Connection, newPeerSettings) + }.also { + if (it > 1.milliseconds) { + println("onSettings " + it) + } + } } } try { diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt index ea27c00ecf5e..fcabc4d2ca5c 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt @@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantLock import okhttp3.Headers import okhttp3.internal.EMPTY_HEADERS import okhttp3.internal.assertNotHeld +import okhttp3.internal.connection.Locks.newLockCondition import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.http2.flowcontrol.WindowCounter import okhttp3.internal.toHeaderList @@ -45,7 +46,7 @@ class Http2Stream internal constructor( headers: Headers?, ) { internal val lock: ReentrantLock = ReentrantLock() - val condition: Condition = lock.newCondition() + val condition: Condition = lock.newLockCondition() // Internal state is guarded by [lock]. No long-running or potentially blocking operations are // performed while the lock is held. diff --git a/okhttp/src/test/java/okhttp3/CallTest.kt b/okhttp/src/test/java/okhttp3/CallTest.kt index e59e0de7b6ee..c1aa3ac10c07 100644 --- a/okhttp/src/test/java/okhttp3/CallTest.kt +++ b/okhttp/src/test/java/okhttp3/CallTest.kt @@ -42,6 +42,8 @@ import java.net.HttpURLConnection import java.net.InetAddress import java.net.ProtocolException import java.net.Proxy +import java.net.Socket +import java.net.SocketAddress import java.net.SocketTimeoutException import java.net.UnknownHostException import java.net.UnknownServiceException @@ -55,6 +57,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference +import javax.net.SocketFactory import javax.net.ssl.SSLException import javax.net.ssl.SSLHandshakeException import javax.net.ssl.SSLPeerUnverifiedException @@ -142,6 +145,32 @@ open class CallTest { private var client = clientTestRule.newClientBuilder() .eventListenerFactory(clientTestRule.wrap(listener)) + .socketFactory( + object : DelegatingSocketFactory(SocketFactory.getDefault()) { + override fun createSocket(): Socket { + Thread.sleep(1_000) + return object : Socket() { + override fun connect(endpoint: SocketAddress?) { + Thread.sleep(500) + super.connect(endpoint) + } + + override fun connect( + endpoint: SocketAddress?, + timeout: Int, + ) { + Thread.sleep(500) + super.connect(endpoint, timeout) + } + + override fun close() { + Thread.sleep(500) + super.close() + } + } + } + }, + ) .build() private val callback = RecordingCallback() private val cache = diff --git a/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt b/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt new file mode 100644 index 000000000000..b009cb895e80 --- /dev/null +++ b/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3 + +import java.net.Socket +import java.net.SocketAddress +import java.util.concurrent.CountDownLatch +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds +import mockwebserver3.MockResponse +import mockwebserver3.MockWebServer +import mockwebserver3.SocketPolicy +import okhttp3.internal.connection.RealConnection +import okhttp3.testing.PlatformRule +import okio.IOException +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension + +class SlowNetworkTest { + @JvmField + @RegisterExtension + val clientTestRule = OkHttpClientTestRule() + + @JvmField + @RegisterExtension + val platform = PlatformRule() + + private val handshakeCertificates = platform.localhostHandshakeCertificates() + private lateinit var client: OkHttpClient + private lateinit var server: MockWebServer + + @BeforeEach + fun setUp(server: MockWebServer) { + this.server = server + + client = + clientTestRule.newClientBuilder() + .sslSocketFactory( + handshakeCertificates.sslSocketFactory(), + handshakeCertificates.trustManager, + ) + .socketFactory(object : DelegatingSocketFactory(getDefault()) { + override fun createSocket(): Socket { + return object : Socket() { + override fun connect(endpoint: SocketAddress?) { + Thread.sleep(100) + super.connect(endpoint) + } + + override fun connect(endpoint: SocketAddress?, timeout: Int) { + Thread.sleep(100) + super.connect(endpoint, timeout) + } + + override fun close() { + Thread.sleep(100) + super.close() + } + } + } + }) + .callTimeout(15.seconds) + .connectTimeout(15.seconds) + .eventListener(object : EventListener() { + override fun connectionAcquired(call: Call, connection: Connection) { + (connection as RealConnection).noNewExchanges() + } + }) + .build() + + server.useHttps(handshakeCertificates.sslSocketFactory()) + } + + @Test + fun slowRequests() { + repeat(100) { + server.enqueue( + MockResponse.Builder() + .socketPolicy(SocketPolicy.DelayAccept(10.milliseconds)) + .build(), + ) + } + + val latch = CountDownLatch(100) + + (1..100).map { + client.newCall(Request(server.url("/"))).enqueue( + object : Callback { + override fun onFailure( + call: Call, + e: IOException, + ) { + println(e) + latch.countDown() + } + + override fun onResponse( + call: Call, + response: Response, + ) { +// println("response") + response.body.string() + latch.countDown() + } + }, + ) + } + + latch.await() + } + + @Test + fun test1() { + repeat(10) { + slowRequests() + } + } + + @Test + fun test2() { + repeat(10) { + slowRequests() + } + } + + @Test + fun test3() { + repeat(10) { + slowRequests() + } + } +}