diff --git a/bitsail-common/src/main/java/com/bytedance/bitsail/common/util/DateUtil.java b/bitsail-common/src/main/java/com/bytedance/bitsail/common/util/DateUtil.java index 41a6c99f8..0579127aa 100644 --- a/bitsail-common/src/main/java/com/bytedance/bitsail/common/util/DateUtil.java +++ b/bitsail-common/src/main/java/com/bytedance/bitsail/common/util/DateUtil.java @@ -26,9 +26,11 @@ import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.TimeZone; @@ -249,4 +251,24 @@ public static long convertStringToSeconds(Column column) { return stringToDate(column.asString(), null, null).toInstant().getEpochSecond(); } } + + public static List getDatesBetweenTwoDate(Date beginDate, Date endDate) { + List dataRange = new ArrayList<>(); + dataRange.add(beginDate); + Calendar cal = Calendar.getInstance(); + cal.setTime(beginDate); + while (endDate.after(cal.getTime())) { + cal.add(Calendar.DAY_OF_MONTH, 1); + dataRange.add(cal.getTime()); + } + return dataRange; + } + + public static Date getNDaysAfterDate(Date date, int n) { + Calendar cal = Calendar.getInstance(); + cal.setTime(date); + cal.add(Calendar.DAY_OF_MONTH, n); + return cal.getTime(); + } + } diff --git a/bitsail-common/src/test/java/com/bytedance/bitsail/common/util/DateUtilTest.java b/bitsail-common/src/test/java/com/bytedance/bitsail/common/util/DateUtilTest.java index 11503399f..ee7370d12 100644 --- a/bitsail-common/src/test/java/com/bytedance/bitsail/common/util/DateUtilTest.java +++ b/bitsail-common/src/test/java/com/bytedance/bitsail/common/util/DateUtilTest.java @@ -21,6 +21,8 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Date; + public class DateUtilTest { @Test public void testConvertStringToSeconds() { @@ -29,4 +31,12 @@ public void testConvertStringToSeconds() { Assert.assertEquals(1575734400L, DateUtil.convertStringToSeconds(new StringColumn("2020-08-01"))); Assert.assertEquals(1596214923L, DateUtil.convertStringToSeconds(new StringColumn("2020-08-01 01:02:03"))); } + + @Test + public void testGetDatesBetweenTwoDate() { + Date start = new Date(); + Date end = new Date(); + Assert.assertEquals(1, DateUtil.getDatesBetweenTwoDate(start, end).size()); + } + } diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisWriterOptions.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisWriterOptions.java index de30a25aa..9c476c2f4 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisWriterOptions.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisWriterOptions.java @@ -19,10 +19,10 @@ import com.bytedance.bitsail.common.annotation.Essential; import com.bytedance.bitsail.common.option.ConfigOption; import com.bytedance.bitsail.common.option.WriterOptions; +import com.bytedance.bitsail.connector.doris.partition.DorisPartitionTemplate; import com.alibaba.fastjson.TypeReference; -import java.util.List; import java.util.Map; import static com.bytedance.bitsail.common.option.ConfigOptions.key; @@ -60,10 +60,9 @@ public interface DorisWriterOptions extends WriterOptions.BaseWriterOptions { key(WRITER_PREFIX + "table_has_partition") .defaultValue(true); - ConfigOption>> PARTITIONS = + ConfigOption PARTITIONS = key(WRITER_PREFIX + "partitions") - .onlyReference(new TypeReference>>() { - }); + .onlyReference(new TypeReference() {}); ConfigOption SINK_FLUSH_INTERVAL_MS = key(WRITER_PREFIX + "sink_flush_interval_ms") diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartitionLevel.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartitionLevel.java new file mode 100644 index 000000000..9f9aa8a12 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartitionLevel.java @@ -0,0 +1,31 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.partition; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum DorisPartitionLevel { + DAY("day"), + WEEK("week"), + MONTH("month"), + QUARTER("quarter"); + + private final String type; +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartitionTemplate.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartitionTemplate.java new file mode 100644 index 000000000..dd5715c62 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartitionTemplate.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.partition; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@AllArgsConstructor +@NoArgsConstructor +@Data +public class DorisPartitionTemplate { + + @JsonProperty(value = "prefix", required = false, defaultValue = "p") + private String prefix; + + @JsonProperty(value = "start_range", required = true) + private String startRange; + + @JsonProperty(value = "end_range", required = true) + private String endRange; + + @JsonProperty(value = "pattern", required = false, defaultValue = "yyyy-MM-dd") + private String pattern; + + @JsonProperty(value = "partition_level", required = false, defaultValue = "day") + private String partitionLevel; + + public enum DorisPartitionLevel { + DAY, + WEEK, + MONTH, + QUARTER; + } + +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/DorisSink.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/DorisSink.java index 40be345d4..4710aad46 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/DorisSink.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/DorisSink.java @@ -21,11 +21,13 @@ import com.bytedance.bitsail.base.connector.writer.v1.WriterCommitter; import com.bytedance.bitsail.base.serializer.BinarySerializer; import com.bytedance.bitsail.base.serializer.SimpleVersionedBinarySerializer; +import com.bytedance.bitsail.common.BitSailException; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.common.exception.CommonErrorCode; import com.bytedance.bitsail.common.model.ColumnInfo; import com.bytedance.bitsail.common.type.TypeInfoConverter; import com.bytedance.bitsail.common.type.filemapping.FileMappingTypeInfoConverter; +import com.bytedance.bitsail.common.util.DateUtil; import com.bytedance.bitsail.connector.doris.DorisConnectionHolder; import com.bytedance.bitsail.connector.doris.committer.DorisCommittable; import com.bytedance.bitsail.connector.doris.committer.DorisCommittableSerializer; @@ -36,6 +38,7 @@ import com.bytedance.bitsail.connector.doris.option.DorisWriterOptions; import com.bytedance.bitsail.connector.doris.partition.DorisPartition; import com.bytedance.bitsail.connector.doris.partition.DorisPartitionManager; +import com.bytedance.bitsail.connector.doris.partition.DorisPartitionTemplate; import com.bytedance.bitsail.connector.doris.sink.ddl.DorisSchemaManagerGenerator; import com.alibaba.fastjson.JSON; @@ -44,12 +47,16 @@ import java.io.IOException; import java.sql.SQLException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; -import java.util.stream.Collectors; public class DorisSink implements Sink { private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class); @@ -136,16 +143,58 @@ private void initDorisOptions(BitSailConfiguration writerConfiguration) { // Need partition info in batch replace modes. if (isHasPartition && this.writeMode.equals(DorisExecutionOptions.WRITE_MODE.BATCH_REPLACE)) { //BATCH and REPLACE mode need the partition infos - List> partitionList = writerConfiguration.getNecessaryOption(DorisWriterOptions.PARTITIONS, CommonErrorCode.CONFIG_ERROR); - builder.partitions( - partitionList.stream() - .map(partition -> JSON.parseObject(JSON.toJSONString(partition), DorisPartition.class)) - .collect(Collectors.toList()) - ); + DorisPartitionTemplate dorisPartitionTemplate = writerConfiguration.getNecessaryOption(DorisWriterOptions.PARTITIONS, CommonErrorCode.CONFIG_ERROR); + List dorisPartitions = parseTemplateToDorisPartitions(dorisPartitionTemplate); + builder.partitions(dorisPartitions); } dorisOptions = builder.build(); } + private List parseTemplateToDorisPartitions(DorisPartitionTemplate dorisPartitionTemplate) { + String pattern = dorisPartitionTemplate.getPattern(); + String prefix = dorisPartitionTemplate.getPrefix(); + String start = dorisPartitionTemplate.getStartRange(); + String end = dorisPartitionTemplate.getEndRange(); + DorisPartitionTemplate.DorisPartitionLevel partitionLevel = DorisPartitionTemplate.DorisPartitionLevel.valueOf( + dorisPartitionTemplate.getPartitionLevel().toUpperCase()); + + SimpleDateFormat sdf = new SimpleDateFormat(pattern); + try { + Date dateStart = sdf.parse(start); + Date dateEnd = sdf.parse(end); + List dorisPartitions = new ArrayList<>(); + List listDate; + switch (partitionLevel) { + case DAY: + listDate = DateUtil.getDatesBetweenTwoDate(dateStart, dateEnd); + break; + case WEEK: + listDate = DateUtil.getDatesBetweenTwoDate(dateStart, dateEnd); + break; + case MONTH: + listDate = DateUtil.getDatesBetweenTwoDate(dateStart, dateEnd); + break; + case QUARTER: + listDate = DateUtil.getDatesBetweenTwoDate(dateStart, dateEnd); + break; + default: + throw BitSailException.asBitSailException(CommonErrorCode.CONFIG_ERROR, "The configure partition level " + partitionLevel + + " is not supported, only support day, week, month and quarter"); + } + + listDate.forEach(date -> { + DorisPartition dorisPartition = new DorisPartition(); + dorisPartition.setName(prefix + sdf.format(date)); + dorisPartition.setStartRange(Collections.singletonList(sdf.format(date))); + dorisPartition.setEndRange(Collections.singletonList(sdf.format(DateUtil.getNDaysAfterDate(date, 1)))); + dorisPartitions.add(dorisPartition); + }); + return dorisPartitions; + } catch (ParseException e) { + throw BitSailException.asBitSailException(CommonErrorCode.CONFIG_ERROR, "Can't parse configuration info: " + dorisPartitionTemplate, e); + } + } + private void initDorisExecutionOptions(BitSailConfiguration writerConfiguration) { LOG.info("Start to init DorisExecutionOptions!"); final DorisExecutionOptions.DorisExecutionOptionsBuilder builder = DorisExecutionOptions.builder();