Skip to content

Commit

Permalink
Performance improvement (#277)
Browse files Browse the repository at this point in the history
* don't go to Kafka again to retrieve the partitionInfoList when it's already available

* reuse already retrieved partitionInfoList
do only 2 requests to retrieve all partition sizes instead of 2 per topic

* don't get partition offsets where they're not used

* remove partitionInfoList from TopicVo again and inject it where needed

* remove the unneeded topicsMap from arguments

* only fetch topic data that is part of the consumer groupId

Co-authored-by: Jork Zijlstra <[email protected]>
  • Loading branch information
jorkzijlstra and Jork Zijlstra authored Feb 8, 2022
1 parent d5fea14 commit 9d16be5
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 74 deletions.
14 changes: 4 additions & 10 deletions src/main/java/kafdrop/controller/ConsumerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@ public ConsumerController(KafkaMonitor kafkaMonitor) {

@RequestMapping("/{groupId:.+}")
public String consumerDetail(@PathVariable("groupId") String groupId, Model model) throws ConsumerNotFoundException {
final var topicVos = kafkaMonitor.getTopics();
final var consumer = kafkaMonitor.getConsumers(topicVos)
.stream()
.filter(c -> c.getGroupId().equals(groupId))
.findAny();
final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny();

model.addAttribute("consumer", consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId)));
return "consumer-detail";
}
Expand All @@ -53,11 +50,8 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode
})
@GetMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException {
final var topicVos = kafkaMonitor.getTopics();
final var consumer = kafkaMonitor.getConsumers(topicVos)
.stream()
.filter(c -> c.getGroupId().equals(groupId))
.findAny();
final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny();

return consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId));
}
}
4 changes: 2 additions & 2 deletions src/main/java/kafdrop/controller/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public String topicDetails(@PathVariable("name") String topicName, Model model)
final var topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));
model.addAttribute("topic", topic);
model.addAttribute("consumers", kafkaMonitor.getConsumers(Collections.singleton(topic)));
model.addAttribute("consumers", kafkaMonitor.getConsumersByTopics(Collections.singleton(topic)));
model.addAttribute("topicDeleteEnabled", topicDeleteEnabled);
model.addAttribute("keyFormat", defaultKeyFormat);
model.addAttribute("format", defaultFormat);
Expand Down Expand Up @@ -125,7 +125,7 @@ public String createTopicPage(Model model) {
public @ResponseBody List<ConsumerVO> getConsumers(@PathVariable("name") String topicName) {
final var topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));
return kafkaMonitor.getConsumers(Collections.singleton(topic));
return kafkaMonitor.getConsumersByTopics(Collections.singleton(topic));
}

/**
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/kafdrop/model/TopicVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ public void setConfig(Map<String, String> config) {
this.config = config;
}

public Map<Integer, TopicPartitionVO> getPartitionMap() {
return Collections.unmodifiableMap(partitions);
}

public Collection<TopicPartitionVO> getPartitions() {
return Collections.unmodifiableCollection(partitions.values());
}
Expand Down
94 changes: 49 additions & 45 deletions src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
import kafdrop.config.*;
import kafdrop.model.*;
import kafdrop.util.*;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.*;
import org.slf4j.*;
import org.springframework.stereotype.*;

import javax.annotation.*;
import java.nio.*;
import java.time.*;
import javax.annotation.PostConstruct;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.*;
import java.util.stream.*;
import java.util.stream.Collectors;

@Service
public final class KafkaHighLevelConsumer {
Expand Down Expand Up @@ -47,36 +48,37 @@ private void initializeClient() {
}
}

synchronized Map<Integer, TopicPartitionVO> getPartitionSize(String topic) {
synchronized void setTopicPartitionSizes(List<TopicVO> topics) {
initializeClient();

final var partitionInfoSet = kafkaConsumer.partitionsFor(topic);
kafkaConsumer.assign(partitionInfoSet.stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()))
.collect(Collectors.toList()));

kafkaConsumer.poll(Duration.ofMillis(0));
final Set<TopicPartition> assignedPartitionList = kafkaConsumer.assignment();
final TopicVO topicVO = getTopicInfo(topic);
final Map<Integer, TopicPartitionVO> partitionsVo = topicVO.getPartitionMap();

kafkaConsumer.seekToBeginning(assignedPartitionList);
assignedPartitionList.forEach(topicPartition -> {
final TopicPartitionVO topicPartitionVo = partitionsVo.get(topicPartition.partition());
final long startOffset = kafkaConsumer.position(topicPartition);
LOG.debug("topic: {}, partition: {}, startOffset: {}", topicPartition.topic(), topicPartition.partition(), startOffset);
topicPartitionVo.setFirstOffset(startOffset);
});

kafkaConsumer.seekToEnd(assignedPartitionList);
assignedPartitionList.forEach(topicPartition -> {
final long latestOffset = kafkaConsumer.position(topicPartition);
LOG.debug("topic: {}, partition: {}, latestOffset: {}", topicPartition.topic(), topicPartition.partition(), latestOffset);
final TopicPartitionVO partitionVo = partitionsVo.get(topicPartition.partition());
partitionVo.setSize(latestOffset);
});
return partitionsVo;
Map<TopicVO, List<TopicPartition>> allTopics = topics.stream().map(topicVO -> {
List<TopicPartition> topicPartitions = topicVO.getPartitions().stream().map(topicPartitionVO ->
new TopicPartition(topicVO.getName(), topicPartitionVO.getId())
).collect(Collectors.toList());

return Pair.of(topicVO, topicPartitions);
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));

List<TopicPartition> allTopicPartitions = allTopics.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());

kafkaConsumer.assign(allTopicPartitions);
Map<TopicPartition, Long> beginningOffset = kafkaConsumer.beginningOffsets(allTopicPartitions);
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(allTopicPartitions);

allTopics.forEach((topicVO, topicPartitions) -> topicPartitions.forEach(topicPartition -> {
Optional<TopicPartitionVO> partition = topicVO.getPartition(topicPartition.partition());

partition.ifPresent(p -> {
Long startOffset = beginningOffset.get(topicPartition);
Long endOffset = endOffsets.get(topicPartition);

LOG.debug("topic: {}, partition: {}, startOffset: {}, endOffset: {}", topicPartition.topic(), topicPartition.partition(), startOffset, endOffset);
p.setFirstOffset(startOffset);
p.setSize(endOffset);
});
}));
}

/**
Expand Down Expand Up @@ -195,25 +197,27 @@ private static String deserialize(MessageDeserializer deserializer, byte[] bytes
return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty";
}

synchronized Map<String, TopicVO> getTopicInfos(String[] topics) {
synchronized Map<String, List<PartitionInfo>> getAllTopics() {
initializeClient();

return kafkaConsumer.listTopics();
}

synchronized Map<String, TopicVO> getTopicInfos(Map<String, List<PartitionInfo>> allTopicsMap, String[] topics) {
initializeClient();
final var topicSet = kafkaConsumer.listTopics().keySet();

final var topicSet = allTopicsMap.keySet();
if (topics.length == 0) {
topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class);
}
final var topicVos = new HashMap<String, TopicVO>(topics.length, 1f);

for (var topic : topics) {
if (topicSet.contains(topic)) {
topicVos.put(topic, getTopicInfo(topic));
}
}

return topicVos;
}
return Arrays.stream(topics)
.filter(topicSet::contains)
.map(topic -> Pair.of(topic, getTopicInfo(topic, allTopicsMap.get(topic))))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

private TopicVO getTopicInfo(String topic) {
final var partitionInfoList = kafkaConsumer.partitionsFor(topic);
private TopicVO getTopicInfo(String topic, List<PartitionInfo> partitionInfoList) {
final var topicVo = new TopicVO(topic);
final var partitions = new TreeMap<Integer, TopicPartitionVO>();

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/kafdrop/service/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int coun

ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics);

List<ConsumerVO> getConsumers(Collection<TopicVO> topicVos);
List<ConsumerVO> getConsumersByGroup(String groupId);

List<ConsumerVO> getConsumersByTopics(Collection<TopicVO> topicVos);

/**
* Create topic
Expand Down
52 changes: 40 additions & 12 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,31 @@ public ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics) {

@Override
public List<TopicVO> getTopics() {
final var topicVos = getTopicMetadata().values().stream()
final var topicVos = getTopicMetadata(highLevelConsumer.getAllTopics()).values().stream()
.sorted(Comparator.comparing(TopicVO::getName))
.collect(Collectors.toList());
for (var topicVo : topicVos) {
topicVo.setPartitions(getTopicPartitionSizes(topicVo));
}

return topicVos;
}

public List<TopicVO> getTopics(String[] topics) {
Map<String, List<PartitionInfo>> topicsMap = highLevelConsumer.getAllTopics();

ArrayList<TopicVO> topicVos = new ArrayList<>(getTopicMetadata(topicsMap, topics).values());
setTopicPartitionSizes(topicVos);

return topicVos;
}

@Override
public Optional<TopicVO> getTopic(String topic) {
final var topicVo = Optional.ofNullable(getTopicMetadata(topic).get(topic));
topicVo.ifPresent(vo -> vo.setPartitions(getTopicPartitionSizes(vo)));
return topicVo;
String[] topics = { topic };

return getTopics(topics).stream().findAny();
}

private Map<String, TopicVO> getTopicMetadata(String... topics) {
final var topicInfos = highLevelConsumer.getTopicInfos(topics);
private Map<String, TopicVO> getTopicMetadata(Map<String, List<PartitionInfo>> allTopicsMap, String... topics) {
final var topicInfos = highLevelConsumer.getTopicInfos(allTopicsMap, topics);
final var retrievedTopicNames = topicInfos.keySet();
final var topicConfigs = highLevelAdminClient.describeTopicConfigs(retrievedTopicNames);

Expand Down Expand Up @@ -191,12 +198,29 @@ private static Map<String, String> headersToMap(Headers headers) {
return map;
}

private Map<Integer, TopicPartitionVO> getTopicPartitionSizes(TopicVO topic) {
return highLevelConsumer.getPartitionSize(topic.getName());
private void setTopicPartitionSizes(List<TopicVO> topics) {
highLevelConsumer.setTopicPartitionSizes(topics);
}

@Override
public List<ConsumerVO> getConsumers(Collection<TopicVO> topicVos) {
public List<ConsumerVO> getConsumersByGroup(String groupId) {
List<ConsumerGroupOffsets> consumerGroupOffsets = getConsumerOffsets(groupId);

String[] uniqueTopicNames = consumerGroupOffsets.stream()
.flatMap(consumerGroupOffset -> consumerGroupOffset.offsets.keySet()
.stream().map(TopicPartition::topic))
.distinct()
.toArray(String[]::new);

List<TopicVO> topicVOs = getTopics(uniqueTopicNames);

LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets);
LOG.debug("topicVos: {}", topicVOs);
return convert(consumerGroupOffsets, topicVOs);
}

@Override
public List<ConsumerVO> getConsumersByTopics(Collection<TopicVO> topicVos) {
final var topics = topicVos.stream().map(TopicVO::getName).collect(Collectors.toSet());
final var consumerGroupOffsets = getConsumerOffsets(topics);
LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets);
Expand Down Expand Up @@ -308,6 +332,10 @@ private ConsumerGroupOffsets resolveOffsets(String groupId) {
return new ConsumerGroupOffsets(groupId, highLevelAdminClient.listConsumerGroupOffsetsIfAuthorized(groupId));
}

private List<ConsumerGroupOffsets> getConsumerOffsets(String groupId) {
return Collections.singletonList(resolveOffsets(groupId));
}

private List<ConsumerGroupOffsets> getConsumerOffsets(Set<String> topics) {
final var consumerGroups = highLevelAdminClient.listConsumerGroups();
return consumerGroups.stream()
Expand Down

0 comments on commit 9d16be5

Please sign in to comment.