DAT-19308 - feat(databricks): ensure essential table properties Delta.* #233

Open
wants to merge 25 commits into base: main

Changes from 12 commits
8262a31
DAT-19308 - feat(databricks): ensure essential table properties are p…
CharlesQueiroz Jan 3, 2025
4a79ace
DAT-19308 - feat(databricks): just to trigger the tests
CharlesQueiroz Jan 3, 2025
7c1c4ad
DAT-19308 - Removing default values you don't need
CharlesQueiroz Jan 7, 2025
8e9480b
DAT-19308 - To validate pipeline tests
CharlesQueiroz Jan 9, 2025
45c9317
DAT-19308 - To validate pipeline tests
CharlesQueiroz Jan 9, 2025
31994be
DAT-19308 - To validate pipeline tests
CharlesQueiroz Jan 9, 2025
42de58d
DAT-19308 - Fixing tests
CharlesQueiroz Jan 9, 2025
89d915b
DAT-19308 - Fixing tests
CharlesQueiroz Jan 9, 2025
4c9ec5a
DAT-19308 - Fixing tests
CharlesQueiroz Jan 9, 2025
a17da30
DAT-19308 - cleanup the logic.
CharlesQueiroz Jan 9, 2025
4e814e0
DAT-19308 - fixing tests
CharlesQueiroz Jan 10, 2025
6dd2ef1
DAT-19308 - fixing tests
CharlesQueiroz Jan 10, 2025
4b44ccb
DAT-19308 - fixing tests
CharlesQueiroz Jan 10, 2025
97fd5a5
DAT-19308 - fixing tests
CharlesQueiroz Jan 10, 2025
96ee659
DAT-19308 - fixing tests
CharlesQueiroz Jan 10, 2025
b4dd494
DAT-19308 - fixing tests
CharlesQueiroz Jan 10, 2025
50bc107
DAT-19308 - fixing tests
CharlesQueiroz Jan 12, 2025
d40231d
DAT-19308 - fixing tests
CharlesQueiroz Jan 13, 2025
dc84426
brought back java 8
KushnirykOleh Jan 16, 2025
ba0f5b1
reverted test changes
KushnirykOleh Jan 16, 2025
92b5816
reverted personal schema
KushnirykOleh Jan 16, 2025
d15e929
more rollbacks
KushnirykOleh Jan 16, 2025
40cc1b2
more rollbacks
KushnirykOleh Jan 16, 2025
59326f3
disabled external location IT test
KushnirykOleh Jan 24, 2025
4e6c755
reverted changes for local createTable testing
KushnirykOleh Jan 24, 2025
@@ -15,6 +15,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -26,6 +27,15 @@ public class ChangedTblPropertiesUtil {

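    // Intended to split on commas / equals signs that fall outside double-quoted values.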
private static final String SPLIT_ON_COMMAS = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"$])";
private static final String SPLIT_ON_EQUALS = "=(?=(?:[^\"]*\"[^\"]*\")*[^\"$])";
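    // User-managed delta.* properties that survive a diff; any other delta.*
    // property is treated as engine-internal and excluded from the comparison.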
private static final Set<String> ALLOWED_DELTA_PROPERTIES = Set.of(
"delta.columnMapping.mode",
"delta.enableDeletionVectors",
"delta.feature.allowColumnDefaults",
"delta.logRetentionDuration",
"delta.deletedFileRetentionDuration",
"delta.targetFileSize",
"delta.enableChangeDataFeed"
);

private ChangedTblPropertiesUtil() {
}
@@ -60,6 +70,7 @@ static AbstractAlterPropertiesChangeDatabricks[] getAbstractTablePropertiesChang
addPropertiesMap.put(key, value);
}
});

//then we remove the properties that are not in the reference
Map<String, String> removePropertiesMap = comparedValuesMap.entrySet().stream()
.filter(entry -> !referencedValuesMap.containsKey(entry.getKey()))
@@ -94,11 +105,14 @@ static AbstractAlterPropertiesChangeDatabricks[] getAbstractTablePropertiesChang
*/
private static Map<String, String> convertToMapExcludingDeltaParameters(Object referenceValueObject) {
String referenceValue = referenceValueObject == null ? "" : referenceValueObject.toString();
        return Arrays.stream(referenceValue.split(SPLIT_ON_COMMAS))
                .map(s -> s.split(SPLIT_ON_EQUALS))
                .filter(a -> a.length > 1)
                .map(a -> new String[]{a[0].trim(), a[1].trim()})
                .filter(a -> {
                    String propertyName = a[0].replace("'", "");
                    return !propertyName.startsWith("delta.") || ALLOWED_DELTA_PROPERTIES.contains(propertyName);
                })
                .collect(Collectors.toMap(a -> a[0], a -> a[1]));
}
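    // Illustration of the filter above: given
    //   "'delta.columnMapping.mode'='name', 'delta.checkpointInterval'=10, 'external.prop'=1"
    // the whitelisted 'delta.columnMapping.mode' and the non-delta 'external.prop'
    // are kept, while 'delta.checkpointInterval' is dropped as engine-internal.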

@@ -1,22 +1,37 @@
package liquibase.ext.databricks.sqlgenerator;


import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.ext.databricks.change.createTable.CreateTableStatementDatabricks;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.sql.Sql;
import liquibase.sql.UnparsedSql;
import liquibase.sqlgenerator.SqlGeneratorChain;
import liquibase.sqlgenerator.core.CreateTableGenerator;
import liquibase.statement.core.CreateTableStatement;
import liquibase.structure.DatabaseObject;
import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CreateTableGeneratorDatabricks extends CreateTableGenerator {

    private static final String[] PROPERTY_ORDER = {
            "'delta.enableDeletionVectors'",
            "'delta.columnMapping.mode'",
            "'delta.feature.allowColumnDefaults'"
    };

Contributor
If there is still some spare time to fix, we could get rid of this array and replace it with the keys from DEFAULT_VALUES.

    private static final Map<String, String> DEFAULT_VALUES = Map.of(
            "'delta.enableDeletionVectors'", "true",
            "'delta.columnMapping.mode'", "'name'",
            "'delta.feature.allowColumnDefaults'", "'supported'"
    );
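    // A minimal sketch of the reviewer's suggestion above (hypothetical, not part
    // of this diff): make the defaults an insertion-ordered LinkedHashMap and
    // derive the ordering from its key set, so PROPERTY_ORDER can be dropped.
    private static Map<String, String> orderedDefaultValues() {
        Map<String, String> defaults = new LinkedHashMap<>(); // preserves insertion order, unlike Map.of
        defaults.put("'delta.enableDeletionVectors'", "true");
        defaults.put("'delta.columnMapping.mode'", "'name'");
        defaults.put("'delta.feature.allowColumnDefaults'", "'supported'");
        return defaults;
    }
    // mergeTableProperties() would then iterate orderedDefaultValues().keySet()
    // instead of PROPERTY_ORDER.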

@Override
public int getPriority() {
return PRIORITY_DATABASE;
@@ -29,16 +44,58 @@ public boolean supports(CreateTableStatement statement, Database database) {

public ValidationErrors validate(CreateTableStatementDatabricks createStatement, Database database, SqlGeneratorChain sqlGeneratorChain) {
ValidationErrors validationErrors = new ValidationErrors();
        if (!(createStatement.getPartitionColumns().isEmpty()) && !(createStatement.getClusterColumns().isEmpty())) {
            validationErrors.addError("WARNING! Databricks does not support creating tables with both PARTITION and CLUSTER columns; please supply only one option.");
}
return validationErrors;
}

private String mergeTableProperties(String customProperties) {

// First, ensure all essential properties are present with default values
Map<String, String> properties = new LinkedHashMap<>(DEFAULT_VALUES);

// If there are custom properties, parse and add them
if (StringUtils.isNotEmpty(customProperties)) {
Arrays.stream(customProperties.split(","))
.map(String::trim)
.filter(prop -> !prop.isEmpty())
.forEach(prop -> {
String[] parts = prop.split("=", 2);
if (parts.length == 2) {
properties.put(parts[0].trim(), parts[1].trim());
}
});
}

// Build result in specified order
StringBuilder result = new StringBuilder();

for (String key : PROPERTY_ORDER) {
if (properties.containsKey(key)) {
if (result.length() > 0) {
result.append(", ");
}
result.append(key).append(" = ").append(properties.get(key));
properties.remove(key);
}
}

// Then add any remaining custom properties
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getValue() != null && !entry.getValue().equals("null")) {
if (result.length() > 0) {
result.append(", ");
}
result.append(entry.getKey()).append(" = ").append(entry.getValue());
}
}

return result.toString();
}
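    // Worked example of the merge: mergeTableProperties("'delta.columnMapping.mode' = 'id', 'my.custom.prop' = 1")
    // yields "'delta.enableDeletionVectors' = true, 'delta.columnMapping.mode' = 'id',
    // 'delta.feature.allowColumnDefaults' = 'supported', 'my.custom.prop' = 1":
    // defaults come first in PROPERTY_ORDER order, custom values override the
    // defaults, and any remaining custom keys are appended afterwards.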

@Override
public Sql[] generateSql(CreateTableStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) {

Sql[] sqls = super.generateSql(statement, database, sqlGeneratorChain);
StringBuilder finalsql = new StringBuilder(sqls[0].toSql());

@@ -50,13 +107,13 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG
} else {
finalsql.append(" USING delta");
}
            String properties = null;
            if (thisStatement.getExtendedTableProperties() != null) {
                properties = thisStatement.getExtendedTableProperties().getTblProperties();
            }
            finalsql.append(" TBLPROPERTIES(").append(mergeTableProperties(properties)).append(")");

// Databricks can decide to have tables live in a particular location. If null, Databricks will handle the location automatically in DBFS
if (!StringUtils.isEmpty(thisStatement.getTableLocation())) {
finalsql.append(" LOCATION '").append(thisStatement.getTableLocation()).append("'");
} else if (thisStatement.getExtendedTableProperties() != null && StringUtils.isNotEmpty(thisStatement.getExtendedTableProperties().getTableLocation())) {
@@ -66,49 +123,34 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG
List<String> clusterCols = thisStatement.getClusterColumns();
List<String> partitionCols = thisStatement.getPartitionColumns();

            // If there are any cluster columns, add the CLUSTER BY clause.
            // Only when there are no cluster columns do we partition instead; never both.
            if (!clusterCols.isEmpty()) {
                finalsql.append(" CLUSTER BY (");
                int val = 0;
                while (clusterCols.size() > val) {
                    finalsql.append(clusterCols.get(val));
                    val += 1;
                    if (clusterCols.size() > val) {
                        finalsql.append(", ");
                    } else {
                        finalsql.append(")");
                    }
                }
            } else if (!partitionCols.isEmpty()) {
                finalsql.append(" PARTITIONED BY (");
                int val = 0;
                while (partitionCols.size() > val) {
                    finalsql.append(partitionCols.get(val));
                    val += 1;
                    if (partitionCols.size() > val) {
                        finalsql.append(", ");
                    } else {
                        finalsql.append(")");
                    }
                }
            }

        }

        sqls[0] = new UnparsedSql(finalsql.toString(), sqls[0].getAffectedDatabaseObjects().toArray(new DatabaseObject[0]));

        return sqls;
    }
}
@@ -7,6 +7,10 @@
import liquibase.structure.core.Table;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.*;

class ChangedTblPropertiesUtilTest {
@@ -136,4 +140,98 @@ void ignoreInternalDeltaProperties() {
assertEquals("'this.should.be.dropped'", result[1].getUnsetExtendedTableProperties().getTblProperties());
assertNull(result[1].getSetExtendedTableProperties());
}

@Test
void allowSpecificDeltaProperties() {
// Tests a mix of allowed and disallowed delta properties
Difference difference = new Difference("tblProperties",
"'delta.columnMapping.mode'='name', 'delta.enableDeletionVectors'=true, 'delta.feature.allowColumnDefaults'=true, 'delta.someOtherProperty'=false",
"'delta.columnMapping.mode'='id', 'delta.enableDeletionVectors'=false, 'delta.invalidProperty'=true");

// Act
AbstractAlterPropertiesChangeDatabricks[] result = ChangedTblPropertiesUtil
.getAlterTablePropertiesChangeDatabricks(table, control, difference);

// Assert
assertNotNull(result);
assertEquals(1, result.length);

Set<String> expectedProperties = Set.of(
"'delta.columnMapping.mode'='name'",
"'delta.enableDeletionVectors'=true",
"'delta.feature.allowColumnDefaults'=true"
);

Set<String> actualProperties = Arrays.stream(
result[0].getSetExtendedTableProperties().getTblProperties().split(","))
.map(String::trim)
.collect(Collectors.toSet());

// Checks if only allowed properties are included
assertEquals(expectedProperties, actualProperties);
}

@Test
void allowAllWhitelistedDeltaProperties() {
// Tests a subset of the whitelisted delta properties
Difference difference = new Difference("tblProperties",
"'delta.columnMapping.mode'='name', " +
"'delta.enableDeletionVectors'=true, " +
"'delta.feature.allowColumnDefaults'=true,", "");

// Act
AbstractAlterPropertiesChangeDatabricks[] result = ChangedTblPropertiesUtil
.getAlterTablePropertiesChangeDatabricks(table, control, difference);

// Assert
assertNotNull(result);
assertEquals(1, result.length);
// Checks that the whitelisted properties have been kept
assertTrue(result[0].getSetExtendedTableProperties().getTblProperties().contains("'delta.columnMapping.mode'='name'"));
assertTrue(result[0].getSetExtendedTableProperties().getTblProperties().contains("'delta.enableDeletionVectors'=true"));
assertTrue(result[0].getSetExtendedTableProperties().getTblProperties().contains("'delta.feature.allowColumnDefaults'=true"));
}

@Test
void mixDeltaAndRegularProperties() {
// Tests a mix of allowed delta properties and regular properties
Difference difference = new Difference("tblProperties",
"'delta.columnMapping.mode'='name', 'regular.property'=true, 'delta.enableDeletionVectors'=true",
"'delta.columnMapping.mode'='id', 'other.property'=false");

// Act
AbstractAlterPropertiesChangeDatabricks[] result = ChangedTblPropertiesUtil
.getAlterTablePropertiesChangeDatabricks(table, control, difference);

// Assert
assertNotNull(result);
assertEquals(2, result.length);

// Verifies that the whitelisted delta property and the regular property were both processed correctly
assertTrue(result[0].getSetExtendedTableProperties().getTblProperties()
.contains("'delta.columnMapping.mode'='name'"));
assertTrue(result[0].getSetExtendedTableProperties().getTblProperties()
.contains("'regular.property'=true"));
assertEquals("'other.property'", result[1].getUnsetExtendedTableProperties().getTblProperties());
}

@Test
void updateWhitelistedDeltaProperties() {
// Tests updating values for allowed delta properties
Difference difference = new Difference("tblProperties",
"'delta.columnMapping.mode'='name', 'delta.enableDeletionVectors'=true",
"'delta.columnMapping.mode'='id', 'delta.enableDeletionVectors'=false");

// Act
AbstractAlterPropertiesChangeDatabricks[] result = ChangedTblPropertiesUtil
.getAlterTablePropertiesChangeDatabricks(table, control, difference);

// Assert
assertNotNull(result);
assertEquals(1, result.length);
// Checks whether the new values were applied correctly
String expectedNormalized = "'delta.columnMapping.mode'='name','delta.enableDeletionVectors'=true";
String actualNormalized = result[0].getSetExtendedTableProperties().getTblProperties();
assertEquals(expectedNormalized, actualNormalized);
}
}