diff --git a/CHANGELOG.md b/CHANGELOG.md index 9228780b9dd..943b6ee0299 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,7 +26,7 @@ * [ENHANCEMENT] Alertmanager/Ruler: Introduce a user scanner to reduce the number of list calls to object storage. #6999 * [ENHANCEMENT] Ruler: Add DecodingConcurrency config flag for Thanos Engine. #7118 * [ENHANCEMENT] Query Frontend: Add query priority based on operation. #7128 -* [ENHANCEMENT] Compactor: Avoid double compaction by cleaning partition files in 2 cycles. #7129 +* [ENHANCEMENT] Compactor: Avoid double compaction by cleaning partition files in 2 cycles. #7130 #7209 * [ENHANCEMENT] Distributor: Optimize memory usage by recycling v2 requests. #7131 * [ENHANCEMENT] Compactor: Avoid double compaction by not filtering delete blocks on real time when using bucketIndex lister. #7156 * [ENHANCEMENT] Upgrade to go 1.25. #7164 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 22fe7abf915..563febc11b1 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -786,7 +786,6 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke } for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo { - isPartitionGroupInfoDeleted := false partitionedGroupInfoFile := extraInfo.path deletedBlocksCount := 0 partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString()) @@ -812,18 +811,26 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_file", partitionedGroupInfoFile, "err", err) } else { level.Info(partitionedGroupLogger).Log("msg", "deleted partitioned group info", "partitioned_group_file", partitionedGroupInfoFile) - isPartitionGroupInfoDeleted = true } } } - if isPartitionGroupInfoDeleted && (extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker) { - // Remove partition visit markers + if extraInfo.status.CanDelete { + // Remove all partition visit markers for completed partitions if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil { - level.Warn(partitionedGroupLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "err", err) + level.Warn(partitionedGroupLogger).Log("msg", "failed to delete all partition visit markers for partitioned group", "err", err) } else { level.Info(partitionedGroupLogger).Log("msg", "deleted partition visit markers for partitioned group") } + } else { + // Remove all invalid visit markers + for _, v := range extraInfo.status.VisitMarkersToDelete { + if err := userBucket.Delete(ctx, v.GetVisitMarkerFilePath()); err != nil { + level.Warn(partitionedGroupLogger).Log("msg", "failed to delete invalid visit marker", "partition_visit_marker_file", v.String(), "err", err) + } else { + level.Info(partitionedGroupLogger).Log("msg", "deleted invalid visit marker", "partition_visit_marker_file", v.String()) + } + } } } } diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index e5e5037db80..9396a1bd9a4 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -999,6 +999,10 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) { partitionedGroupFileExists, err = userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID)) require.NoError(t, err) require.False(t, partitionedGroupFileExists) + + partitionedGroupFileExists, err = userBucket.Exists(ctx, visitMarker.GetVisitMarkerFilePath()) + require.NoError(t, err) + require.False(t, partitionedGroupFileExists) } func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) { diff --git a/pkg/compactor/partitioned_group_info.go b/pkg/compactor/partitioned_group_info.go index a43bb262301..ffb6f822404 100644 --- a/pkg/compactor/partitioned_group_info.go +++ b/pkg/compactor/partitioned_group_info.go @@ -41,10 +41,10 @@ type PartitionedGroupStatus struct { PartitionedGroupID uint32 CanDelete bool IsCompleted bool - DeleteVisitMarker bool PendingPartitions int InProgressPartitions int PendingOrFailedPartitions []Partition + VisitMarkersToDelete []VisitMarker } func (s PartitionedGroupStatus) String() string { @@ -52,8 +52,12 @@ func (s PartitionedGroupStatus) String() string { for _, p := range s.PendingOrFailedPartitions { partitions = append(partitions, fmt.Sprintf("%d", p.PartitionID)) } - return fmt.Sprintf(`{"partitioned_group_id": %d, "can_delete": %t, "is_complete": %t, "delete_visit_marker": %t, "pending_partitions": %d, "in_progress_partitions": %d, "pending_or_failed_partitions": [%s]}`, - s.PartitionedGroupID, s.CanDelete, s.IsCompleted, s.DeleteVisitMarker, s.PendingPartitions, s.InProgressPartitions, strings.Join(partitions, ",")) + var visitMarkers []string + for _, v := range s.VisitMarkersToDelete { + visitMarkers = append(visitMarkers, v.GetVisitMarkerFilePath()) + } + return fmt.Sprintf(`{"partitioned_group_id": %d, "can_delete": %t, "is_complete": %t, "pending_partitions": %d, "in_progress_partitions": %d, "pending_or_failed_partitions": [%s], "visit_markers_to_delete": [%s]}`, + s.PartitionedGroupID, s.CanDelete, s.IsCompleted, s.PendingPartitions, s.InProgressPartitions, strings.Join(partitions, ","), strings.Join(visitMarkers, ",")) } type PartitionedGroupInfo struct { @@ -125,10 +129,10 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( PartitionedGroupID: p.PartitionedGroupID, CanDelete: false, IsCompleted: false, - DeleteVisitMarker: false, PendingPartitions: 0, InProgressPartitions: 0, PendingOrFailedPartitions: []Partition{}, + VisitMarkersToDelete: []VisitMarker{}, } allPartitionCompleted := true hasInProgressPartitions := false @@ -153,7 +157,7 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( allPartitionCompleted = false status.PendingOrFailedPartitions = append(status.PendingOrFailedPartitions, partition) } else if visitMarker.VisitTime < p.CreationTime { - status.DeleteVisitMarker = true + status.VisitMarkersToDelete = append(status.VisitMarkersToDelete, visitMarker) allPartitionCompleted = false } else if (visitMarker.GetStatus() == Pending || visitMarker.GetStatus() == InProgress) && !visitMarker.IsExpired(partitionVisitMarkerTimeout) { status.InProgressPartitions++ @@ -174,7 +178,6 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( if allPartitionCompleted { status.CanDelete = true - status.DeleteVisitMarker = true return status } @@ -187,19 +190,16 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( if !p.doesBlockExist(ctx, userBucket, partitionedGroupLogger, blockID) { level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is physically deleted", "block", blockID) status.CanDelete = true - status.DeleteVisitMarker = true return status } if p.isBlockDeleted(ctx, userBucket, partitionedGroupLogger, blockID) { level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is marked for deletion", "block", blockID) status.CanDelete = true - status.DeleteVisitMarker = true return status } if p.isBlockNoCompact(ctx, userBucket, partitionedGroupLogger, blockID) { level.Info(partitionedGroupLogger).Log("msg", "delete partitioned group", "reason", "block is marked for no compact", "block", blockID) status.CanDelete = true - status.DeleteVisitMarker = true return status } checkedBlocks[blockID] = struct{}{} diff --git a/pkg/compactor/partitioned_group_info_test.go b/pkg/compactor/partitioned_group_info_test.go index 6e7dcc44464..6db8a4e0178 100644 --- a/pkg/compactor/partitioned_group_info_test.go +++ b/pkg/compactor/partitioned_group_info_test.go @@ -177,9 +177,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) { { name: "test one partition is not visited and contains block marked for deletion", expectedResult: PartitionedGroupStatus{ - CanDelete: true, - IsCompleted: false, - DeleteVisitMarker: true, + CanDelete: true, + IsCompleted: false, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{ { PartitionID: 1, @@ -230,9 +230,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) { { name: "test one partition is pending and contains block marked for deletion", expectedResult: PartitionedGroupStatus{ - CanDelete: true, - IsCompleted: false, - DeleteVisitMarker: true, + CanDelete: true, + IsCompleted: false, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{ { PartitionID: 1, @@ -292,7 +292,7 @@ func TestGetPartitionedGroupStatus(t *testing.T) { expectedResult: PartitionedGroupStatus{ CanDelete: false, IsCompleted: false, - DeleteVisitMarker: false, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{}, }, partitionedGroupInfo: PartitionedGroupInfo{ @@ -342,9 +342,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) { { name: "test one partition is pending expired", expectedResult: PartitionedGroupStatus{ - CanDelete: false, - IsCompleted: false, - DeleteVisitMarker: false, + CanDelete: false, + IsCompleted: false, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{ { PartitionID: 0, @@ -400,9 +400,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) { { name: "test one partition is complete with one block deleted and one partition is not visited with no blocks deleted", expectedResult: PartitionedGroupStatus{ - CanDelete: false, - IsCompleted: false, - DeleteVisitMarker: false, + CanDelete: false, + IsCompleted: false, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{ { PartitionID: 1, @@ -453,9 +453,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) { { name: "test one partition is complete and one partition is failed with no blocks deleted", expectedResult: PartitionedGroupStatus{ - CanDelete: false, - IsCompleted: false, - DeleteVisitMarker: false, + CanDelete: false, + IsCompleted: false, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{ { PartitionID: 1, @@ -511,9 +511,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) { { name: "test one partition is complete and one partition is failed one block deleted", expectedResult: PartitionedGroupStatus{ - CanDelete: true, - IsCompleted: false, - DeleteVisitMarker: true, + CanDelete: true, + IsCompleted: false, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{ { PartitionID: 1, @@ -573,7 +573,7 @@ func TestGetPartitionedGroupStatus(t *testing.T) { expectedResult: PartitionedGroupStatus{ CanDelete: true, IsCompleted: true, - DeleteVisitMarker: true, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{}, }, partitionedGroupInfo: PartitionedGroupInfo{ @@ -623,9 +623,24 @@ func TestGetPartitionedGroupStatus(t *testing.T) { { name: "test partitioned group created after visit marker", expectedResult: PartitionedGroupStatus{ - CanDelete: false, - IsCompleted: false, - DeleteVisitMarker: true, + CanDelete: false, + IsCompleted: false, + VisitMarkersToDelete: []VisitMarker{ + &partitionVisitMarker{ + PartitionedGroupID: partitionedGroupID, + PartitionID: 0, + Status: Completed, + VisitTime: time.Now().Add(-2 * time.Minute).Unix(), + Version: PartitionVisitMarkerVersion1, + }, + &partitionVisitMarker{ + PartitionedGroupID: partitionedGroupID, + PartitionID: 1, + Status: Completed, + VisitTime: time.Now().Add(-2 * time.Minute).Unix(), + Version: PartitionVisitMarkerVersion1, + }, + }, PendingOrFailedPartitions: []Partition{}, }, partitionedGroupInfo: PartitionedGroupInfo{ @@ -675,7 +690,7 @@ func TestGetPartitionedGroupStatus(t *testing.T) { expectedResult: PartitionedGroupStatus{ CanDelete: false, IsCompleted: false, - DeleteVisitMarker: false, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{}, }, partitionedGroupInfo: PartitionedGroupInfo{ @@ -725,9 +740,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) { { name: "test one partition is not visited and contains block with no compact mark", expectedResult: PartitionedGroupStatus{ - CanDelete: true, - IsCompleted: false, - DeleteVisitMarker: true, + CanDelete: true, + IsCompleted: false, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{ { PartitionID: 1, @@ -778,9 +793,9 @@ func TestGetPartitionedGroupStatus(t *testing.T) { { name: "test one partition is expired and contains block with no compact mark", expectedResult: PartitionedGroupStatus{ - CanDelete: true, - IsCompleted: false, - DeleteVisitMarker: true, + CanDelete: true, + IsCompleted: false, + VisitMarkersToDelete: []VisitMarker{}, PendingOrFailedPartitions: []Partition{ { PartitionID: 1, @@ -877,6 +892,10 @@ func TestGetPartitionedGroupStatus(t *testing.T) { for _, partition := range result.PendingOrFailedPartitions { require.Contains(t, tcase.expectedResult.PendingOrFailedPartitions, partition) } + require.Equal(t, len(tcase.expectedResult.VisitMarkersToDelete), len(result.VisitMarkersToDelete)) + for _, visitMarker := range result.VisitMarkersToDelete { + require.Contains(t, tcase.expectedResult.VisitMarkersToDelete, visitMarker) + } }) } }