Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void testTimePartition() throws Exception {
}
timestatmps.forEach(
t -> {
long timePartitionId = TimePartitionUtils.getTimePartitionId(t);
long timePartitionId = TimePartitionUtils.getTimePartitionId(t, "root.sg1");
assertTrue(timePartitions.contains(timePartitionId));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ databaseAttributeClause
databaseAttributeKey
: TTL
| TIME_PARTITION_INTERVAL
| TIME_PARTITION_ORIGIN
| SCHEMA_REGION_GROUP_NUM
| DATA_REGION_GROUP_NUM
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,10 @@ TIME_PARTITION_INTERVAL
: T I M E '_' P A R T I T I O N '_' I N T E R V A L
;

TIME_PARTITION_ORIGIN
: T I M E '_' P A R T I T I O N '_' O R I G I N
;

SCHEMA_REGION_GROUP_NUM
: S C H E M A '_' R E G I O N '_' G R O U P '_' N U M
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
Expand Down Expand Up @@ -232,6 +233,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case SetTimePartitionInterval:
plan = new SetTimePartitionIntervalPlan();
break;
case SetTimePartitionOrigin:
plan = new SetTimePartitionOriginPlan();
break;
case AdjustMaxRegionGroupNum:
plan = new AdjustMaxRegionGroupNumPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public enum ConfigPhysicalPlanType {
SetSchemaReplicationFactor((short) 202),
SetDataReplicationFactor((short) 203),
SetTimePartitionInterval((short) 204),
SetTimePartitionOrigin((short) 212),
AdjustMaxRegionGroupNum((short) 205),
DeleteDatabase((short) 206),
PreDeleteDatabase((short) 207),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.request.write.database;

import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class SetTimePartitionOriginPlan extends ConfigPhysicalPlan {

private String storageGroup;

private long timePartitionOrigin;

public SetTimePartitionOriginPlan() {
super(ConfigPhysicalPlanType.SetTimePartitionOrigin);
}

public SetTimePartitionOriginPlan(String storageGroup, long timePartitionOrigin) {
this();
this.storageGroup = storageGroup;
this.timePartitionOrigin = timePartitionOrigin;
}

public String getDatabase() {
return storageGroup;
}

public long getTimePartitionOrigin() {
return timePartitionOrigin;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());

BasicStructureSerDeUtil.write(storageGroup, stream);
stream.writeLong(timePartitionOrigin);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
storageGroup = BasicStructureSerDeUtil.readString(buffer);
timePartitionOrigin = buffer.getLong();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SetTimePartitionOriginPlan that = (SetTimePartitionOriginPlan) o;
return timePartitionOrigin == that.timePartitionOrigin
&& storageGroup.equals(that.storageGroup);
}

@Override
public int hashCode() {
return Objects.hash(storageGroup, timePartitionOrigin);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,25 @@
public class AutoCleanPartitionTablePlan extends ConfigPhysicalPlan {

Map<String, Long> databaseTTLMap;
TTimePartitionSlot currentTimeSlot;
Map<String, TTimePartitionSlot> currentTimeSlotMap;

public AutoCleanPartitionTablePlan() {
super(ConfigPhysicalPlanType.AutoCleanPartitionTable);
}

public AutoCleanPartitionTablePlan(
Map<String, Long> databaseTTLMap, TTimePartitionSlot currentTimeSlot) {
Map<String, Long> databaseTTLMap, Map<String, TTimePartitionSlot> currentTimeSlotMap) {
this();
this.databaseTTLMap = databaseTTLMap;
this.currentTimeSlot = currentTimeSlot;
this.currentTimeSlotMap = currentTimeSlotMap;
}

public Map<String, Long> getDatabaseTTLMap() {
return databaseTTLMap;
}

public TTimePartitionSlot getCurrentTimeSlot() {
return currentTimeSlot;
public Map<String, TTimePartitionSlot> getCurrentTimeSlotMap() {
return currentTimeSlotMap;
}

@Override
Expand All @@ -64,7 +64,7 @@ protected void serializeImpl(DataOutputStream stream) throws IOException {
BasicStructureSerDeUtil.write(entry.getKey(), stream);
stream.writeLong(entry.getValue());
}
ThriftCommonsSerDeUtils.serializeTTimePartitionSlot(currentTimeSlot, stream);
ThriftCommonsSerDeUtils.serializeTTimePartitionSlot(currentTimeSlotMap, stream);
}

@Override
Expand All @@ -76,7 +76,8 @@ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
long value = buffer.getLong();
databaseTTLMap.put(key, value);
}
currentTimeSlot = ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
currentTimeSlotMap =
ThriftCommonsSerDeUtils.deserializeTTimePartitionSlotMap(buffer, new TreeMap<>());
}

@Override
Expand All @@ -89,11 +90,11 @@ public boolean equals(Object o) {
}
AutoCleanPartitionTablePlan that = (AutoCleanPartitionTablePlan) o;
return Objects.equals(databaseTTLMap, that.databaseTTLMap)
&& Objects.equals(currentTimeSlot, that.currentTimeSlot);
&& Objects.equals(currentTimeSlotMap, that.currentTimeSlotMap);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), databaseTTLMap, currentTimeSlot);
return Objects.hash(super.hashCode(), databaseTTLMap, currentTimeSlotMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.response.ainode.AINodeRegisterResp;
Expand Down Expand Up @@ -720,6 +721,16 @@ public TSStatus setTimePartitionInterval(
}
}

@Override
public TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan setTimePartitionOriginPlan) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return clusterSchemaManager.setTimePartitionOrigin(setTimePartitionOriginPlan);
} else {
return status;
}
}

@Override
public DataSet countMatchedDatabases(CountDatabasePlan countDatabasePlan) {
TSStatus status = confirmLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.cq.CQManager;
Expand Down Expand Up @@ -386,6 +387,8 @@ public interface IManager {

TSStatus setTimePartitionInterval(SetTimePartitionIntervalPlan configPhysicalPlan);

TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan configPhysicalPlan);

/**
* Count Databases.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.SetTableColumnCommentPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.SetTableCommentPlan;
Expand Down Expand Up @@ -470,6 +471,17 @@ public TSStatus setTimePartitionInterval(
}
}

public TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan setTimePartitionOriginPlan) {
try {
return getConsensusManager().write(setTimePartitionOriginPlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}

/**
* Only leader use this interface. Adjust the maxSchemaRegionGroupNum and maxDataRegionGroupNum of
* each Database based on existing cluster resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
Expand Down Expand Up @@ -435,6 +436,8 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
case SetTimePartitionInterval:
return clusterSchemaInfo.setTimePartitionInterval(
(SetTimePartitionIntervalPlan) physicalPlan);
case SetTimePartitionOrigin:
return clusterSchemaInfo.setTimePartitionOrigin((SetTimePartitionOriginPlan) physicalPlan);
case CreateRegionGroups:
return partitionInfo.createRegionGroups((CreateRegionGroupsPlan) physicalPlan);
case OfferRegionMaintainTasks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
*/
public void autoCleanPartitionTable(long TTL, TTimePartitionSlot currentTimeSlot) {
long[] removedTimePartitionSlots =
dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot).stream()
dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot, databaseName).stream()
.map(TTimePartitionSlot::getStartTime)
.collect(Collectors.toList())
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,8 @@ public TSStatus autoCleanPartitionTable(AutoCleanPartitionTablePlan plan) {
if (isDatabaseExisted(database) && 0 < ttl && ttl < Long.MAX_VALUE) {
databasePartitionTables
.get(database)
.autoCleanPartitionTable(ttl, plan.getCurrentTimeSlot());
.autoCleanPartitionTable(
ttl, plan.getCurrentTimeSlotMap().getOrDefault(database, null));
}
});
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.table.DescTablePlan;
Expand All @@ -53,6 +54,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AlterColumnDataTypePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
Expand Down Expand Up @@ -196,6 +198,10 @@ public TSStatus createDatabase(final DatabaseSchemaPlan plan) {
final TDatabaseSchema databaseSchema = plan.getSchema();
final PartialPath partialPathName = getQualifiedDatabasePartialPath(databaseSchema.getName());

// Update TimePartitionUtils cache with database-specific time partition settings
TimePartitionUtils.updateDatabaseTimePartitionConfig(
databaseSchema.getName(), databaseSchema);

final ConfigMTree mTree = databaseSchema.isIsTableModel() ? tableModelMTree : treeModelMTree;
mTree.setStorageGroup(partialPathName);

Expand Down Expand Up @@ -280,6 +286,9 @@ public TSStatus alterDatabase(final DatabaseSchemaPlan plan) {
.getAsMNode()
.setDatabaseSchema(currentSchema);

// Update TimePartitionUtils cache with new time partition settings
TimePartitionUtils.updateDatabaseTimePartitionConfig(currentSchema.getName(), currentSchema);

result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final MetadataException e) {
LOGGER.error(ERROR_NAME, e);
Expand All @@ -305,6 +314,9 @@ public TSStatus deleteDatabase(final DeleteDatabasePlan plan) {
(isTableModel ? tableModelMTree : treeModelMTree)
.deleteDatabase(getQualifiedDatabasePartialPath(plan.getName()));

// Remove database-specific time partition configuration from cache
TimePartitionUtils.removeDatabaseTimePartitionConfig(plan.getName());

result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final MetadataException e) {
LOGGER.warn("Database not exist", e);
Expand Down Expand Up @@ -482,6 +494,32 @@ public TSStatus setTimePartitionInterval(final SetTimePartitionIntervalPlan plan
return result;
}

public TSStatus setTimePartitionOrigin(final SetTimePartitionOriginPlan plan) {
final TSStatus result = new TSStatus();
databaseReadWriteLock.writeLock().lock();
try {
final ConfigMTree mTree =
PathUtils.isTableModelDatabase(plan.getDatabase()) ? tableModelMTree : treeModelMTree;
final PartialPath path = getQualifiedDatabasePartialPath(plan.getDatabase());
if (mTree.isDatabaseAlreadySet(path)) {
mTree
.getDatabaseNodeByDatabasePath(path)
.getAsMNode()
.getDatabaseSchema()
.setTimePartitionOrigin(plan.getTimePartitionOrigin());
result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode());
}
} catch (final MetadataException e) {
LOGGER.error(ERROR_NAME, e);
result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()).setMessage(ERROR_NAME);
} finally {
databaseReadWriteLock.writeLock().unlock();
}
return result;
}

/**
* Adjust the maximum RegionGroup count of each Database.
*
Expand Down
Loading
Loading