diff --git a/docs/env.md b/docs/env.md
index c016d5a72..896352a30 100644
--- a/docs/env.md
+++ b/docs/env.md
@@ -259,7 +259,10 @@
| VM_FILTERPROMETHEUSCONVERTERANNOTATIONPREFIXES: `-` #
allows filtering for converted annotations, annotations with matched prefix will be ignored |
| VM_CLUSTERDOMAINNAME: `-` #
Defines domain name suffix for in-cluster addresses most known ClusterDomainName is .cluster.local |
| VM_APPREADYTIMEOUT: `80s` #
Defines deadline for deployment/statefulset to transit into ready state to wait for transition to ready state |
-| VM_PODWAITREADYTIMEOUT: `80s` #
Defines single pod deadline to wait for transition to ready state |
| VM_PODWAITREADYINTERVALCHECK: `5s` #
Defines poll interval for pods ready check at statefulset rollout update |
+| VM_PODWAITREADYTIMEOUT: `80s` #
Defines single pod deadline to wait for transition to ready state |
+| VM_PVC_WAIT_READY_INTERVAL: `5s` #
Defines poll interval for PVC ready check |
+| VM_PVC_WAIT_READY_TIMEOUT: `80s` #
Defines poll timeout for PVC ready check |
+| VM_WAIT_READY_INTERVAL: `5s` #
Defines poll interval for VM CRs readiness check |
| VM_FORCERESYNCINTERVAL: `60s` #
configures force resync interval for VMAgent, VMAlert, VMAlertmanager and VMAuth. |
| VM_ENABLESTRICTSECURITY: `false` #
EnableStrictSecurity will add default `securityContext` to pods and containers created by operator Default PodSecurityContext include: 1. RunAsNonRoot: true 2. RunAsUser/RunAsGroup/FSGroup: 65534 '65534' refers to 'nobody' in all the used default images like alpine, busybox. If you're using customize image, please make sure '65534' is a valid uid in there or specify SecurityContext. 3. FSGroupChangePolicy: &onRootMismatch If KubeVersion>=1.20, use `FSGroupChangePolicy="onRootMismatch"` to skip the recursive permission change when the root of the volume already has the correct permissions 4. SeccompProfile: type: RuntimeDefault Use `RuntimeDefault` seccomp profile by default, which is defined by the container runtime, instead of using the Unconfined (seccomp disabled) mode. Default container SecurityContext include: 1. AllowPrivilegeEscalation: false 2. ReadOnlyRootFilesystem: true 3. Capabilities: drop: - all turn off `EnableStrictSecurity` by default, see https://github.com/VictoriaMetrics/operator/issues/749 for details |
diff --git a/internal/config/config.go b/internal/config/config.go
index 6254257a8..3113ccf94 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -578,13 +578,19 @@ type BaseOperatorConf struct {
// Defines deadline for deployment/statefulset
// to transit into ready state
// to wait for transition to ready state
- AppReadyTimeout time.Duration `default:"80s" env:"VM_APPREADYTIMEOUT"`
+ AppWaitReadyTimeout time.Duration `default:"80s" env:"VM_APPREADYTIMEOUT"`
+ // Defines poll interval for pods ready check
+ // at statefulset rollout update
+ PodWaitReadyInterval time.Duration `default:"5s" env:"VM_PODWAITREADYINTERVALCHECK"`
// Defines single pod deadline
// to wait for transition to ready state
PodWaitReadyTimeout time.Duration `default:"80s" env:"VM_PODWAITREADYTIMEOUT"`
- // Defines poll interval for pods ready check
- // at statefulset rollout update
- PodWaitReadyIntervalCheck time.Duration `default:"5s" env:"VM_PODWAITREADYINTERVALCHECK"`
+ // Defines poll interval for PVC ready check
+ PVCWaitReadyInterval time.Duration `default:"5s" env:"VM_PVC_WAIT_READY_INTERVAL"`
+ // Defines poll timeout for PVC ready check
+ PVCWaitReadyTimeout time.Duration `default:"80s" env:"VM_PVC_WAIT_READY_TIMEOUT"`
+ // Defines poll interval for VM CRs readiness check
+ VMWaitReadyInterval time.Duration `default:"5s" env:"VM_WAIT_READY_INTERVAL"`
// configures force resync interval for VMAgent, VMAlert, VMAlertmanager and VMAuth.
ForceResyncInterval time.Duration `default:"60s" env:"VM_FORCERESYNCINTERVAL"`
// EnableStrictSecurity will add default `securityContext` to pods and containers created by operator
diff --git a/internal/controller/operator/factory/k8stools/interceptors.go b/internal/controller/operator/factory/k8stools/interceptors.go
index 479e6268a..f8d719825 100644
--- a/internal/controller/operator/factory/k8stools/interceptors.go
+++ b/internal/controller/operator/factory/k8stools/interceptors.go
@@ -4,6 +4,7 @@ import (
"context"
appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -12,80 +13,55 @@ import (
vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1"
)
+func updateStatus(ctx context.Context, cl client.WithWatch, obj client.Object) error {
+ switch v := obj.(type) {
+ case *appsv1.StatefulSet:
+ v.Status.ObservedGeneration = v.Generation
+ v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
+ v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
+ v.Status.CurrentReplicas = ptr.Deref(v.Spec.Replicas, 0)
+ v.Status.UpdateRevision = "v1"
+ v.Status.CurrentRevision = "v1"
+ case *appsv1.Deployment:
+ v.Status.ObservedGeneration = v.Generation
+ v.Status.Conditions = append(v.Status.Conditions, appsv1.DeploymentCondition{
+ Type: appsv1.DeploymentProgressing,
+ Reason: "NewReplicaSetAvailable",
+ Status: "True",
+ })
+ v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
+ v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
+ case *vmv1beta1.VMAgent:
+ v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
+ v.Status.ObservedGeneration = v.Generation
+ case *vmv1beta1.VMCluster:
+ v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
+ v.Status.ObservedGeneration = v.Generation
+ case *vmv1beta1.VMAuth:
+ v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
+ v.Status.ObservedGeneration = v.Generation
+ case *corev1.PersistentVolumeClaim:
+ v.Status.Capacity = v.Spec.Resources.Requests
+ default:
+ return nil
+ }
+ return cl.Status().Update(ctx, obj)
+}
+
// GetInterceptorsWithObjects returns interceptors for objects
func GetInterceptorsWithObjects() interceptor.Funcs {
return interceptor.Funcs{
Create: func(ctx context.Context, cl client.WithWatch, obj client.Object, opts ...client.CreateOption) error {
- switch v := obj.(type) {
- case *appsv1.StatefulSet:
- v.Status.ObservedGeneration = v.Generation + 1
- v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.CurrentReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.UpdateRevision = "v1"
- v.Status.CurrentRevision = "v1"
- case *appsv1.Deployment:
- v.Status.ObservedGeneration = v.Generation + 1
- v.Status.Conditions = append(v.Status.Conditions, appsv1.DeploymentCondition{
- Type: appsv1.DeploymentProgressing,
- Reason: "NewReplicaSetAvailable",
- Status: "True",
- })
- v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
- }
if err := cl.Create(ctx, obj, opts...); err != nil {
return err
}
- switch v := obj.(type) {
- case *vmv1beta1.VMAgent:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- case *vmv1beta1.VMCluster:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- case *vmv1beta1.VMAuth:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- }
- return nil
+ return updateStatus(ctx, cl, obj)
},
Update: func(ctx context.Context, cl client.WithWatch, obj client.Object, opts ...client.UpdateOption) error {
- switch v := obj.(type) {
- case *appsv1.StatefulSet:
- v.Status.ObservedGeneration = v.Generation + 1
- v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.CurrentReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.UpdateRevision = "v1"
- v.Status.CurrentRevision = "v1"
- case *appsv1.Deployment:
- v.Status.ObservedGeneration = v.Generation + 1
- v.Status.UpdatedReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.ReadyReplicas = ptr.Deref(v.Spec.Replicas, 0)
- v.Status.Replicas = ptr.Deref(v.Spec.Replicas, 0)
- }
if err := cl.Update(ctx, obj, opts...); err != nil {
return err
}
- switch v := obj.(type) {
- case *vmv1beta1.VMAgent:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- case *vmv1beta1.VMCluster:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- case *vmv1beta1.VMAuth:
- v.Status.UpdateStatus = vmv1beta1.UpdateStatusOperational
- v.Status.ObservedGeneration = v.Generation
- return cl.Status().Update(ctx, v)
- }
- return nil
+ return updateStatus(ctx, cl, obj)
},
}
}
diff --git a/internal/controller/operator/factory/reconcile/daemonset.go b/internal/controller/operator/factory/reconcile/daemonset.go
index 9a26417d4..6bb87f550 100644
--- a/internal/controller/operator/factory/reconcile/daemonset.go
+++ b/internal/controller/operator/factory/reconcile/daemonset.go
@@ -67,7 +67,7 @@ func DaemonSet(ctx context.Context, rclient client.Client, newObj, prevObj *apps
if err != nil {
return err
}
- return waitDaemonSetReady(ctx, rclient, newObj, appWaitReadyDeadline)
+ return waitDaemonSetReady(ctx, rclient, newObj, appWaitReadyTimeout)
}
// waitDeploymentReady waits until deployment's replicaSet rollouts and all new pods is ready
diff --git a/internal/controller/operator/factory/reconcile/deploy.go b/internal/controller/operator/factory/reconcile/deploy.go
index 4a3140f57..5a2cd681a 100644
--- a/internal/controller/operator/factory/reconcile/deploy.go
+++ b/internal/controller/operator/factory/reconcile/deploy.go
@@ -69,7 +69,7 @@ func Deployment(ctx context.Context, rclient client.Client, newObj, prevObj *app
if err != nil {
return err
}
- return waitForDeploymentReady(ctx, rclient, newObj, appWaitReadyDeadline)
+ return waitForDeploymentReady(ctx, rclient, newObj, appWaitReadyTimeout)
}
// waitForDeploymentReady waits until deployment's replicaSet rollouts and all new pods is ready
diff --git a/internal/controller/operator/factory/reconcile/pvc.go b/internal/controller/operator/factory/reconcile/pvc.go
index 0d42b1a0d..ac88f37aa 100644
--- a/internal/controller/operator/factory/reconcile/pvc.go
+++ b/internal/controller/operator/factory/reconcile/pvc.go
@@ -6,8 +6,10 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/VictoriaMetrics/operator/internal/controller/operator/factory/logger"
@@ -20,24 +22,62 @@ import (
// in case of deletion timestamp > 0 does nothing
// user must manually remove finalizer if needed
func PersistentVolumeClaim(ctx context.Context, rclient client.Client, newObj, prevObj *corev1.PersistentVolumeClaim, owner *metav1.OwnerReference) error {
- l := logger.WithContext(ctx)
- var existingObj corev1.PersistentVolumeClaim
nsn := types.NamespacedName{Namespace: newObj.Namespace, Name: newObj.Name}
- if err := rclient.Get(ctx, nsn, &existingObj); err != nil {
- if k8serrors.IsNotFound(err) {
- l.Info(fmt.Sprintf("creating new PVC=%s", nsn.String()))
- if err := rclient.Create(ctx, newObj); err != nil {
- return fmt.Errorf("cannot create new PVC=%s: %w", nsn.String(), err)
+ var existingObj corev1.PersistentVolumeClaim
+ err := retryOnConflict(func() error {
+ if err := rclient.Get(ctx, nsn, &existingObj); err != nil {
+ if k8serrors.IsNotFound(err) {
+ logger.WithContext(ctx).Info(fmt.Sprintf("creating new PVC=%s", nsn.String()))
+ return rclient.Create(ctx, newObj)
}
+ return fmt.Errorf("cannot get existing PVC=%s: %w", nsn.String(), err)
+ }
+ if !existingObj.DeletionTimestamp.IsZero() {
return nil
}
- return fmt.Errorf("cannot get existing PVC=%s: %w", nsn.String(), err)
+ return updatePVC(ctx, rclient, &existingObj, newObj, prevObj, owner)
+ })
+ if err != nil {
+ return err
+ }
+ size := newObj.Spec.Resources.Requests[corev1.ResourceStorage]
+ if !existingObj.CreationTimestamp.IsZero() {
+ size = existingObj.Spec.Resources.Requests[corev1.ResourceStorage]
+ }
+ if err = waitForPVCReady(ctx, rclient, nsn, size); err != nil {
+ return err
}
if !existingObj.DeletionTimestamp.IsZero() {
- l.Info(fmt.Sprintf("PVC=%s has non zero DeletionTimestamp, skip update."+
+ logger.WithContext(ctx).Info(fmt.Sprintf("PVC=%s has non zero DeletionTimestamp, skip update."+
" To fix this, make backup for this pvc, delete pvc finalizers and restore from backup.", nsn.String()))
- return nil
}
+ return nil
+}
- return updatePVC(ctx, rclient, &existingObj, newObj, prevObj, owner)
+func waitForPVCReady(ctx context.Context, rclient client.Client, nsn types.NamespacedName, size resource.Quantity) error {
+ var pvc corev1.PersistentVolumeClaim
+ return wait.PollUntilContextTimeout(ctx, pvcWaitReadyInterval, pvcWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
+ if err := rclient.Get(ctx, nsn, &pvc); err != nil {
+ if k8serrors.IsNotFound(err) {
+ return false, nil
+ }
+ return false, fmt.Errorf("cannot get PVC=%s: %w", nsn.String(), err)
+ }
+ if !pvc.DeletionTimestamp.IsZero() {
+ return true, nil
+ }
+ if len(pvc.Status.Capacity) == 0 {
+ return true, nil
+ }
+ actualSize := pvc.Status.Capacity[corev1.ResourceStorage]
+ if actualSize.Cmp(size) < 0 {
+ return false, nil
+ }
+ for _, condition := range pvc.Status.Conditions {
+ if condition.Type == corev1.PersistentVolumeClaimResizing && condition.Status == corev1.ConditionTrue {
+ return false, nil
+ }
+ }
+ return true, nil
+ })
}
diff --git a/internal/controller/operator/factory/reconcile/pvc_test.go b/internal/controller/operator/factory/reconcile/pvc_test.go
index 4f20a5279..c44a0e8c5 100644
--- a/internal/controller/operator/factory/reconcile/pvc_test.go
+++ b/internal/controller/operator/factory/reconcile/pvc_test.go
@@ -38,6 +38,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
},
},
}
+ pvc.Status.Capacity = pvc.Spec.Resources.Requests
for _, fn := range fns {
fn(pvc)
}
@@ -47,7 +48,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
f := func(o opts) {
t.Helper()
ctx := context.Background()
- cl := k8stools.GetTestClientWithActions(o.predefinedObjects)
+ cl := k8stools.GetTestClientWithActionsAndObjects(o.predefinedObjects)
synctest.Test(t, func(t *testing.T) {
assert.NoError(t, PersistentVolumeClaim(ctx, cl, o.new, o.prev, nil))
assert.Equal(t, o.actions, cl.Actions)
@@ -62,6 +63,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
actions: []k8stools.ClientAction{
{Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
{Verb: "Create", Kind: "PersistentVolumeClaim", Resource: nn},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
},
})
@@ -74,6 +76,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
},
actions: []k8stools.ClientAction{
{Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
},
})
@@ -89,6 +92,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
actions: []k8stools.ClientAction{
{Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: nn},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
},
})
@@ -106,6 +110,7 @@ func TestPersistentVolumeClaimReconcile(t *testing.T) {
actions: []k8stools.ClientAction{
{Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: nn},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: nn},
},
})
}
diff --git a/internal/controller/operator/factory/reconcile/reconcile.go b/internal/controller/operator/factory/reconcile/reconcile.go
index bd39a4b67..1f0dccfb4 100644
--- a/internal/controller/operator/factory/reconcile/reconcile.go
+++ b/internal/controller/operator/factory/reconcile/reconcile.go
@@ -18,23 +18,32 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1"
+ "github.com/VictoriaMetrics/operator/internal/config"
"github.com/VictoriaMetrics/operator/internal/controller/operator/factory/finalize"
"github.com/VictoriaMetrics/operator/internal/controller/operator/factory/logger"
)
var (
- podWaitReadyIntervalCheck = 50 * time.Millisecond
- appWaitReadyDeadline = 5 * time.Second
- podWaitReadyTimeout = 5 * time.Second
- vmStatusInterval = 5 * time.Second
+ pvcWaitReadyInterval = 1 * time.Second
+ pvcWaitReadyTimeout = 5 * time.Second
+
+ podWaitReadyInterval = 1 * time.Second
+ podWaitReadyTimeout = 5 * time.Second
+
+ appWaitReadyTimeout = 5 * time.Second
+ vmWaitReadyInterval = 5 * time.Second
)
// Init sets package defaults
-func Init(intervalCheck, appWaitDeadline, podReadyDeadline, statusInterval, statusUpdate time.Duration) {
- podWaitReadyIntervalCheck = intervalCheck
- appWaitReadyDeadline = appWaitDeadline
- podWaitReadyTimeout = podReadyDeadline
- vmStatusInterval = statusInterval
+func Init(cfg *config.BaseOperatorConf, statusUpdate time.Duration) {
+ podWaitReadyInterval = cfg.PodWaitReadyInterval
+ podWaitReadyTimeout = cfg.PodWaitReadyTimeout
+
+ pvcWaitReadyInterval = cfg.PVCWaitReadyInterval
+ pvcWaitReadyTimeout = cfg.PVCWaitReadyTimeout
+
+ appWaitReadyTimeout = cfg.AppWaitReadyTimeout
+ vmWaitReadyInterval = cfg.VMWaitReadyInterval
statusUpdateTTL = statusUpdate
}
diff --git a/internal/controller/operator/factory/reconcile/statefulset.go b/internal/controller/operator/factory/reconcile/statefulset.go
index 53955c8c6..24dcfad1b 100644
--- a/internal/controller/operator/factory/reconcile/statefulset.go
+++ b/internal/controller/operator/factory/reconcile/statefulset.go
@@ -43,7 +43,7 @@ func waitForStatefulSetReady(ctx context.Context, rclient client.Client, newObj
if newObj.Spec.Replicas == nil {
return nil
}
- err := wait.PollUntilContextTimeout(ctx, podWaitReadyIntervalCheck, appWaitReadyDeadline, true, func(ctx context.Context) (done bool, err error) {
+ err := wait.PollUntilContextTimeout(ctx, podWaitReadyInterval, appWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
var existingObj appsv1.StatefulSet
if err := rclient.Get(ctx, types.NamespacedName{Namespace: newObj.Namespace, Name: newObj.Name}, &existingObj); err != nil {
if k8serrors.IsNotFound(err) {
@@ -195,8 +195,8 @@ func StatefulSet(ctx context.Context, rclient client.Client, cr STSOptions, newO
// if ObservedGeneration matches current generation
func getLatestStsState(ctx context.Context, rclient client.Client, targetSTS types.NamespacedName) (*appsv1.StatefulSet, error) {
var sts appsv1.StatefulSet
- err := wait.PollUntilContextTimeout(ctx, podWaitReadyIntervalCheck,
- appWaitReadyDeadline, true, func(ctx context.Context) (done bool, err error) {
+ err := wait.PollUntilContextTimeout(ctx, podWaitReadyInterval,
+ appWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
if err := rclient.Get(ctx, targetSTS, &sts); err != nil {
return true, err
}
@@ -227,7 +227,7 @@ type rollingUpdateOpts struct {
// we always check if sts.Status.CurrentRevision needs update, to keep it equal to UpdateRevision
// see https://github.com/kubernetes/kube-state-metrics/issues/1324#issuecomment-1779751992
func performRollingUpdateOnSts(ctx context.Context, rclient client.Client, obj *appsv1.StatefulSet, o rollingUpdateOpts) error {
- time.Sleep(podWaitReadyIntervalCheck)
+ time.Sleep(podWaitReadyInterval)
nsn := types.NamespacedName{
Name: obj.Name,
Namespace: obj.Namespace,
@@ -320,7 +320,7 @@ func performRollingUpdateOnSts(ctx context.Context, rclient client.Client, obj *
l.Info(fmt.Sprintf("updating pod=%s revision label=%q", pod.Name, pod.Labels[podRevisionLabel]))
// eviction may fail due to podDisruption budget and it's unexpected
// so retry pod eviction
- evictErr := wait.PollUntilContextTimeout(ctx, podWaitReadyIntervalCheck, podWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
+ evictErr := wait.PollUntilContextTimeout(ctx, podWaitReadyInterval, podWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
if o.delete {
if err := rclient.Delete(ctx, &pod); err != nil {
if k8serrors.IsNotFound(err) {
@@ -383,7 +383,7 @@ func PodIsReady(pod *corev1.Pod, minReadySeconds int32) bool {
func waitForPodReady(ctx context.Context, rclient client.Client, nsn types.NamespacedName, desiredRevision string, minReadySeconds int32) error {
var pod corev1.Pod
- if err := wait.PollUntilContextTimeout(ctx, podWaitReadyIntervalCheck, podWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
+ if err := wait.PollUntilContextTimeout(ctx, podWaitReadyInterval, podWaitReadyTimeout, true, func(ctx context.Context) (done bool, err error) {
if err := rclient.Get(ctx, nsn, &pod); err != nil {
if k8serrors.IsNotFound(err) {
return false, nil
diff --git a/internal/controller/operator/factory/reconcile/statefulset_pvc_expand.go b/internal/controller/operator/factory/reconcile/statefulset_pvc_expand.go
index a438b12cb..d32781a00 100644
--- a/internal/controller/operator/factory/reconcile/statefulset_pvc_expand.go
+++ b/internal/controller/operator/factory/reconcile/statefulset_pvc_expand.go
@@ -121,6 +121,11 @@ func updateSTSPVC(ctx context.Context, rclient client.Client, sts *appsv1.Statef
if err := updatePVC(ctx, rclient, &pvc, &stsClaim, prevVCT, nil); err != nil {
return err
}
+ nsnPvc := types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}
+ size := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
+ if err := waitForPVCReady(ctx, rclient, nsnPvc, size); err != nil {
+ return err
+ }
}
return nil
}
diff --git a/internal/controller/operator/factory/reconcile/statefulset_pvc_expand_test.go b/internal/controller/operator/factory/reconcile/statefulset_pvc_expand_test.go
index 063ebcbd8..47fd66f5d 100644
--- a/internal/controller/operator/factory/reconcile/statefulset_pvc_expand_test.go
+++ b/internal/controller/operator/factory/reconcile/statefulset_pvc_expand_test.go
@@ -160,10 +160,10 @@ func Test_updateSTSPVC(t *testing.T) {
}
f := func(o opts) {
t.Helper()
- cl := k8stools.GetTestClientWithActions(nil)
+ cl := k8stools.GetTestClientWithActionsAndObjects(nil)
ctx := context.TODO()
if o.preRun != nil {
- o.preRun(cl.Client)
+ o.preRun(cl)
cl.Actions = nil
}
err := updateSTSPVC(ctx, cl, o.sts, o.prevVCTs)
@@ -226,6 +226,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("10Gi"),
+ },
+ },
},
}
}),
@@ -294,7 +299,7 @@ func Test_updateSTSPVC(t *testing.T) {
"test": "test",
"3rd-party": "value",
},
- ResourceVersion: "1",
+ ResourceVersion: "2",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -303,6 +308,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("10Gi"),
+ },
+ },
},
},
})
@@ -383,6 +393,7 @@ func Test_updateSTSPVC(t *testing.T) {
},
actions: []k8stools.ClientAction{
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
},
expected: []corev1.PersistentVolumeClaim{
{
@@ -398,7 +409,7 @@ func Test_updateSTSPVC(t *testing.T) {
"test": "after",
"3rd-party": "value",
},
- ResourceVersion: "2",
+ ResourceVersion: "4",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -407,6 +418,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("10Gi"),
+ },
+ },
},
},
})
@@ -490,7 +506,9 @@ func Test_updateSTSPVC(t *testing.T) {
},
actions: []k8stools.ClientAction{
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: pvc2NSN},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: pvc2NSN},
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
},
expected: []corev1.PersistentVolumeClaim{
{
@@ -501,7 +519,7 @@ func Test_updateSTSPVC(t *testing.T) {
Annotations: map[string]string{
"operator.victoriametrics.com/pvc-allow-volume-expansion": "true",
},
- ResourceVersion: "2",
+ ResourceVersion: "4",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -510,6 +528,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("5Gi"),
+ },
+ },
},
{
ObjectMeta: metav1.ObjectMeta{
@@ -519,7 +542,7 @@ func Test_updateSTSPVC(t *testing.T) {
Annotations: map[string]string{
"operator.victoriametrics.com/pvc-allow-volume-expansion": "true",
},
- ResourceVersion: "2",
+ ResourceVersion: "4",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -528,6 +551,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("15Gi"),
+ },
+ },
},
},
})
@@ -581,7 +609,7 @@ func Test_updateSTSPVC(t *testing.T) {
Name: pvc1NSN.Name,
Namespace: pvc1NSN.Namespace,
Labels: map[string]string{"app": "vmselect"},
- ResourceVersion: "1",
+ ResourceVersion: "2",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -590,6 +618,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("10Gi"),
+ },
+ },
},
},
})
@@ -643,6 +676,7 @@ func Test_updateSTSPVC(t *testing.T) {
},
actions: []k8stools.ClientAction{
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
},
expected: []corev1.PersistentVolumeClaim{
{
@@ -653,7 +687,7 @@ func Test_updateSTSPVC(t *testing.T) {
Annotations: map[string]string{
"operator.victoriametrics.com/pvc-allow-volume-expansion": "true",
},
- ResourceVersion: "2",
+ ResourceVersion: "4",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -662,6 +696,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("15Gi"),
+ },
+ },
},
},
})
@@ -725,6 +764,7 @@ func Test_updateSTSPVC(t *testing.T) {
},
actions: []k8stools.ClientAction{
{Verb: "Update", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
+ {Verb: "Get", Kind: "PersistentVolumeClaim", Resource: pvc1NSN},
},
expected: []corev1.PersistentVolumeClaim{
{
@@ -735,7 +775,7 @@ func Test_updateSTSPVC(t *testing.T) {
Annotations: map[string]string{
"operator.victoriametrics.com/pvc-allow-volume-expansion": "true",
},
- ResourceVersion: "2",
+ ResourceVersion: "4",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -744,6 +784,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("15Gi"),
+ },
+ },
},
},
})
@@ -772,7 +817,7 @@ func Test_updateSTSPVC(t *testing.T) {
Name: "orphan-vmselect-0",
Namespace: "default",
Labels: map[string]string{"app": "vmselect"},
- ResourceVersion: "1",
+ ResourceVersion: "2",
},
},
},
@@ -825,7 +870,7 @@ func Test_updateSTSPVC(t *testing.T) {
Name: "data-vmselect-0",
Namespace: "default",
Labels: map[string]string{"app": "vmselect"},
- ResourceVersion: "1",
+ ResourceVersion: "2",
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
@@ -834,6 +879,11 @@ func Test_updateSTSPVC(t *testing.T) {
},
},
},
+ Status: corev1.PersistentVolumeClaimStatus{
+ Capacity: map[corev1.ResourceName]resource.Quantity{
+ corev1.ResourceStorage: resource.MustParse("20Gi"),
+ },
+ },
},
},
})
diff --git a/internal/controller/operator/factory/reconcile/vmagent.go b/internal/controller/operator/factory/reconcile/vmagent.go
index 583279ab5..a85734128 100644
--- a/internal/controller/operator/factory/reconcile/vmagent.go
+++ b/internal/controller/operator/factory/reconcile/vmagent.go
@@ -59,7 +59,7 @@ func VMAgent(ctx context.Context, rclient client.Client, newObj, prevObj *vmv1be
if err != nil {
return err
}
- if err := waitForStatus(ctx, rclient, newObj, vmStatusInterval, vmv1beta1.UpdateStatusOperational); err != nil {
+ if err := waitForStatus(ctx, rclient, newObj, vmWaitReadyInterval, vmv1beta1.UpdateStatusOperational); err != nil {
return fmt.Errorf("failed to wait for VMAgent=%s to be ready: %w", nsn.String(), err)
}
return nil
diff --git a/internal/controller/operator/factory/reconcile/vmauth.go b/internal/controller/operator/factory/reconcile/vmauth.go
index b7a1f90b8..97c09caea 100644
--- a/internal/controller/operator/factory/reconcile/vmauth.go
+++ b/internal/controller/operator/factory/reconcile/vmauth.go
@@ -58,7 +58,7 @@ func VMAuth(ctx context.Context, rclient client.Client, newObj, prevObj *vmv1bet
if err != nil {
return err
}
- if err := waitForStatus(ctx, rclient, newObj, vmStatusInterval, vmv1beta1.UpdateStatusOperational); err != nil {
+ if err := waitForStatus(ctx, rclient, newObj, vmWaitReadyInterval, vmv1beta1.UpdateStatusOperational); err != nil {
return fmt.Errorf("failed to wait for VMAuth=%s to be ready: %w", nsn.String(), err)
}
return nil
diff --git a/internal/controller/operator/factory/reconcile/vmcluster.go b/internal/controller/operator/factory/reconcile/vmcluster.go
index bb114929f..0c2336779 100644
--- a/internal/controller/operator/factory/reconcile/vmcluster.go
+++ b/internal/controller/operator/factory/reconcile/vmcluster.go
@@ -58,7 +58,7 @@ func VMCluster(ctx context.Context, rclient client.Client, newObj, prevObj *vmv1
if err != nil {
return err
}
- if err := waitForStatus(ctx, rclient, newObj, vmStatusInterval, vmv1beta1.UpdateStatusOperational); err != nil {
+ if err := waitForStatus(ctx, rclient, newObj, vmWaitReadyInterval, vmv1beta1.UpdateStatusOperational); err != nil {
return fmt.Errorf("failed to wait for VMCluster=%s to be ready: %w", nsn.String(), err)
}
return nil
diff --git a/internal/manager/manager.go b/internal/manager/manager.go
index c1df4d98d..d01a1d21a 100644
--- a/internal/manager/manager.go
+++ b/internal/manager/manager.go
@@ -232,7 +232,7 @@ func RunManager(ctx context.Context) error {
}
}
- reconcile.Init(baseConfig.PodWaitReadyIntervalCheck, baseConfig.AppReadyTimeout, baseConfig.PodWaitReadyTimeout, 5*time.Second, *statusUpdateTTL)
+ reconcile.Init(baseConfig, *statusUpdateTTL)
config := ctrl.GetConfigOrDie()
config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(*clientQPS), *clientBurst)