Skip to content

Commit

Permalink
Merge pull request #736 from couchbaselabs/couchbase2
Browse files Browse the repository at this point in the history
[couchbase2] Better Defaults & Scan Refactor
  • Loading branch information
busbey committed May 4, 2016
2 parents a372340 + 73e2b18 commit fd58738
Showing 1 changed file with 32 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class Couchbase2Client extends DB {
System.setProperty("com.couchbase.query.encodedPlanEnabled", "false");
}

private static final String SEPARATOR = ":";
private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Couchbase2Client.class);
private static final Object INIT_COORDINATOR = new Object();

Expand All @@ -125,6 +126,7 @@ public class Couchbase2Client extends DB {
private int boost;
private int networkMetricsInterval;
private int runtimeMetricsInterval;
private String scanAllQuery;

@Override
public void init() throws DBException {
Expand All @@ -142,11 +144,12 @@ public void init() throws DBException {
kv = props.getProperty("couchbase.kv", "true").equals("true");
maxParallelism = Integer.parseInt(props.getProperty("couchbase.maxParallelism", "1"));
kvEndpoints = Integer.parseInt(props.getProperty("couchbase.kvEndpoints", "1"));
queryEndpoints = Integer.parseInt(props.getProperty("couchbase.queryEndpoints", "5"));
queryEndpoints = Integer.parseInt(props.getProperty("couchbase.queryEndpoints", "1"));
epoll = props.getProperty("couchbase.epoll", "false").equals("true");
boost = Integer.parseInt(props.getProperty("couchbase.boost", "3"));
networkMetricsInterval = Integer.parseInt(props.getProperty("couchbase.networkMetricsInterval", "0"));
runtimeMetricsInterval = Integer.parseInt(props.getProperty("couchbase.runtimeMetricsInterval", "0"));
scanAllQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2";

try {
synchronized (INIT_COORDINATOR) {
Expand All @@ -170,6 +173,9 @@ public void init() throws DBException {
.callbacksOnIoPool(true)
.runtimeMetricsCollectorConfig(runtimeConfig)
.networkLatencyMetricsCollectorConfig(latencyConfig)
.socketConnectTimeout(10000) // 10 secs socket connect timeout
.connectTimeout(30000) // 30 secs overall bucket open timeout
.kvTimeout(10000) // 10 instead of 2.5s for KV ops
.kvEndpoints(kvEndpoints);

// Tune boosting and epoll based on settings
Expand Down Expand Up @@ -600,18 +606,19 @@ public Status scan(final String table, final String startkey, final int recordco
*/
private Status scanAllFields(final String table, final String startkey, final int recordcount,
final Vector<HashMap<String, ByteIterator>> result) {
final String scanQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2";
Collection<HashMap<String, ByteIterator>> documents = bucket.async()
final List<HashMap<String, ByteIterator>> data = new ArrayList<HashMap<String, ByteIterator>>(recordcount);

bucket.async()
.query(N1qlQuery.parameterized(
scanQuery,
JsonArray.from(formatId(table, startkey), recordcount),
N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
))
scanAllQuery,
JsonArray.from(formatId(table, startkey), recordcount),
N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
))
.doOnNext(new Action1<AsyncN1qlQueryResult>() {
@Override
public void call(AsyncN1qlQueryResult result) {
if (!result.parseSuccess()) {
throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery
throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanAllQuery
+ ", Errors: " + result.errors());
}
}
Expand All @@ -625,7 +632,11 @@ public Observable<AsyncN1qlQueryRow> call(AsyncN1qlQueryResult result) {
.flatMap(new Func1<AsyncN1qlQueryRow, Observable<RawJsonDocument>>() {
@Override
public Observable<RawJsonDocument> call(AsyncN1qlQueryRow row) {
return bucket.async().get(row.value().getString("id"), RawJsonDocument.class);
String id = new String(row.byteValue());
return bucket.async().get(
id.substring(id.indexOf(table + SEPARATOR), id.lastIndexOf('"')),
RawJsonDocument.class
);
}
})
.map(new Func1<RawJsonDocument, HashMap<String, ByteIterator>>() {
Expand All @@ -636,11 +647,15 @@ public HashMap<String, ByteIterator> call(RawJsonDocument document) {
return tuple;
}
})
.toList()
.toBlocking()
.single();
.forEach(new Action1<HashMap<String, ByteIterator>>() {
@Override
public void call(HashMap<String, ByteIterator> tuple) {
data.add(tuple);
}
});

result.addAll(documents);
result.addAll(data);
return Status.OK;
}

Expand All @@ -656,15 +671,16 @@ public HashMap<String, ByteIterator> call(RawJsonDocument document) {
*/
private Status scanSpecificFields(final String table, final String startkey, final int recordcount,
final Set<String> fields, final Vector<HashMap<String, ByteIterator>> result) {
String scanQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2";
String scanSpecQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName
+ "` WHERE meta().id >= '$1' LIMIT $2";
N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized(
scanQuery,
scanSpecQuery,
JsonArray.from(formatId(table, startkey), recordcount),
N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
));

if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) {
throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery
throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanSpecQuery
+ ", Errors: " + queryResult.errors());
}

Expand Down Expand Up @@ -777,7 +793,7 @@ private static String joinFields(final Set<String> fields) {
* @return a document ID that can be used with Couchbase.
*/
private static String formatId(final String prefix, final String key) {
  // Join with the shared SEPARATOR constant so ID construction stays
  // consistent with the parsing done elsewhere in this class (e.g. the
  // substring extraction in scanAllFields), instead of a repeated ":" literal.
  return prefix + SEPARATOR + key;
}

/**
Expand Down

0 comments on commit fd58738

Please sign in to comment.