Skip to content

Commit

Permalink
Kinesis Asynchronous puts.
Browse files Browse the repository at this point in the history
  • Loading branch information
shenavaa committed Oct 15, 2021
1 parent 9edafd1 commit e8d3a16
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public class KinesisConfigurationUtil extends ConfigurationUtil {
public static final String STREAM_MAX_CONNECTION =
REPLICATIOM_KINESIS+".max-connection";

public static final String SYMCHRONIZED =
REPLICATIOM_KINESIS+".syncputs";

public static final String STREAM_REQUEST_TIMEOUT =
REPLICATIOM_KINESIS+".request-timeout";

Expand Down Expand Up @@ -172,4 +175,8 @@ public boolean isKPLAggregationEnabled() {
return isKPLAggregationEnabled;
}

public boolean isSynchPutsEnabled() {
return this.conf.getBoolean(SYMCHRONIZED, false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import java.util.Base64;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.wal.WAL.Entry;
Expand All @@ -26,6 +26,11 @@
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Futures;

public class KinesisDataSinkImpl extends DataSink {
private MessageDigest md;
Expand All @@ -35,13 +40,19 @@ public class KinesisDataSinkImpl extends DataSink {
private KinesisProducer kinesis = null;

private KinesisConfigurationUtil configUtil;


private FutureCallback<UserRecordResult> putRecordCallback;
private ListeningExecutorService executor;

/**
* Constructor
* @param config
*/
public KinesisDataSinkImpl(Configuration config) {
super(config);
this.configUtil = this.getConfigurationUtil();

try {
md = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
Expand All @@ -54,6 +65,21 @@ public KinesisDataSinkImpl(Configuration config) {
+ "", e);
e.printStackTrace();
}
if ( this.configUtil.isSynchPutsEnabled() == false) {
LOG.info("Initilizing Asynchronous putRecords. We wil skip failed PutRecords will be lost! ");
putRecordCallback = new FutureCallback<UserRecordResult>() {
@Override
public void onFailure(Throwable t) {
/* Analyze and respond to the failure */
};
@Override
public void onSuccess(UserRecordResult result) {
/* Respond to the success */
};
};

executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(50));
}
}

/**
Expand All @@ -75,7 +101,9 @@ public boolean isBlocking() {
*/
public void putRecord(ByteBuffer buffer, String tablename) throws IOException, InterruptedException, ExecutionException {
String partition = UUIDHelper.getBase64UUID();
LOG.debug("Putting record in random partition: " + partition);
if (LOG.isDebugEnabled()) {
LOG.debug("Putting record in random partition: " + partition);
}
this.putRecord( buffer, tablename,partition);
}

Expand All @@ -91,34 +119,41 @@ public void putRecord(ByteBuffer buffer, String tablename) throws IOException, I
*/
public void putRecord(ByteBuffer buffer, String tablename,String partition) throws IOException, InterruptedException, ExecutionException {
if (kinesis == null) { // creating the producer when there is a request.
KinesisProducerConfiguration config = this.getConfigurationUtil().getKPLConfiguration();
LOG.debug("First Time ptoducer. endpoint " + config.getKinesisEndpoint() + " port: " + config.getKinesisPort() );
KinesisProducerConfiguration config = configUtil.getKPLConfiguration();
LOG.debug("First Time producer. endpoint " + config.getKinesisEndpoint() + " port: " + config.getKinesisPort() );
this.kinesis = KinesisProducerFactory.getProducer(config);
}



md.update(partition.getBytes());
String digest = Base64.getEncoder().encodeToString(md.digest());
LOG.debug("Putting record in digest partition: " + digest);
String destination = this.getConfigurationUtil().getStreamNameFromTableName(tablename);
Future<UserRecordResult> putFuture = (Future<UserRecordResult>) kinesis.addUserRecord(destination, digest, buffer);


long time = System.currentTimeMillis();
LOG.debug("Starting a put " + time);
UserRecordResult result = putFuture.get(); // this does block
LOG.debug("Out of Put " + System.currentTimeMillis());

if (result.isSuccessful()) {
LOG.debug(
"Put record into shard= {} PartitionKey = {}, time={} "
, result.getShardId()
, digest
, System.currentTimeMillis() - time);
if (configUtil.isSynchPutsEnabled()) {
Future<UserRecordResult> putFuture = (Future<UserRecordResult>) kinesis.addUserRecord(destination, digest, buffer);

UserRecordResult result = putFuture.get(); // this does block

if (result.isSuccessful()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Put record into shard= {} PartitionKey = {}, time={} "
, result.getShardId()
, digest
, System.currentTimeMillis() - time);
}
} else {
for (Attempt attempt : result.getAttempts()) {
LOG.error(attempt.getErrorMessage());
throw new IOException("Record faild to replicate");
}
}
} else {
for (Attempt attempt : result.getAttempts()) {
LOG.error(attempt.getErrorMessage());
throw new IOException("Record faild to replicate");
}
}
ListenableFuture<UserRecordResult> putFuture = kinesis.addUserRecord(destination, digest, buffer);
Futures.addCallback(putFuture,putRecordCallback, executor);
}
}

/**
Expand Down Expand Up @@ -168,26 +203,19 @@ public List<Entry> filter(final List<Entry> oldEntries) {

@Override
public boolean supportsTransaction() {
// TODO Auto-generated method stub
return false;
}

@Override
public void beginTransaction() {
// TODO Auto-generated method stub

}

@Override
public void commitTransaction() {
// TODO Auto-generated method stub

}

@Override
public void abortTransaction() {
// TODO Auto-generated method stub

}

}

0 comments on commit e8d3a16

Please sign in to comment.