Plans for migrate topic metadata beyond partition count #20

Open
javierholguera opened this issue Oct 29, 2018 · 6 comments

@javierholguera

I had a look around and I believe the tool can detect whether the partition count of the source and destination topics is the same.

Are there any plans to replicate other topic metadata, such as retention, max message size, etc.?

Thanks.

@pdavidson100
Contributor

It's something we would like to add, but we don't have any plans to work on it in the short term. We would certainly be happy to accept contributions that add support for this.

@OneCricketeer

Just chiming in here to document my notes.

I wrote this in a separate project to get topic configurations alongside the partition count in the TopicDescription object.

    /**
     * Wrapper around {@link AdminClient#describeTopics(Collection)} and {@link AdminClient#describeConfigs(Collection)}  
     * for a single topic.
     *
     * @param adminClient a {@link AdminClient}
     * @param topic       the topic to describe
     * @return {@link KafkaFuture} with the result of the topic description
     */
    private static KafkaFuture<TopicConfigDescription> whenTopicIsDescribed(AdminClient adminClient, String topic) {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);

        // Note: both admin requests are awaited here, so the returned future is already completed.
        try {
            TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic)).all()
                    .thenApply(topicDescriptionMap -> topicDescriptionMap.get(topic))
                    .get();

            // Keep only the configs that were explicitly overridden on the topic.
            List<ConfigEntry> dynamicTopicConfigEntries = adminClient.describeConfigs(Collections.singletonList(resource))
                    .all()
                    .thenApply(configMap -> configMap.get(resource).entries()
                            .stream().filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
                            .collect(toList())
                    )
                    .get();

            return KafkaFuture.completedFuture(new TopicConfigDescription(topicDescription, dynamicTopicConfigEntries));
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while describing topic " + topic, e);
        } catch (ExecutionException e) {
            // Keep the original cause so the failure can be diagnosed.
            throw new RuntimeException("Unable to describe topic " + topic, e);
        }
    }

    static class TopicConfigDescription {
        final TopicDescription description;
        final List<ConfigEntry> topicConfigs;

        public TopicConfigDescription(TopicDescription description, List<ConfigEntry> topicConfigs) {
            this.description = description;
            this.topicConfigs = topicConfigs;
        }

        public String getTopicName() {
            return description.name();
        }
    }

@OneCricketeer

In-progress changes to add an AdminClient:

https://github.com/salesforce/mirus/compare/master...cricket007:refactor-constants-add-adminclient?expand=1

I would be interested to know how the monitor thread works exactly. For example, I assume the List<TopicPartition> contains partitions from multiple topics? I ask because I think I will need a Map<String, List<ConfigEntry>> upfront that maps topics to their configurations. I'm not sure yet where best to put that in the context of checking whether the source and destination have changed.
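To make the "have source and destination changed" check concrete, here is a rough sketch of comparing two such topic-to-config maps. Everything in it is an assumption for illustration: `TopicConfigDiff` and `topicsNeedingUpdate` are hypothetical names that do not exist in Mirus, and configs are modelled as plain name-to-value maps rather than Kafka's `ConfigEntry` so the snippet runs without Kafka on the classpath.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Mirus. Configs are modelled as plain
// name -> value maps instead of Kafka's ConfigEntry to keep the sketch standalone.
final class TopicConfigDiff {

    /** Returns the source topics whose configs are missing or different on the destination. */
    static Set<String> topicsNeedingUpdate(Map<String, Map<String, String>> source,
                                           Map<String, Map<String, String>> destination) {
        Set<String> changed = new TreeSet<>();
        for (Map.Entry<String, Map<String, String>> entry : source.entrySet()) {
            Map<String, String> destConfigs = destination.get(entry.getKey());
            // Map.equals compares entries, so ordering does not matter;
            // a topic missing on the destination (null) also counts as changed.
            if (!entry.getValue().equals(destConfigs)) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> source = Map.of(
                "orders", Map.of("retention.ms", "604800000"),
                "payments", Map.of("max.message.bytes", "2097152"));
        Map<String, Map<String, String>> destination = Map.of(
                "orders", Map.of("retention.ms", "604800000"),
                "payments", Map.of("max.message.bytes", "1048588"));

        System.out.println(topicsNeedingUpdate(source, destination)); // prints [payments]
    }
}
```

In a real implementation the two maps would presumably be filled from `describeConfigs` calls against the source and destination clusters.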

@pdavidson100
Contributor

Yes, the List<TopicPartition> can contain partitions from multiple topics. This happens in fetchMatchingPartitions, where we iterate over the current list of source topics - see:

private List<TopicPartition> fetchMatchingPartitions(Consumer<byte[], byte[]> consumer) {

@OneCricketeer

Right, so I guess my question is: should I modify this line to map instead to an "enriched" ConfiguredTopicPartition wrapper class like the one shown above?

.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))

Just thinking aloud here about the best place to get access to the topic configs.

@OneCricketeer

OneCricketeer commented Jan 25, 2019

Thinking more about my previous suggestion, a Map<String, List<ConfigEntry>> makes more sense. A TopicPartition doesn't hold any configurations, and doing a lookup with map.get(topicPartition.topic()) is easier.
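A minimal sketch of that lookup, under stated assumptions: the nested `TopicPartition` and `ConfigEntry` classes below are small stand-ins for the Kafka classes of the same name (only the `topic()`, `partition()`, `name()` and `value()` accessors are mirrored), and `TopicConfigLookup` and `configsFor` are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.Map;

// Stand-ins for org.apache.kafka.common.TopicPartition and
// org.apache.kafka.clients.admin.ConfigEntry, so the sketch runs without Kafka.
final class TopicConfigLookup {

    static final class TopicPartition {
        private final String topic;
        private final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        String topic() { return topic; }
        int partition() { return partition; }
    }

    static final class ConfigEntry {
        private final String name;
        private final String value;
        ConfigEntry(String name, String value) { this.name = name; this.value = value; }
        String name() { return name; }
        String value() { return value; }
    }

    /** Configs are stored once per topic; every partition of a topic resolves to the same list. */
    static List<ConfigEntry> configsFor(TopicPartition tp, Map<String, List<ConfigEntry>> configsByTopic) {
        // Unknown topics yield an empty list rather than null.
        return configsByTopic.getOrDefault(tp.topic(), List.of());
    }

    public static void main(String[] args) {
        Map<String, List<ConfigEntry>> configsByTopic = Map.of(
                "orders", List.of(new ConfigEntry("retention.ms", "604800000")));

        // Two partitions of the same topic share one map entry.
        for (TopicPartition tp : List.of(new TopicPartition("orders", 0), new TopicPartition("orders", 1))) {
            for (ConfigEntry entry : configsFor(tp, configsByTopic)) {
                System.out.println(tp.topic() + "-" + tp.partition() + ": " + entry.name() + "=" + entry.value());
            }
        }
    }
}
```

Keying by topic name avoids duplicating the same config list on every partition, which a wrapper class around TopicPartition would do.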


3 participants