From 5b50c7947a1f5f7caebc06302c1f9ddbc2e8bb05 Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Thu, 28 Apr 2016 14:10:18 +0200 Subject: [PATCH 1/2] [couchbase2] Better default settings. This changeset picks better default settings for the environment to make it less error-prone in cloud setups and in turn leads to easier usability. The changes made are: - queryEndpoints set from 5 to 1 by default since this is technically a leftover from earlier implementations. We now have a Bucket per Thread and since YCSB is sync anyways we only ever need one. Leaving it as an option since if we get async ops in the future we can utilize those. - socketConnectTimeout from 1s to 10s, especially in cloud environments socket timeouts are annoying and its better if we have higher defaults. - connectTimeout changed from 5s to 30s since the socket connect timeout is higher we also need to adjust this one. - kvTimeout changed from 2.5s to 10s so that under throughput saturation testing you don't get flooded with timeouts potentially. Note that it also makes for nicer output if you get "higher latencies" as part of the regular output and not in the error section. --- .../java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java index cdd57d8462..e89a3dfff9 100644 --- a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java +++ b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java @@ -142,7 +142,7 @@ public void init() throws DBException { kv = props.getProperty("couchbase.kv", "true").equals("true"); maxParallelism = Integer.parseInt(props.getProperty("couchbase.maxParallelism", "1")); kvEndpoints = Integer.parseInt(props.getProperty("couchbase.kvEndpoints", "1")); - queryEndpoints = Integer.parseInt(props.getProperty("couchbase.queryEndpoints", "5")); + queryEndpoints = Integer.parseInt(props.getProperty("couchbase.queryEndpoints", "1")); epoll = props.getProperty("couchbase.epoll", "false").equals("true"); boost = Integer.parseInt(props.getProperty("couchbase.boost", "3")); networkMetricsInterval = Integer.parseInt(props.getProperty("couchbase.networkMetricsInterval", "0")); @@ -170,6 +170,9 @@ public void init() throws DBException { .callbacksOnIoPool(true) .runtimeMetricsCollectorConfig(runtimeConfig) .networkLatencyMetricsCollectorConfig(latencyConfig) + .socketConnectTimeout(10000) // 10 secs socket connect timeout + .connectTimeout(30000) // 30 secs overall bucket open timeout + .kvTimeout(10000) // 10 instead of 2.5s for KV ops .kvEndpoints(kvEndpoints); // Tune boosting and epoll based on settings From 73e2b18838887b44b4c8b8b7f68d7107c5d199d0 Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Thu, 28 Apr 2016 17:40:46 +0200 Subject: [PATCH 2/2] [couchbase2] Refactor scan operation. This changeset refactors the scan operation slightly, using a List with a predefined size (since we know it), as well as using the document ID directly for the response rather than doing full blown JSON decoding which is not needed. --- .../ycsb/db/couchbase2/Couchbase2Client.java | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java index e89a3dfff9..3d0bc0398c 100644 --- a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java +++ b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java @@ -102,6 +102,7 @@ public class Couchbase2Client extends DB { System.setProperty("com.couchbase.query.encodedPlanEnabled", "false"); } + private static final String SEPARATOR = ":"; private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Couchbase2Client.class); private static final Object INIT_COORDINATOR = new Object(); @@ -125,6 +126,7 @@ public class Couchbase2Client extends DB { private int boost; private int networkMetricsInterval; private int runtimeMetricsInterval; + private String scanAllQuery; @Override public void init() throws DBException { @@ -147,6 +149,7 @@ public void init() throws DBException { boost = Integer.parseInt(props.getProperty("couchbase.boost", "3")); networkMetricsInterval = Integer.parseInt(props.getProperty("couchbase.networkMetricsInterval", "0")); runtimeMetricsInterval = Integer.parseInt(props.getProperty("couchbase.runtimeMetricsInterval", "0")); + scanAllQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2"; try { synchronized (INIT_COORDINATOR) { @@ -603,18 +606,19 @@ public Status scan(final String table, final String startkey, final int recordco */ private Status scanAllFields(final String table, final String startkey, final int recordcount, final Vector> result) { - final String scanQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2"; - Collection> documents = bucket.async() + final List> data = new ArrayList>(recordcount); + + bucket.async() .query(N1qlQuery.parameterized( - scanQuery, - JsonArray.from(formatId(table, startkey), recordcount), - N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) - )) + scanAllQuery, + JsonArray.from(formatId(table, startkey), recordcount), + N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) + )) .doOnNext(new Action1() { @Override public void call(AsyncN1qlQueryResult result) { if (!result.parseSuccess()) { - throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery + throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanAllQuery + ", Errors: " + result.errors()); } } @@ -628,7 +632,11 @@ public Observable call(AsyncN1qlQueryResult result) { .flatMap(new Func1>() { @Override public Observable call(AsyncN1qlQueryRow row) { - return bucket.async().get(row.value().getString("id"), RawJsonDocument.class); + String id = new String(row.byteValue()); + return bucket.async().get( + id.substring(id.indexOf(table + SEPARATOR), id.lastIndexOf('"')), + RawJsonDocument.class + ); } }) .map(new Func1>() { @@ -639,11 +647,15 @@ public HashMap call(RawJsonDocument document) { return tuple; } }) - .toList() .toBlocking() - .single(); + .forEach(new Action1>() { + @Override + public void call(HashMap tuple) { + data.add(tuple); + } + }); - result.addAll(documents); + result.addAll(data); return Status.OK; } @@ -659,15 +671,16 @@ public HashMap call(RawJsonDocument document) { */ private Status scanSpecificFields(final String table, final String startkey, final int recordcount, final Set fields, final Vector> result) { - String scanQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2"; + String scanSpecQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName + + "` WHERE meta().id >= '$1' LIMIT $2"; N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized( - scanQuery, + scanSpecQuery, JsonArray.from(formatId(table, startkey), recordcount), N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) )); if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) { - throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery + throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanSpecQuery + ", Errors: " + queryResult.errors()); } @@ -780,7 +793,7 @@ private static String joinFields(final Set fields) { * @return a document ID that can be used with Couchbase. */ private static String formatId(final String prefix, final String key) { - return prefix + ":" + key; + return prefix + SEPARATOR + key; } /**