HIVE-28256: Iceberg: Major QB Compaction on partition level with evolution #5248

Open · wants to merge 3 commits into master

Conversation

@difin (Contributor) commented May 13, 2024


What changes were proposed in this pull request?

Adds support for compacting a given partition of a Hive Iceberg table even if the table has undergone partition evolution. The partition spec may be the table's current spec or one of its older specs.

Why are the changes needed?

Until now, partition-level compaction wasn't supported for Hive Iceberg tables that have undergone partition evolution.

Does this PR introduce any user-facing change?

Yes. Users can now submit partition-level compaction requests for Hive Iceberg tables with a partition spec that conforms to the current or any previous partition spec of the table.

Is the change a dependency upgrade?

No

How was this patch tested?

New q-tests added

throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
}
partitions = partitions.stream().filter(part -> part.getSpec().size() == partitionSpec.size()).collect(Collectors.toList());
Member:
what are we checking here? the number of partition columns in the table spec vs. in the compaction request?

difin (Contributor, Author):

This validates that the partition spec given in the compaction command matches exactly one partition spec in the table, not a partial partition spec.

Let's say a table has partitions with specs (a,b) and (a,b,c) because of evolution, and a compaction command is run with spec (a,b). On line 144 it will find both partition specs; after filtering, only (a,b) remains, and validation passes.

Another case: assume the same table with specs (a,b) and (a,b,c), and a compaction command run with spec (a). On line 144 it will find both partition specs; after filtering, zero partitions remain, and validation fails with a TOO_MANY_COMPACTION_PARTITIONS exception.
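
A minimal sketch of the validation flow described above, assuming a hypothetical helper that matches the request against every historical spec (the helper and method names are illustrative, not the PR's exact code; only the filter line and the two error names come from the conversation):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

static void validateCompactionPartSpec(Table table, Map<String, String> partitionSpec)
    throws HiveException {
  // Hypothetical lookup standing in for line 144 of the PR: match the request
  // against every partition spec the table has ever had.
  List<Partition> partitions = findPartitionsMatchingAnySpec(table, partitionSpec);
  if (partitions.isEmpty()) {
    throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
  }
  // Keep only specs with exactly as many columns as the request, so a partial
  // spec like (a) against table specs (a,b) and (a,b,c) filters down to zero.
  partitions = partitions.stream()
      .filter(part -> part.getSpec().size() == partitionSpec.size())
      .collect(Collectors.toList());
  if (partitions.size() != 1) {
    throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
  }
}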

… by spec by any past table specs.

Moved Iceberg compaction constant to a class in Iceberg module.

Use VirtualColumn.PARTITION_SPEC_ID.getName() instead of partition__spec__id.
.map(x -> x.getJobConf().get(CompactorContext.COMPACTION_PARTITION_PATH))
.orElse(null);

if (rewritePolicy != RewritePolicy.DEFAULT || compactionPartSpecId != null) {
Contributor:

So is the value of rewritePolicy = ALL_PARTITIONS in the case of table-level compaction on a fully partitioned table, and rewritePolicy = PARTITION in the case of partition-level compaction?

@@ -55,6 +57,9 @@ public void analyzeInternal(ASTNode root) throws SemanticException {
if (command.getType() == HiveParser.TOK_ALTERTABLE_RENAMEPART) {
partitionSpec = getPartSpec(partitionSpecNode);
} else {
if (command.getType() == HiveParser.TOK_ALTERTABLE_COMPACT) {
HiveConf.setVar(conf, HiveConf.ConfVars.REWRITE_POLICY, Context.RewritePolicy.PARTITION.name());
Contributor:

Is it applied only in the partition compaction case, or is it added generically to all cases?

difin (Contributor, Author):

rewritePolicy = ALL_PARTITIONS is set in the case of full-table compaction, for both partitioned and unpartitioned tables.

HiveConf.setVar(conf, HiveConf.ConfVars.REWRITE_POLICY, Context.RewritePolicy.PARTITION.name());

is set for the partition compaction case only. That code branch is reachable only when partitionSpecNode != null and command.getType() == HiveParser.TOK_ALTERTABLE_COMPACT.
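
Schematically, the two call sites amount to something like the following (a hedged consolidation for illustration; in the PR the ALL_PARTITIONS assignment lives at a different call site, and isFullTableCompaction is a hypothetical flag):

static void setRewritePolicy(HiveConf conf, ASTNode command, ASTNode partitionSpecNode,
    boolean isFullTableCompaction) {
  if (partitionSpecNode != null && command.getType() == HiveParser.TOK_ALTERTABLE_COMPACT) {
    // Partition-level compaction: only reachable for ALTER TABLE ... COMPACT
    // with an explicit partition spec.
    HiveConf.setVar(conf, HiveConf.ConfVars.REWRITE_POLICY,
        Context.RewritePolicy.PARTITION.name());
  } else if (isFullTableCompaction) {
    // Full-table compaction, for both partitioned and unpartitioned tables.
    HiveConf.setVar(conf, HiveConf.ConfVars.REWRITE_POLICY,
        Context.RewritePolicy.ALL_PARTITIONS.name());
  }
}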

… partition spec in IcebergMajorQueryCompactor.
@SourabhBadhya (Contributor) left a comment:
LGTM +1 (pending tests)

sonarcloud bot commented Jun 11, 2024

Quality Gate passed

Issues
15 New issues
0 Accepted issues

Measures
0 Security Hotspots
No data about Coverage
No data about Duplication

See analysis details on SonarCloud

.map(Integer::valueOf)
.orElse(null);

String compactionPartitionPath = outputTable.jobContexts.stream()
Member:

could we please use consistent naming? not

compactionPartSpecId/COMPACTION_PART_SPEC_ID
compactionPartitionPath/COMPACTION_PARTITION_PATH

why do we need compaction in the name? how about:

partitionSpecId/PARTITION_SPEC_ID
partitionPath/PARTITION_PATH

Member:

also, why don't we compute the path only inside the compaction block?

return data;
}

public static Pair<List<DataFile>, List<DeleteFile>> getDataAndDeleteFiles(Table table, int specId,
@deniskuzZ (Member) commented Jun 13, 2024:

why not split it into 2 methods instead of this combined Pair<List, List>?
are we reusing something and saving CPU cycles?
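
A sketch of the suggested split, under the assumption that both halves come from the same planFiles() scan (method bodies are illustrative; the PR's extra parameters are omitted). The trade-off the second question hints at: the combined Pair version walks the FileScanTasks once for both lists, while the split below re-plans the scan per call.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public static List<DataFile> getDataFiles(Table table, int specId) {
  List<DataFile> dataFiles = Lists.newArrayList();
  try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
    for (FileScanTask task : tasks) {
      // Keep only files written under the requested partition spec.
      if (task.file().specId() == specId) {
        dataFiles.add(task.file());
      }
    }
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
  return dataFiles;
}

public static List<DeleteFile> getDeleteFiles(Table table, int specId) {
  List<DeleteFile> deleteFiles = Lists.newArrayList();
  try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
    for (FileScanTask task : tasks) {
      if (task.file().specId() == specId) {
        deleteFiles.addAll(task.deletes());
      }
    }
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
  return deleteFiles;
}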

@@ -746,6 +746,11 @@ default void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
throw new UnsupportedOperationException("Storage handler does not support validation of partition values");
}

default void validatePartAnySpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec)
Member:

what does it do?
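
Based on the PR description, a javadoc along these lines would answer the question (wording and throws clause assumed, not taken from the PR):

/**
 * Validates the given partition spec against any of the partition specs the
 * table has ever had, i.e. the current spec or any older spec left behind by
 * partition evolution, rather than only the current one as validatePartSpec does.
 * (Hypothetical javadoc sketch, inferred from the PR description.)
 */
default void validatePartAnySpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
    Map<String, String> partitionSpec) throws SemanticException {
  throw new UnsupportedOperationException("Storage handler does not support validation of partition values");
}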

@@ -1690,7 +1690,12 @@ public static Map<String, String> getPartSpec(ASTNode node)
public static void validatePartSpec(Table tbl, Map<String, String> partSpec,
ASTNode astNode, HiveConf conf, boolean shouldBeFull) throws SemanticException {
if (tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) {
tbl.getStorageHandler().validatePartSpec(tbl, partSpec);
if (Context.RewritePolicy.fromString(conf.get(HiveConf.ConfVars.REWRITE_POLICY.varname,
Context.RewritePolicy.DEFAULT.name())) == Context.RewritePolicy.PARTITION) {
Member:

why do you need to specify the default?
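
For context: Hadoop's Configuration.get(String) returns null for unset keys, so the two-argument form guards fromString against null (a hedged reading; fromString's null handling isn't shown in the diff):

// Without the fallback, an unset key yields null:
String raw = conf.get(HiveConf.ConfVars.REWRITE_POLICY.varname); // may be null
// With the fallback, parsing always receives a valid enum name:
String withDefault = conf.get(HiveConf.ConfVars.REWRITE_POLICY.varname,
    Context.RewritePolicy.DEFAULT.name());
Context.RewritePolicy policy = Context.RewritePolicy.fromString(withDefault);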

@@ -801,6 +806,11 @@ default Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table, M
throw new UnsupportedOperationException("Storage handler does not support getting partition for a table.");
}

default Partition getPartitionAnySpec(org.apache.hadoop.hive.ql.metadata.Table table,
Member:

what does it do? can you add a meaningful javadoc?
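
Following the pattern above, the requested javadoc might read (hypothetical wording, inferred from the PR's intent):

/**
 * Resolves the given partition spec against any of the table's partition
 * specs, past or present, and returns the matching partition; the plain
 * getPartition variant presumably resolves only against the current spec.
 * (Hypothetical javadoc sketch, not taken from the PR.)
 */
default Partition getPartitionAnySpec(org.apache.hadoop.hive.ql.metadata.Table table,
    Map<String, String> partitionSpec) throws HiveException {
  throw new UnsupportedOperationException("Storage handler does not support getting partition for a table.");
}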

@@ -3663,7 +3664,12 @@ public List<org.apache.hadoop.hive.metastore.api.Partition> getPartitionsByNames

public Partition getPartition(Table tbl, Map<String, String> partSpec) throws HiveException {
if (tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) {
return tbl.getStorageHandler().getPartition(tbl, partSpec);
if (Context.RewritePolicy.fromString(conf.get(ConfVars.REWRITE_POLICY.varname,
Context.RewritePolicy.DEFAULT.name())) == Context.RewritePolicy.PARTITION) {
Member:

why do you need the default?

@@ -256,7 +256,8 @@ public String toString() {
public enum RewritePolicy {

DEFAULT,
ALL_PARTITIONS;
ALL_PARTITIONS,
PARTITION;
Member:

why do you need a separate policy for PARTITION? isn't ALL_PARTITIONS enough?
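
For context, a hedged reading of the three policies as used in this PR (comments inferred from the conversation, not from documented javadoc):

public enum RewritePolicy {
  DEFAULT,        // no explicit rewrite policy set
  ALL_PARTITIONS, // full-table compaction: rewrite every partition (also used for unpartitioned tables)
  PARTITION;      // compact the single partition identified by the request's spec
}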


public class IcebergCompactionContext {

public static final String COMPACTION_PART_SPEC_ID = "compaction_part_spec_id";
Member:

put this with the rest of the Iceberg constants; no need for this class

List<Pair<PartitionData, Integer>> partitionList = Lists.newArrayList();
try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
fileScanTasks.forEach(task ->
partitionList.addAll(Sets.newHashSet(CloseableIterable.transform(task.asDataTask().rows(), row -> {
Member:

are you creating a HashSet from KV pairs?
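
Completing the visible snippet as a hedged sketch: yes, each row of the PARTITIONS metadata table appears to be mapped to a (PartitionData, specId) pair, and Sets.newHashSet(...) deduplicates identical pairs within a task before they are appended to the list. The column positions and the commons-lang3 Pair are assumptions, not the PR's actual code:

List<Pair<PartitionData, Integer>> partitionList = Lists.newArrayList();
try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
  fileScanTasks.forEach(task ->
      partitionList.addAll(Sets.newHashSet(CloseableIterable.transform(task.asDataTask().rows(),
          // Assumed mapping: column 0 holds the partition struct, column 1 the spec id.
          row -> Pair.of(row.get(0, PartitionData.class), row.get(1, Integer.class))))));
} catch (IOException e) {
  throw new UncheckedIOException(e);
}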
