Commit 808496d

memory leak fixes
Move records into a separate buffer with a fixed size of 100k to prevent the code from building a single batch indefinitely.
Limit the records queue to 200k per topic to prevent records from building up.

AdamKatzDev committed Nov 16, 2023
1 parent 26fe9cf commit 808496d
Showing 5 changed files with 80 additions and 58 deletions.
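
At a high level, the commit introduces two limits: each per-topic queue becomes a bounded LinkedBlockingQueue capped at 200,000 records, and the batch runnable now drains at most 100,000 records into a private buffer before flushing, instead of handing the whole live queue to the writer. A minimal, self-contained sketch of that shape (the record type, class name, and wiring here are illustrative placeholders, not code from the repository):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedBufferSketch {
    static final int PER_TOPIC_QUEUE_CAP = 200_000; // cap on records queued per topic
    static final int BATCH_BUFFER_CAP = 100_000;    // cap on records moved into one flush batch

    public static void main(String[] args) {
        // Producer side: a bounded queue rejects (or blocks) instead of growing without bound.
        BlockingQueue<String> topicQueue = new LinkedBlockingQueue<>(PER_TOPIC_QUEUE_CAP);
        boolean accepted = topicQueue.offer("record-1");

        // Consumer side: move at most BATCH_BUFFER_CAP records into a private buffer,
        // so a single batch can never grow indefinitely while new records keep arriving.
        Queue<String> batch = new ArrayDeque<>();
        String next;
        while (batch.size() < BATCH_BUFFER_CAP && (next = topicQueue.poll()) != null) {
            batch.add(next);
        }
        System.out.printf("accepted=%s, batch size=%d%n", accepted, batch.size());
    }
}

The diffs below apply these two caps in DebeziumChangeEventCapture, ClickHouseSinkTask, and ClickHouseBatchRunnable.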

File 1 of 5: DebeziumChangeEventCapture.java
@@ -38,8 +38,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -59,7 +60,7 @@ public class DebeziumChangeEventCapture {
    private ClickHouseBatchRunnable runnable;

    // Records grouped by Topic Name
-    private ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records;
+    private final ConcurrentHashMap<String, Queue<ClickHouseStruct>> records = new ConcurrentHashMap<>();


    private BaseDbWriter writer = null;
@@ -198,14 +199,8 @@ private void processEveryChangeRecord(Properties props, ChangeEvent<SourceRecord
            } catch(Exception e) {
                log.error("Error retrieving status metrics");
            }
-            ConcurrentLinkedQueue<ClickHouseStruct> queue = new ConcurrentLinkedQueue<ClickHouseStruct>();
            if (chStruct != null) {
-                queue.add(chStruct);
-            }
-            synchronized (this.records) {
-                if (chStruct != null) {
-                    addRecordsToSharedBuffer(chStruct.getTopic(), chStruct);
-                }
+                addRecordsToSharedBuffer(chStruct.getTopic(), chStruct);
            }
        }

@@ -562,17 +557,22 @@ private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLPars
     * @param topicName
     * @param chs
     */
-    private void addRecordsToSharedBuffer(String topicName, ClickHouseStruct chs) {
-        ConcurrentLinkedQueue<ClickHouseStruct> structs;
+    private void addRecordsToSharedBuffer(String topicName, ClickHouseStruct chs) throws InterruptedException {
+        Queue<ClickHouseStruct> structs;

        if (this.records.containsKey(topicName)) {
            structs = this.records.get(topicName);
        } else {
-            structs = new ConcurrentLinkedQueue<>();
+            structs = new LinkedBlockingQueue<>(200_000);
            this.records.putIfAbsent(topicName, structs);
        }
-        structs.add(chs);
-        synchronized (this.records) {
-            this.records.put(topicName, structs);
-        }
+        while (true) {
+            try {
+                structs.add(chs);
+                break;
+            } catch (IllegalStateException e) {
+                Thread.sleep(50);
+            }
+        }
    }

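The backpressure in addRecordsToSharedBuffer relies on LinkedBlockingQueue.add() throwing IllegalStateException when the queue is at capacity, with a 50 ms sleep between retries. A hypothetical equivalent, shown only as an alternative sketch and not part of this commit, is to declare the queue as a BlockingQueue and block on put() or offer() with a timeout, which avoids the retry loop:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not from the repository.
final class BackpressureAppend {
    // Blocks until the bounded queue has free capacity (same effect as the retry loop, no busy-wait).
    static <T> void append(BlockingQueue<T> queue, T record) throws InterruptedException {
        queue.put(record);
    }

    // Waits up to timeoutMillis and reports whether the record was accepted,
    // letting the caller decide what to do if the topic stays full.
    static <T> boolean tryAppend(BlockingQueue<T> queue, T record, long timeoutMillis)
            throws InterruptedException {
        return queue.offer(record, timeoutMillis, TimeUnit.MILLISECONDS);
    }
}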

File 2 of 5: ClickHouseSinkTask.java
@@ -18,8 +18,9 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
@@ -39,7 +40,7 @@ public ClickHouseSinkTask() {
    private ClickHouseBatchExecutor executor;

    // Records grouped by Topic Name
-    private ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records;
+    private ConcurrentHashMap<String, Queue<ClickHouseStruct>> records;

    private DeDuplicator deduplicator;

@@ -115,16 +116,25 @@ public void put(Collection<SinkRecord> records) {
    }

    private void appendToRecords(String topicName, ClickHouseStruct chs) {
-        ConcurrentLinkedQueue<ClickHouseStruct> structs;
+        Queue<ClickHouseStruct> structs;

-        if(this.records.containsKey(topicName)) {
+        if (this.records.containsKey(topicName)) {
            structs = this.records.get(topicName);
        } else {
-            structs = new ConcurrentLinkedQueue<>();
+            structs = new LinkedBlockingQueue<>(200_000);
            this.records.putIfAbsent(topicName, structs);
        }
-        structs.add(chs);
-        synchronized (this.records) {
-            this.records.put(topicName, structs);
-        }
+        while (true) {
+            try {
+                structs.add(chs);
+                break;
+            } catch (IllegalStateException e) {
+                try {
+                    Thread.sleep(50);
+                } catch (InterruptedException ignored) {

+                }
+            }
+        }
    }

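One more observation on the lookup that precedes the append, both here and in DebeziumChangeEventCapture: containsKey() followed by get() and putIfAbsent() can race, so two threads that both see a missing topic each build a queue, and the thread whose queue loses the putIfAbsent() race can append that record to an instance that never becomes the shared one. A hedged sketch of the same method using ConcurrentHashMap.computeIfAbsent, which returns the queue that actually ended up in the map (an alternative for illustration, not what the commit does; it assumes the connector's ClickHouseStruct type is on the classpath):

import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical variant of appendToRecords, not part of this commit.
final class AppendSketch {
    private final ConcurrentHashMap<String, Queue<ClickHouseStruct>> records = new ConcurrentHashMap<>();

    void appendToRecords(String topicName, ClickHouseStruct chs) throws InterruptedException {
        // computeIfAbsent returns the queue that is actually stored in the map,
        // so concurrent callers always append to the same instance.
        Queue<ClickHouseStruct> structs =
                records.computeIfAbsent(topicName, t -> new LinkedBlockingQueue<>(200_000));
        while (true) {
            try {
                structs.add(chs);   // throws IllegalStateException while the queue is full
                return;
            } catch (IllegalStateException full) {
                Thread.sleep(50);   // same 50 ms backoff as the commit
            }
        }
    }
}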

File 3 of 5
@@ -27,12 +27,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
@@ -237,8 +232,8 @@ private void updatePartitionOffsetMap(Map<TopicPartition, Long> offsetToPartitio
     * @param records
     * @return
     */
-    public Map<TopicPartition, Long> groupQueryWithRecords(ConcurrentLinkedQueue<ClickHouseStruct> records,
-                                                           Map<MutablePair<String, Map<String, Integer>>,
+    public Map<TopicPartition, Long> groupQueryWithRecords(Collection<ClickHouseStruct> records,
+                                                           Map<MutablePair<String, Map<String, Integer>>,
                                                                List<ClickHouseStruct>> queryToRecordsMap) {


@@ -468,7 +463,9 @@ public BlockMetaData addToPreparedStatementBatch(String topicName, Map<MutablePa
                success = true;

                long taskId = this.config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());
-                log.info("*************** EXECUTED BATCH Successfully " + "Records: " + recordsList.size() + "************** task(" + taskId + ")" + " Thread ID: " + Thread.currentThread().getName());
+                log.info("*************** EXECUTED BATCH Successfully "
+                        + "Topic: " + topicName + " "
+                        + "Records: " + recordsList.size() + " ************** task(" + taskId + ")" + " Thread ID: " + Thread.currentThread().getName());

                // ToDo: Clear is not an atomic operation.
                // It might delete the records that are inserted by the ingestion process.

File 4 of 5: ClickHouseBatchRunnable.java
@@ -15,11 +15,8 @@
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Runnable object that will be called on
@@ -28,7 +25,7 @@
 */
public class ClickHouseBatchRunnable implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ClickHouseBatchRunnable.class);
-    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records;
+    private final ConcurrentHashMap<String, Queue<ClickHouseStruct>> records;

    private final ClickHouseSinkConnectorConfig config;

@@ -44,11 +41,9 @@ public class ClickHouseBatchRunnable {


-    // Map of topic name to buffered records.
-    Map<String, Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>>> topicToRecordsMap;

    private DBCredentials dbCredentials;

-    public ClickHouseBatchRunnable(ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records,
+    public ClickHouseBatchRunnable(ConcurrentHashMap<String, Queue<ClickHouseStruct>> records,
                                   ClickHouseSinkConnectorConfig config,
                                   Map<String, String> topic2TableMap) {
        this.records = records;
@@ -61,7 +56,6 @@ public ClickHouseBatchRunnable(ConcurrentHashMap<String, ConcurrentLinkedQueue<C

        //this.queryToRecordsMap = new HashMap<>();
        this.topicToDbWriterMap = new HashMap<>();
-        this.topicToRecordsMap = new HashMap<>();

        this.dbCredentials = parseDBConfiguration();

@@ -96,16 +90,46 @@ public void run() {
            }

            // Topic Name -> List of records
-            for (Map.Entry<String, ConcurrentLinkedQueue<ClickHouseStruct>> entry : this.records.entrySet()) {
-                if (entry.getValue().size() > 0) {
-                    processRecordsByTopic(entry.getKey(), entry.getValue());
+            for (Map.Entry<String, Queue<ClickHouseStruct>> entry : this.records.entrySet()) {
+                Queue<ClickHouseStruct> queue = entry.getValue();
+                while (!queue.isEmpty()) {
+                    Queue<ClickHouseStruct> buffer = this.moveRecordsToSeparateBuffer(entry.getValue());
+                    processRecordsByTopic(entry.getKey(), buffer);
                }
            }
        } catch(Exception e) {
            log.error(String.format("ClickHouseBatchRunnable exception - Task(%s)", taskId), e);
        }
    }

+    private Queue<ClickHouseStruct> moveRecordsToSeparateBuffer(Queue<ClickHouseStruct> from) throws InterruptedException {
+        long timeMillis = System.currentTimeMillis();
+        Iterator<ClickHouseStruct> iterator = from.iterator();
+        int bufferSize = 100_000;
+        ArrayDeque<ClickHouseStruct> buffer = new ArrayDeque<>(bufferSize);
+        while (System.currentTimeMillis() - timeMillis < 5000) {
+            if (!iterator.hasNext()) {
+                break;
+            }
+            int counter = 0;
+            while (iterator.hasNext() && buffer.size() < bufferSize) {
+                buffer.add(iterator.next());
+                iterator.remove();
+                ++counter;
+            }
+            if (buffer.size() == bufferSize) {
+                break;
+            }
+            if (counter < 1000) { //probably fetching data from binlog by now (or small or really wide table)
+                break;
+            }
+            Thread.sleep(50);
+        }
+        log.info(String.format("Built new batch for processing in %d msec", System.currentTimeMillis() - timeMillis));

+        return buffer;
+    }

    /**
     * Function to retrieve table name from topic name
     *
@@ -141,7 +165,7 @@ public DbWriter getDbWriterForTable(String topicName, String tableName, ClickHou
     * @param topicName
     * @param records
     */
-    private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue<ClickHouseStruct> records) throws SQLException {
+    private void processRecordsByTopic(String topicName, Queue<ClickHouseStruct> records) throws SQLException {

        //The user parameter will override the topic mapping to table.
        String tableName = getTableFromTopic(topicName);
@@ -154,22 +178,12 @@ private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue<Click
        // Step 1: The Batch Insert with preparedStatement in JDBC
        // works by forming the Query and then adding records to the Batch.
        // This step creates a Map of Query -> Records(List of ClickHouseStruct)
-        Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap;
-
-        if(topicToRecordsMap.containsKey(topicName)) {
-            queryToRecordsMap = topicToRecordsMap.get(topicName);
-        } else {
-            queryToRecordsMap = new HashMap<>();
-            topicToRecordsMap.put(topicName, queryToRecordsMap);
-        }
+        Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap = new HashMap<>();

        Map<TopicPartition, Long> partitionToOffsetMap = writer.groupQueryWithRecords(records, queryToRecordsMap);
        BlockMetaData bmd = new BlockMetaData();

-        if(flushRecordsToClickHouse(topicName, writer, queryToRecordsMap, bmd)) {
-            // Remove the entry.
-            queryToRecordsMap.remove(topicName);
-        }
+        flushRecordsToClickHouse(topicName, writer, queryToRecordsMap, bmd);

        if (this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET.toString())) {
            log.info("***** KAFKA OFFSET MANAGEMENT ENABLED *****");
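
The new moveRecordsToSeparateBuffer above drains the shared queue through an Iterator, removing elements one by one and pausing between passes until it holds 100,000 records, the queue empties, the inflow drops below 1,000 records per pass, or 5 seconds elapse. If the per-topic queue is typed as a BlockingQueue, the same bulk move could also be expressed with drainTo(); the sketch below is a simplified, hypothetical alternative (it keeps the size and time caps but omits the 1,000-records-per-pass trickle heuristic):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;

// Hypothetical alternative, not part of this commit.
final class DrainSketch {
    static <T> Queue<T> drainBatch(BlockingQueue<T> from, int maxRecords, long maxWaitMillis)
            throws InterruptedException {
        long start = System.currentTimeMillis();
        ArrayDeque<T> buffer = new ArrayDeque<>();
        while (buffer.size() < maxRecords
                && System.currentTimeMillis() - start < maxWaitMillis) {
            // drainTo moves up to the requested number of records in one bulk operation,
            // without per-element Iterator.remove() calls.
            int moved = from.drainTo(buffer, maxRecords - buffer.size());
            if (moved == 0) {
                Thread.sleep(50); // queue momentarily empty; give producers a moment before retrying
            }
        }
        return buffer;
    }
}

In the runnable's loop this would be called as drainBatch(queue, 100_000, 5_000) in place of the iterator-based copy.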

File 5 of 5: ClickHouseBatchRunnableTest.java
@@ -13,13 +13,14 @@

import java.util.HashMap;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ClickHouseBatchRunnableTest {


-    ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>>
+    ConcurrentHashMap<String, Queue<ClickHouseStruct>>
            records = new ConcurrentHashMap<>();
    Map<String, String> topic2TableMap = new HashMap<>();

