diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go index 4f1d955bfc..0dfd81ec7a 100644 --- a/pkg/compactor/partition_compaction_grouper.go +++ b/pkg/compactor/partition_compaction_grouper.go @@ -283,7 +283,9 @@ func (g *PartitionCompactionGrouper) groupBlocksByRange(blocks []*metadata.Meta, } if len(group.blocks) > 1 { - ret = append(ret, group) + if time.UnixMilli(group.rangeEnd).Before(time.Now().Add(-2 * g.compactorCfg.CleanupInterval)) { + ret = append(ret, group) + } } } diff --git a/pkg/compactor/partition_compaction_grouper_test.go b/pkg/compactor/partition_compaction_grouper_test.go index 6ca1ee8877..0f64cb8092 100644 --- a/pkg/compactor/partition_compaction_grouper_test.go +++ b/pkg/compactor/partition_compaction_grouper_test.go @@ -2036,12 +2036,48 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) { {blocks: []ulid.ULID{block1, block2}, partitionCount: 1, partitionID: 0, rangeStart: 0 * H, rangeEnd: 12 * H}, }, }, + "group end time is too close to now time": { + ranges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}, + blocks: map[ulid.ULID]mockBlock{ + block1: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block1, MinTime: getRangeStartBasedOnNow(2*time.Hour) - 2*time.Hour.Milliseconds(), MaxTime: getRangeStartBasedOnNow(2 * time.Hour), Compaction: tsdb.BlockMetaCompaction{Level: 1}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block2: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block2, MinTime: getRangeStartBasedOnNow(2*time.Hour) - 2*time.Hour.Milliseconds(), MaxTime: getRangeStartBasedOnNow(2 * time.Hour), Compaction: tsdb.BlockMetaCompaction{Level: 1}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block3: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block3, MinTime: getRangeStartBasedOnNow(2*time.Hour) - 2*time.Hour.Milliseconds(), MaxTime: getRangeStartBasedOnNow(2 * time.Hour), Compaction: tsdb.BlockMetaCompaction{Level: 1}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block4: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block4, MinTime: getRangeStartBasedOnNow(2*time.Hour) - 2*time.Hour.Milliseconds(), MaxTime: getRangeStartBasedOnNow(2 * time.Hour), Compaction: tsdb.BlockMetaCompaction{Level: 1}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + }, + existingPartitionedGroups: []mockExistingPartitionedGroup{}, + expected: []expectedCompactionJob{}, + }, } for testName, testCase := range tests { t.Run(testName, func(t *testing.T) { compactorCfg := &Config{ - BlockRanges: testCase.ranges, + BlockRanges: testCase.ranges, + CleanupInterval: time.Hour, } limits := &validation.Limits{ @@ -2125,6 +2161,11 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) { } } +func getRangeStartBasedOnNow(r time.Duration) int64 { + tr := r.Milliseconds() + return tr * (time.Now().UnixMilli() / tr) +} + type generateCompactionJobsTestCase struct { ranges []time.Duration blocks map[ulid.ULID]mockBlock