HIVE-28256: Iceberg: Major QB Compaction on partition level with evol… #5248
base: master
Conversation
  throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
}
partitions = partitions.stream().filter(part -> part.getSpec().size() == partitionSpec.size()).collect(Collectors.toList());
What are we checking here? The number of partition columns in the table spec versus the compaction request?
This validates that the partition spec given in the compaction command matches exactly one partition spec in the table, i.e. it is not a partial partition spec.
Let's say a table has partitions with specs (a,b) and (a,b,c) because of evolution, and a compaction command is run with spec (a,b). On line 144 it will find both partition specs; after filtering, only (a,b) remains, so it passes validation.
Another case: assume the same table with specs (a,b) and (a,b,c), and a compaction command run with spec (a). On line 144 it will find both partition specs; after filtering, zero specs remain, so validation fails with a TOO_MANY_COMPACTION_PARTITIONS exception.
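The two cases above can be sketched in isolation. This is a minimal stand-in, not the Hive code: `filterBySize` is a hypothetical helper that models the stream filter from the diff, and each partition spec is represented simply as its list of column names.

```java
import java.util.List;
import java.util.stream.Collectors;

public class SpecFilterSketch {
    // Hypothetical stand-in: a partition spec is modeled as its column names.
    static List<List<String>> filterBySize(List<List<String>> specs, int requestedSize) {
        // Mirrors the diff's filter: keep only specs whose size matches the
        // partition spec given in the compaction command.
        return specs.stream()
                .filter(spec -> spec.size() == requestedSize)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Table evolved from spec (a, b) to (a, b, c): both specs are present.
        List<List<String>> specs = List.of(List.of("a", "b"), List.of("a", "b", "c"));

        // Compaction requested with spec (a, b): exactly one spec survives -> valid.
        System.out.println(filterBySize(specs, 2).size()); // 1

        // Compaction requested with partial spec (a): zero specs survive -> validation fails.
        System.out.println(filterBySize(specs, 1).size()); // 0
    }
}
```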
… by spec by any past table specs. Moved Iceberg compaction constant to a class in Iceberg module. Use VirtualColumn.PARTITION_SPEC_ID.getName() instead of partition__spec__id.
    .map(x -> x.getJobConf().get(CompactorContext.COMPACTION_PARTITION_PATH))
    .orElse(null);

if (rewritePolicy != RewritePolicy.DEFAULT || compactionPartSpecId != null) {
So is the value of rewritePolicy = ALL_PARTITIONS in the case of table-level compaction on a fully partitioned table, and rewritePolicy = PARTITION in the case of partition-level compaction?
@@ -55,6 +57,9 @@ public void analyzeInternal(ASTNode root) throws SemanticException {
if (command.getType() == HiveParser.TOK_ALTERTABLE_RENAMEPART) {
  partitionSpec = getPartSpec(partitionSpecNode);
} else {
  if (command.getType() == HiveParser.TOK_ALTERTABLE_COMPACT) {
    HiveConf.setVar(conf, HiveConf.ConfVars.REWRITE_POLICY, Context.RewritePolicy.PARTITION.name());
Is it applied only in the partition compaction case, or is it added generically to all cases?
rewritePolicy = ALL_PARTITIONS is set in the case of full-table compaction, for both partitioned and unpartitioned tables.
HiveConf.setVar(conf, HiveConf.ConfVars.REWRITE_POLICY, Context.RewritePolicy.PARTITION.name());
is set for the partition compaction case only. This code branch is reachable only when partitionSpecNode != null
and command.getType() == HiveParser.TOK_ALTERTABLE_COMPACT.
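The policy selection described above can be condensed into a small sketch. This is a hypothetical simplification, not the actual Hive code: `choosePolicy` and its boolean parameters are invented names standing in for the AST checks (`command.getType() == TOK_ALTERTABLE_COMPACT`, `partitionSpecNode != null`), and the enum is a local mirror of `Context.RewritePolicy`.

```java
public class RewritePolicySketch {
    // Local mirror of Context.RewritePolicy (not the real Hive enum).
    enum RewritePolicy { DEFAULT, ALL_PARTITIONS, PARTITION }

    // Hypothetical condensation of the branch logic: PARTITION only when a
    // COMPACT command carries a partition spec; ALL_PARTITIONS for full-table
    // compaction; DEFAULT otherwise.
    static RewritePolicy choosePolicy(boolean isCompactCommand, boolean hasPartitionSpec) {
        if (isCompactCommand && hasPartitionSpec) {
            return RewritePolicy.PARTITION;
        }
        if (isCompactCommand) {
            return RewritePolicy.ALL_PARTITIONS;
        }
        return RewritePolicy.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(choosePolicy(true, false)); // ALL_PARTITIONS
        System.out.println(choosePolicy(true, true));  // PARTITION
    }
}
```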
… partition spec in IcebergMajorQueryCompactor.
LGTM +1 (pending tests)
    .map(Integer::valueOf)
    .orElse(null);

String compactionPartitionPath = outputTable.jobContexts.stream()
Could we please use consistent naming? Instead of
compactionPartSpecId / COMPACTION_PART_SPEC_ID
compactionPartitionPath / COMPACTION_PARTITION_PATH
why do we need "compaction" in the name? How about:
partitionSpecId / PARTITION_SPEC_ID
partitionPath / PARTITION_PATH
Also, why don't we compute the path only inside the compaction block?
  return data;
}

public static Pair<List<DataFile>, List<DeleteFile>> getDataAndDeleteFiles(Table table, int specId,
Why not split it into two methods instead of returning this combined Pair<List, List>?
Are we reusing something and saving CPU cycles?
@@ -746,6 +746,11 @@ default void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
  throw new UnsupportedOperationException("Storage handler does not support validation of partition values");
}

default void validatePartAnySpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec)
what does it do?
@@ -1690,7 +1690,12 @@ public static Map<String, String> getPartSpec(ASTNode node)
public static void validatePartSpec(Table tbl, Map<String, String> partSpec,
    ASTNode astNode, HiveConf conf, boolean shouldBeFull) throws SemanticException {
  if (tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) {
    tbl.getStorageHandler().validatePartSpec(tbl, partSpec);
    if (Context.RewritePolicy.fromString(conf.get(HiveConf.ConfVars.REWRITE_POLICY.varname,
        Context.RewritePolicy.DEFAULT.name())) == Context.RewritePolicy.PARTITION) {
Why do you need to specify a default?
@@ -801,6 +806,11 @@ default Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table, M
  throw new UnsupportedOperationException("Storage handler does not support getting partition for a table.");
}

default Partition getPartitionAnySpec(org.apache.hadoop.hive.ql.metadata.Table table,
What does it do? Can you add meaningful Javadoc?
@@ -3663,7 +3664,12 @@ public List<org.apache.hadoop.hive.metastore.api.Partition> getPartitionsByNames

public Partition getPartition(Table tbl, Map<String, String> partSpec) throws HiveException {
  if (tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) {
    return tbl.getStorageHandler().getPartition(tbl, partSpec);
    if (Context.RewritePolicy.fromString(conf.get(ConfVars.REWRITE_POLICY.varname,
        Context.RewritePolicy.DEFAULT.name())) == Context.RewritePolicy.PARTITION) {
Why do you need the default?
@@ -256,7 +256,8 @@ public String toString() {
public enum RewritePolicy {

  DEFAULT,
  ALL_PARTITIONS;
  ALL_PARTITIONS,
  PARTITION;
Why do you need a separate policy for PARTITION? Isn't ALL_PARTITIONS enough?
public class IcebergCompactionContext {

  public static final String COMPACTION_PART_SPEC_ID = "compaction_part_spec_id";
Put this with the rest of the Iceberg constants; there's no need for this class.
List<Pair<PartitionData, Integer>> partitionList = Lists.newArrayList();
try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
  fileScanTasks.forEach(task ->
      partitionList.addAll(Sets.newHashSet(CloseableIterable.transform(task.asDataTask().rows(), row -> {
Are you creating a HashSet from KV pairs?
…ution
What changes were proposed in this pull request?
Adding support for compacting a given partition of a Hive Iceberg table even if the table has undergone partition evolution. The partition spec can be current or one of the older partition specs of the table.
Why are the changes needed?
So far compaction on partition level wasn't supported for Hive Iceberg tables that have undergone partition evolution.
Does this PR introduce any user-facing change?
Yes. Users can now submit partition-level compaction requests for Hive Iceberg tables with a partition spec that conforms to one of the previous partition specs of the table.
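For illustration, a hedged sketch of what such a request could look like, using Hive's ALTER TABLE ... COMPACT syntax; the table and column names here are hypothetical:

```sql
-- Hypothetical Iceberg table originally partitioned by (a, b),
-- later evolved to (a, b, c). A compaction request against the
-- older spec (a, b) is now accepted:
ALTER TABLE ice_tbl PARTITION (a = '1', b = '2') COMPACT 'major';
```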
Is the change a dependency upgrade?
No
How was this patch tested?
New q-tests added