diff --git a/pkg/microservice/aslan/core/common/repository/models/product.go b/pkg/microservice/aslan/core/common/repository/models/product.go index 8b57699c80..001e29ce9b 100644 --- a/pkg/microservice/aslan/core/common/repository/models/product.go +++ b/pkg/microservice/aslan/core/common/repository/models/product.go @@ -170,6 +170,7 @@ type ProductService struct { Type string `bson:"type" json:"type"` Revision int64 `bson:"revision" json:"revision"` Containers []*Container `bson:"containers" json:"containers,omitempty"` + WorkLoads []*WorkLoad `bson:"workloads" json:"workloads"` Error string `bson:"error,omitempty" json:"error,omitempty"` Resources []*ServiceResource `bson:"resources,omitempty" json:"resources,omitempty"` UpdateTime int64 `bson:"update_time" json:"update_time"` @@ -228,6 +229,12 @@ func (svc *ProductService) GetContainerImageMap() map[string]string { return resp } +type WorkLoad struct { + WorkloadType string `bson:"workload_type" json:"workload_type"` + WorkloadName string `bson:"workload_name" json:"workload_name"` + Replicas int32 `bson:"replicas" json:"replicas"` +} + type ServiceConfig struct { ConfigName string `bson:"config_name" json:"config_name"` Revision int64 `bson:"revision" json:"revision"` diff --git a/pkg/microservice/aslan/core/common/service/kube/render.go b/pkg/microservice/aslan/core/common/service/kube/render.go index bd68380494..86cdc9becd 100644 --- a/pkg/microservice/aslan/core/common/service/kube/render.go +++ b/pkg/microservice/aslan/core/common/service/kube/render.go @@ -53,14 +53,16 @@ import ( ) type GeneSvcYamlOption struct { - ProductName string - EnvName string - ServiceName string - UpdateServiceRevision bool - VariableYaml string - VariableKVs []*commontypes.RenderVariableKV - UnInstall bool - Containers []*models.Container + ProductName string + EnvName string + ServiceName string + UpdateServiceRevision bool + VariableYaml string + VariableKVs []*commontypes.RenderVariableKV + ReplicaOverrides 
[]*commonmodels.WorkLoad + IgnoreCurrentReplicaOverrides bool + UnInstall bool + Containers []*models.Container } type WorkloadResource struct { @@ -404,7 +406,13 @@ func FetchCurrentAppliedYaml(option *GeneSvcYamlOption) (string, int, error) { fullRenderedYaml = ParseSysKeys(productInfo.Namespace, productInfo.EnvName, option.ProductName, option.ServiceName, fullRenderedYaml) mergedContainers := mergeContainers(prodSvcTemplate.Containers, curProductSvc.Containers) fullRenderedYaml, _, err = ReplaceWorkloadImages(fullRenderedYaml, mergedContainers) - return fullRenderedYaml, 0, nil + if err != nil { + return "", 0, err + } + + replicaOverrides := resolveReplicaOverrides(curProductSvc.WorkLoads, option.ReplicaOverrides, option.IgnoreCurrentReplicaOverrides) + fullRenderedYaml, err = ApplyReplicaOverrides(fullRenderedYaml, replicaOverrides) + return fullRenderedYaml, 0, err } func FetchImportedManifests(option *GeneSvcYamlOption, productInfo *models.Product, serviceTmp *models.Service, svcRender *template.ServiceRender) (string, []*WorkloadResource, error) { @@ -580,9 +588,29 @@ func GenerateRenderedYaml(option *GeneSvcYamlOption) (string, int, []*WorkloadRe mergedContainers := mergeContainers(curContainers, latestSvcTemplate.Containers, svcContainersInProduct, option.Containers) fullRenderedYaml, workloadResource, err := ReplaceWorkloadImages(fullRenderedYaml, mergedContainers) + if err != nil { + return "", 0, nil, err + } + + var currentReplicaOverrides []*commonmodels.WorkLoad + if curProductSvc != nil { + currentReplicaOverrides = curProductSvc.WorkLoads + } + replicaOverrides := resolveReplicaOverrides(currentReplicaOverrides, option.ReplicaOverrides, option.IgnoreCurrentReplicaOverrides) + fullRenderedYaml, err = ApplyReplicaOverrides(fullRenderedYaml, replicaOverrides) return fullRenderedYaml, int(latestSvcTemplate.Revision), workloadResource, err } +func resolveReplicaOverrides(currentOverrides, optionOverrides []*commonmodels.WorkLoad, ignoreCurrent bool) 
[]*commonmodels.WorkLoad { + if optionOverrides != nil { + return optionOverrides + } + if ignoreCurrent { + return nil + } + return currentOverrides +} + func RenderServiceYaml(originYaml, productName, serviceName string, svcRender *template.ServiceRender) (string, error) { if svcRender == nil { originYaml = strings.ReplaceAll(originYaml, setting.TemplateVariableProduct, productName) @@ -618,5 +646,8 @@ func RenderEnvServiceWithTempl(prod *commonmodels.Product, serviceRender *templa } parsedYaml = ParseSysKeys(prod.Namespace, prod.EnvName, prod.ProductName, service.ServiceName, parsedYaml) parsedYaml, _, err = ReplaceWorkloadImages(parsedYaml, service.Containers) - return parsedYaml, err + if err != nil { + return "", err + } + return ApplyReplicaOverrides(parsedYaml, service.WorkLoads) } diff --git a/pkg/microservice/aslan/core/common/service/kube/replica_override.go b/pkg/microservice/aslan/core/common/service/kube/replica_override.go new file mode 100644 index 0000000000..bc7c91faef --- /dev/null +++ b/pkg/microservice/aslan/core/common/service/kube/replica_override.go @@ -0,0 +1,228 @@ +package kube + +import ( + "bytes" + "fmt" + "strconv" + "strings" + + commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + "github.com/koderover/zadig/v2/pkg/setting" + "github.com/koderover/zadig/v2/pkg/tool/kube/serializer" + "github.com/koderover/zadig/v2/pkg/util" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + yaml "sigs.k8s.io/yaml/goyaml.v3" +) + +var supportedReplicaWorkloadTypes = map[string]string{ + strings.ToLower(setting.Deployment): setting.Deployment, + strings.ToLower(setting.StatefulSet): setting.StatefulSet, + strings.ToLower(setting.CloneSet): setting.CloneSet, +} + +func ReplicaOverrideKey(workloadType, workloadName string) string { + return strings.ToLower(workloadType) + "/" + workloadName +} + +func NormalizeReplicaWorkloadType(workloadType string) string { + if normalizedType, ok := 
supportedReplicaWorkloadTypes[strings.ToLower(workloadType)]; ok { + return normalizedType + } + return workloadType +} + +func ValidateReplicaWorkloadType(workloadType string) error { + if _, ok := supportedReplicaWorkloadTypes[strings.ToLower(workloadType)]; ok { + return nil + } + return fmt.Errorf("unsupported replica workload type %q", workloadType) +} + +func normalizeReplicaOverrideTarget(workloadType, workloadName string) (string, string, error) { + if err := ValidateReplicaWorkloadType(workloadType); err != nil { + return "", "", err + } + workloadName = strings.TrimSpace(workloadName) + if workloadName == "" { + return "", "", fmt.Errorf("workload name is empty") + } + return NormalizeReplicaWorkloadType(workloadType), workloadName, nil +} + +func UpsertWorkLoadsReplicas(origins []*commonmodels.WorkLoad, workloadType, workloadName string, replicas int32) ([]*commonmodels.WorkLoad, error) { + normalizedType, workloadName, err := normalizeReplicaOverrideTarget(workloadType, workloadName) + if err != nil { + return nil, err + } + for _, item := range origins { + if item == nil { + continue + } + if NormalizeReplicaWorkloadType(item.WorkloadType) == normalizedType && item.WorkloadName == workloadName { + item.WorkloadType = normalizedType + item.WorkloadName = workloadName + item.Replicas = replicas + return origins, nil + } + } + + return append(origins, &commonmodels.WorkLoad{ + WorkloadType: normalizedType, + WorkloadName: workloadName, + Replicas: replicas, + }), nil +} + +// ApplyReplicaOverrides 按归一化后的 workload 目标,把 override 写入渲染结果中的 spec.replicas。 +func ApplyReplicaOverrides(renderedYaml string, overrides []*commonmodels.WorkLoad) (string, error) { + if len(overrides) == 0 || len(strings.TrimSpace(renderedYaml)) == 0 { + return renderedYaml, nil + } + + overrideMap := make(map[string]int32, len(overrides)) + for _, item := range overrides { + if item == nil { + continue + } + normalizedType, workloadName, err := 
normalizeReplicaOverrideTarget(item.WorkloadType, item.WorkloadName) + if err != nil { + return "", err + } + overrideMap[ReplicaOverrideKey(normalizedType, workloadName)] = item.Replicas + } + + manifests := util.SplitManifests(renderedYaml) + updated := make([]string, 0, len(manifests)) + for _, manifest := range manifests { + if len(strings.TrimSpace(manifest)) == 0 { + continue + } + + u, err := serializer.NewDecoder().YamlToUnstructured([]byte(manifest)) + if err != nil { + return "", err + } + + kind := NormalizeReplicaWorkloadType(u.GetKind()) + switch kind { + case setting.Deployment, setting.StatefulSet, setting.CloneSet: + default: + updated = append(updated, manifest) + continue + } + + replicas, ok := overrideMap[ReplicaOverrideKey(kind, u.GetName())] + if !ok { + updated = append(updated, manifest) + continue + } + + updatedManifest, err := patchManifestReplicas(manifest, replicas) + if err != nil { + return "", fmt.Errorf("failed to set replicas for %s/%s: %w", kind, u.GetName(), err) + } + updated = append(updated, updatedManifest) + } + + return util.JoinYamls(updated), nil +} + +// patchManifestReplicas 在保持 Mapping 键顺序的前提下,更新 spec.replicas。 +func patchManifestReplicas(manifest string, replicas int32) (string, error) { + var doc yaml.Node + if err := yaml.Unmarshal([]byte(manifest), &doc); err != nil { + return "", err + } + if len(doc.Content) == 0 { + return manifest, nil + } + root := doc.Content[0] + if root.Kind != yaml.MappingNode { + return manifest, nil + } + + specNode := findMapValueNode(root, "spec") + if specNode == nil || specNode.Kind != yaml.MappingNode { + return manifest, nil + } + replicaNode := findMapValueNode(specNode, "replicas") + if replicaNode == nil { + return manifest, nil + } + replicaNode.Kind = yaml.ScalarNode + replicaNode.Tag = "!!int" + replicaNode.Value = strconv.FormatInt(int64(replicas), 10) + + buf := bytes.NewBuffer(nil) + enc := yaml.NewEncoder(buf) + enc.SetIndent(2) + if err := enc.Encode(&doc); err != nil { 
+ _ = enc.Close() + return "", err + } + if err := enc.Close(); err != nil { + return "", err + } + return strings.TrimSuffix(buf.String(), "---\n"), nil +} + +func findMapValueNode(mapping *yaml.Node, key string) *yaml.Node { + if mapping == nil { + return nil + } + if mapping.Kind != yaml.MappingNode { + return nil + } + + for i := 0; i+1 < len(mapping.Content); i += 2 { + keyNode := mapping.Content[i] + valueNode := mapping.Content[i+1] + if keyNode.Kind == yaml.ScalarNode && keyNode.Value == key { + return valueNode + } + } + return nil +} + +// ExtractWorkloadReplicas 提取支持类型 workload 的 spec.replicas,并返回标准化的 "type/name -> replicas" 映射。 +func ExtractWorkloadReplicas(renderedYaml string) (map[string]int32, error) { + ret := make(map[string]int32) + if len(strings.TrimSpace(renderedYaml)) == 0 { + return ret, nil + } + + for _, manifest := range util.SplitManifests(renderedYaml) { + if len(strings.TrimSpace(manifest)) == 0 { + continue + } + + u, err := serializer.NewDecoder().YamlToUnstructured([]byte(manifest)) + if err != nil { + return nil, err + } + + switch NormalizeReplicaWorkloadType(u.GetKind()) { + case setting.Deployment, setting.StatefulSet, setting.CloneSet: + replicas, found, err := unstructured.NestedInt64(u.Object, "spec", "replicas") + if err != nil { + return nil, fmt.Errorf("failed to get replicas from %s/%s: %w", u.GetKind(), u.GetName(), err) + } + if !found { + continue + } + ret[ReplicaOverrideKey(u.GetKind(), u.GetName())] = int32(replicas) + } + } + + return ret, nil +} + +func GetWorkloadReplica(renderedYaml, workloadType, workloadName string) (int32, bool, error) { + replicaMap, err := ExtractWorkloadReplicas(renderedYaml) + if err != nil { + return 0, false, err + } + + replicas, found := replicaMap[ReplicaOverrideKey(workloadType, workloadName)] + return replicas, found, nil +} diff --git a/pkg/microservice/aslan/core/environment/handler/openapi.go b/pkg/microservice/aslan/core/environment/handler/openapi.go index 
12a448ddab..a494436134 100644 --- a/pkg/microservice/aslan/core/environment/handler/openapi.go +++ b/pkg/microservice/aslan/core/environment/handler/openapi.go @@ -79,8 +79,8 @@ func OpenAPIScaleWorkloads(c *gin.Context) { ctx.Logger.Errorf("CreateProductTemplate json.Unmarshal err : %v", err) } - detail := fmt.Sprintf("环境名称:%s,%s:%s", req.EnvName, req.WorkloadType, req.WorkloadName) - detailEn := fmt.Sprintf("Environment name: %s,%s,%s", req.EnvName, req.WorkloadType, req.WorkloadName) + detail := fmt.Sprintf("环境名称:%s,服务名称:%s,%s:%s", req.EnvName, req.ServiceName, req.WorkloadType, req.WorkloadName) + detailEn := fmt.Sprintf("Environment name: %s, service name: %s,%s,%s", req.EnvName, req.ServiceName, req.WorkloadType, req.WorkloadName) internalhandler.InsertDetailedOperationLog( c, ctx.UserName+"(openAPI)", req.ProjectKey, setting.OperationSceneEnv, @@ -119,7 +119,7 @@ func OpenAPIScaleWorkloads(c *gin.Context) { } } - ctx.RespErr = service.OpenAPIScale(req, ctx.Logger) + ctx.RespErr = service.OpenAPIScale(req, ctx.UserName+"(openAPI)", ctx.Logger) } // @Summary 获取测试环境中k8s服务yaml diff --git a/pkg/microservice/aslan/core/environment/handler/service.go b/pkg/microservice/aslan/core/environment/handler/service.go index cf0ad29a0c..a43030711b 100644 --- a/pkg/microservice/aslan/core/environment/handler/service.go +++ b/pkg/microservice/aslan/core/environment/handler/service.go @@ -513,9 +513,6 @@ func ScaleNewService(c *gin.Context) { return } - args := new(service.ScaleArgs) - args.Type = setting.Deployment - projectKey := c.Query("projectName") serviceName := c.Param("serviceName") envName := c.Param("name") @@ -580,7 +577,7 @@ func ScaleNewService(c *gin.Context) { Name: name, Number: number, Production: production, - }, ctx.Logger) + }, ctx.UserName, ctx.Logger) } type OpenAPIGetServiceResponse struct { diff --git a/pkg/microservice/aslan/core/environment/service/environment.go b/pkg/microservice/aslan/core/environment/service/environment.go index 
629bce9522..4a14e947d2 100644 --- a/pkg/microservice/aslan/core/environment/service/environment.go +++ b/pkg/microservice/aslan/core/environment/service/environment.go @@ -548,7 +548,7 @@ type UpdateEnv struct { } func UpdateMultipleK8sEnv(args []*UpdateEnv, envNames []string, productName, requestID string, force, production bool, username string, log *zap.SugaredLogger) ([]*EnvStatus, error) { - mutexAutoUpdate := cache.NewRedisLock(fmt.Sprintf("update_multiple_product:%s", productName)) + mutexAutoUpdate := cache.NewRedisLock(updateMultipleProductLockKey(productName)) err := mutexAutoUpdate.Lock() if err != nil { return nil, e.ErrUpdateEnv.AddErr(fmt.Errorf("failed to acquire lock, err: %s", err)) @@ -758,16 +758,19 @@ func updateProductImpl(updateRevisionSvcs []string, deployStrategy map[string]st Revision: prodService.Revision, Render: prodService.Render, Containers: prodService.Containers, + WorkLoads: prodService.WorkLoads, } // need update service revision if util.InStringArray(prodService.ServiceName, updateRevisionSvcs) { - svcRev, ok := serviceRevisionMap[prodService.ServiceName+prodService.Type] + svcRev, ok := serviceRevisionMap[serviceNameTypeKey(prodService.ServiceName, prodService.Type)] if !ok { groupSvcs = append(groupSvcs, prodService) continue } - service.Revision = svcRev.NextRevision + if svcRev.NextRevision > 0 { + service.Revision = svcRev.NextRevision + } service.Containers = svcRev.Containers service.UpdateTime = time.Now().Unix() } @@ -2237,7 +2240,7 @@ func updateHelmProductVariable(productResp *commonmodels.Product, userName, requ } func UpdateMultipleHelmEnv(requestID, userName string, args *UpdateMultiHelmProductArg, production bool, log *zap.SugaredLogger) ([]*EnvStatus, error) { - mutexAutoUpdate := cache.NewRedisLock(fmt.Sprintf("update_multiple_product:%s", args.ProductName)) + mutexAutoUpdate := cache.NewRedisLock(updateMultipleProductLockKey(args.ProductName)) err := mutexAutoUpdate.Lock() if err != nil { return nil, 
e.ErrUpdateEnv.AddErr(fmt.Errorf("failed to acquire lock, err: %s", err)) @@ -2314,7 +2317,7 @@ func UpdateMultipleHelmEnv(requestID, userName string, args *UpdateMultiHelmProd } func UpdateMultipleHelmChartEnv(requestID, userName string, args *UpdateMultiHelmProductArg, production bool, log *zap.SugaredLogger) ([]*EnvStatus, error) { - mutexUpdateMultiHelm := cache.NewRedisLock(fmt.Sprintf("update_multiple_product:%s", args.ProductName)) + mutexUpdateMultiHelm := cache.NewRedisLock(updateMultipleProductLockKey(args.ProductName)) err := mutexUpdateMultiHelm.Lock() if err != nil { @@ -3116,7 +3119,7 @@ func installProductHelmCharts(user, requestID string, args *commonmodels.Product func getServiceRevisionMap(serviceRevisionList []*SvcRevision) map[string]*SvcRevision { serviceRevisionMap := make(map[string]*SvcRevision) for _, revision := range serviceRevisionList { - serviceRevisionMap[revision.ServiceName+revision.Type] = revision + serviceRevisionMap[serviceNameTypeKey(revision.ServiceName, revision.Type)] = revision } return serviceRevisionMap } diff --git a/pkg/microservice/aslan/core/environment/service/environment_update.go b/pkg/microservice/aslan/core/environment/service/environment_update.go index 2f1c7ab043..b32ad23034 100644 --- a/pkg/microservice/aslan/core/environment/service/environment_update.go +++ b/pkg/microservice/aslan/core/environment/service/environment_update.go @@ -281,6 +281,7 @@ func updateK8sProduct(exitedProd *commonmodels.Product, user, requestID string, } } } + exitedProd.GlobalVariables = globalVariables log.Infof("[%s][P:%s] updateProductImpl, services: %v", envName, productName, updateRevisionSvc) @@ -301,6 +302,15 @@ func updateK8sProduct(exitedProd *commonmodels.Product, user, requestID string, // build services productSvcs := exitedProd.GetServiceMap() + currentSvcSnapshotMap := make(map[string]*commonmodels.ProductService) + for _, svcGroup := range exitedProd.Services { + for _, svc := range svcGroup { + if svc == nil || 
!svc.FromZadig() { + continue + } + currentSvcSnapshotMap[serviceNameTypeKey(svc.ServiceName, svc.Type)] = cloneProductService(svc) + } + } svcGroupMap := make(map[string]int) for i, svg := range exitedProd.Services { for _, svc := range svg { @@ -357,6 +367,14 @@ func updateK8sProduct(exitedProd *commonmodels.Product, user, requestID string, return e.ErrUpdateEnv.AddErr(fmt.Errorf("failed to get product revision, err: %v", err)) } serviceRevisionMap := getServiceRevisionMap(prodRevs.ServiceRevisions) + latestServiceMap := make(map[string]*commonmodels.Service) + for _, svc := range allServices { + latestServiceMap[serviceNameTypeKey(svc.ServiceName, svc.Type)] = svc + } + + if err := syncUpdatedProductReplicaOverrides(updateProd, currentSvcSnapshotMap, serviceRevisionMap, updateRevisionSvc, latestServiceMap); err != nil { + return e.ErrUpdateEnv.AddErr(err) + } for _, prodServiceGroup := range updateProd.Services { for _, prodService := range prodServiceGroup { @@ -371,11 +389,12 @@ func updateK8sProduct(exitedProd *commonmodels.Product, user, requestID string, Revision: prodService.Revision, Render: prodService.Render, Containers: prodService.Containers, + WorkLoads: prodService.WorkLoads, } // need update service revision if util.InStringArray(prodService.ServiceName, updateRevisionSvc) { - svcRev, ok := serviceRevisionMap[prodService.ServiceName+prodService.Type] + svcRev, ok := serviceRevisionMap[serviceNameTypeKey(prodService.ServiceName, prodService.Type)] if !ok { continue } diff --git a/pkg/microservice/aslan/core/environment/service/k8s.go b/pkg/microservice/aslan/core/environment/service/k8s.go index 1bdea0f461..9fe9651d3a 100644 --- a/pkg/microservice/aslan/core/environment/service/k8s.go +++ b/pkg/microservice/aslan/core/environment/service/k8s.go @@ -163,6 +163,11 @@ func (k *K8sService) updateService(args *SvcOptArgs) error { newProductSvc.GetServiceRender().OverrideYaml.RenderVariableKVs = args.ServiceRev.VariableKVs 
newProductSvc.GetServiceRender().OverrideYaml.YamlContent = args.ServiceRev.VariableYaml + newProductSvc.WorkLoads, err = syncServiceReplicaOverrides(prodinfo, currentProductSvc, newProductSvc, nil) + if err != nil { + return e.ErrUpdateEnv.AddErr(fmt.Errorf("failed to reconcile replica overrides, err: %s", err)) + } + kubeClient, err := clientmanager.NewKubeClientManager().GetControllerRuntimeClient(prodinfo.ClusterID) if err != nil { return e.ErrUpdateEnv.AddErr(err) diff --git a/pkg/microservice/aslan/core/environment/service/replica_scale.go b/pkg/microservice/aslan/core/environment/service/replica_scale.go new file mode 100644 index 0000000000..e070bcf2e0 --- /dev/null +++ b/pkg/microservice/aslan/core/environment/service/replica_scale.go @@ -0,0 +1,215 @@ +package service + +import ( + "fmt" + "regexp" + "strings" + + commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + commonrepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/service/kube" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/service/repository" + commontypes "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/types" + "github.com/koderover/zadig/v2/pkg/tool/kube/serializer" + "github.com/koderover/zadig/v2/pkg/util" + "github.com/koderover/zadig/v2/pkg/util/converter" + "sigs.k8s.io/yaml" +) + +var ( + directReplicaVariableRegexp = regexp.MustCompile(`(?m)^[ \t]*replicas:[ \t]*\{\{\s*\.([A-Za-z0-9_-]+(?:\.[A-Za-z0-9_-]+)*)\s*\}\}[ \t]*$`) + templatedReplicaRegexp = regexp.MustCompile(`(?m)^[ \t]*replicas:[^\n]*\{\{.*\}\}[ \t]*$`) +) + +type replicaSourceKind string + +const ( + replicaSourceLiteral replicaSourceKind = "literal" + replicaSourceService replicaSourceKind = "service" + replicaSourceGlobal replicaSourceKind = "global" +) + +type replicaSource struct { + Kind replicaSourceKind + RootKey string + 
SubPath string +} + +// resolveScaleReplicaSource 判定目标 workload 的 replicas 来源(字面量/服务变量/全局变量),并在变量场景返回变量路径。 +func resolveScaleReplicaSource(prod *commonmodels.Product, currentSvc *commonmodels.ProductService, currentTmpl *commonmodels.Service, workloadType, workloadName string) (*replicaSource, error) { + renderedYaml, err := renderServiceWithOverrides(prod, currentSvc, currentTmpl, nil) + if err != nil { + return nil, err + } + + path, templated, replicaRefCount, err := resolveReplicaVariablePath(renderedYaml, currentTmpl.Yaml, workloadType, workloadName) + if err != nil { + return nil, err + } + if !templated { + return &replicaSource{Kind: replicaSourceLiteral}, nil + } + + if replicaRefCount > 1 { + return nil, fmt.Errorf("replicas of workload %s/%s is shared by multiple replica locations or cannot be updated safely", workloadType, workloadName) + } + + rootKey, subPath := path, "" + if parts := strings.SplitN(path, ".", 2); len(parts) == 2 { + rootKey = parts[0] + subPath = parts[1] + } + mergedRenderKVs, err := mergeServiceRenderVariableKVs(currentTmpl.ServiceVariableKVs, currentSvc.GetServiceRender().OverrideYaml.RenderVariableKVs) + if err != nil { + return nil, fmt.Errorf("failed to merge render variables for service %s: %w", currentSvc.ServiceName, err) + } + var targetKV *commontypes.RenderVariableKV + for _, kv := range mergedRenderKVs { + if kv != nil && kv.Key == rootKey { + targetKV = kv + break + } + } + if targetKV == nil { + return nil, fmt.Errorf("failed to find replicas variable %s for workload %s/%s", path, workloadType, workloadName) + } + + if targetKV.UseGlobalVariable { + return &replicaSource{Kind: replicaSourceGlobal, RootKey: rootKey, SubPath: subPath}, nil + } + + return &replicaSource{Kind: replicaSourceService, RootKey: rootKey, SubPath: subPath}, nil +} + +// resolveReplicaVariablePath 在模板原文中定位目标 workload 的 replicas 变量路径。 +// 返回值中的 bool 表示 replicas 是否来自模板表达式,int 表示同变量路径在模板中被 replicas 直接引用的次数。 +func 
resolveReplicaVariablePath(renderedYaml, rawTemplateYaml, workloadType, workloadName string) (string, bool, int, error) { + manifestIndex := -1 + normalizedType := kube.NormalizeReplicaWorkloadType(workloadType) + manifests := util.SplitManifests(renderedYaml) + for index, manifest := range manifests { + if len(strings.TrimSpace(manifest)) == 0 { + continue + } + u, err := serializer.NewDecoder().YamlToUnstructured([]byte(manifest)) + if err != nil { + return "", false, 0, err + } + if kube.NormalizeReplicaWorkloadType(u.GetKind()) == normalizedType && u.GetName() == workloadName { + manifestIndex = index + break + } + } + if manifestIndex < 0 { + return "", false, 0, fmt.Errorf("failed to find workload %s/%s in rendered yaml", workloadType, workloadName) + } + + rawManifests := util.SplitManifests(rawTemplateYaml) + if manifestIndex >= len(rawManifests) { + return "", true, 0, fmt.Errorf("failed to map workload %s/%s to template manifest; replicas source cannot be determined safely", workloadType, workloadName) + } + + rawManifest := rawManifests[manifestIndex] + matches := directReplicaVariableRegexp.FindStringSubmatch(rawManifest) + if len(matches) == 2 { + replicaPath := matches[1] + replicaRefCount := 0 + for _, manifest := range rawManifests { + matches := directReplicaVariableRegexp.FindStringSubmatch(manifest) + if len(matches) == 2 && matches[1] == replicaPath { + replicaRefCount++ + } + } + return replicaPath, true, replicaRefCount, nil + } + if templatedReplicaRegexp.MatchString(rawManifest) { + return "", true, 0, fmt.Errorf("replicas of workload %s/%s is derived from a complex template expression", workloadType, workloadName) + } + return "", false, 0, nil +} + +// updateRenderVariableReplicaValue 更新 replicas 对应变量值;更新前会克隆 render kv,避免原对象被就地修改。 +func updateRenderVariableReplicaValue(renderVars []*commontypes.RenderVariableKV, rootKey, subPath string, replicas int) ([]*commontypes.RenderVariableKV, error) { + cloned := cloneRenderVariableKVs(renderVars) 
+ for _, kv := range cloned { + if kv == nil || kv.Key != rootKey { + continue + } + if subPath == "" { + if kv.Type == commontypes.ServiceVariableKVTypeYaml { + renderedValue, err := yaml.Marshal(replicas) + if err != nil { + return nil, err + } + kv.Value = strings.TrimSpace(string(renderedValue)) + return cloned, nil + } + kv.Value = replicas + return cloned, nil + } + + if kv.Type != commontypes.ServiceVariableKVTypeYaml { + return nil, fmt.Errorf("variable %s does not support nested replica path %s", kv.Key, subPath) + } + yamlValue, ok := kv.Value.(string) + if !ok { + return nil, fmt.Errorf("variable %s is not a valid yaml value", kv.Key) + } + + flatMap, err := converter.YamlToFlatMap([]byte(yamlValue)) + if err != nil { + return nil, fmt.Errorf("failed to flatten variable %s: %w", kv.Key, err) + } + flatMap[subPath] = replicas + + expanded, err := converter.Expand(flatMap) + if err != nil { + return nil, fmt.Errorf("failed to expand variable %s: %w", kv.Key, err) + } + renderedValue, err := yaml.Marshal(expanded) + if err != nil { + return nil, fmt.Errorf("failed to marshal variable %s: %w", kv.Key, err) + } + kv.Value = string(renderedValue) + return cloned, nil + } + return nil, fmt.Errorf("failed to find render variable %s", rootKey) +} + +// buildPreviewCandidateOverrides 仅用于预览:基于候选变量/版本计算预期的副本 override,不修改当前环境状态。 +func buildPreviewCandidateOverrides(prod *commonmodels.Product, serviceName string, updateServiceRevision bool, variableKVs []*commontypes.RenderVariableKV) ([]*commonmodels.WorkLoad, error) { + currentSvc := cloneProductService(prod.GetServiceMap()[serviceName]) + if currentSvc == nil { + return nil, nil + } + + currentTmpl, err := loadServiceTemplateByRevision(currentSvc, prod.Production) + if err != nil { + return nil, err + } + candidateTmpl := currentTmpl + if updateServiceRevision { + candidateTmpl, err = repository.QueryTemplateService(&commonrepo.ServiceFindOption{ + ServiceName: serviceName, + ProductName: prod.ProductName, + 
Revision: 0, + }, prod.Production) + if err != nil { + return nil, err + } + } + + candidateSvc := cloneProductService(currentSvc) + candidateSvc.Revision = candidateTmpl.Revision + candidateRenderKVs, err := mergeServiceRenderVariableKVs(candidateTmpl.ServiceVariableKVs, currentSvc.GetServiceRender().OverrideYaml.RenderVariableKVs, variableKVs) + if err != nil { + return nil, err + } + candidateSvc.GetServiceRender().OverrideYaml.RenderVariableKVs = candidateRenderKVs + candidateSvc.GetServiceRender().OverrideYaml.YamlContent, err = commontypes.RenderVariableKVToYaml(candidateRenderKVs, true) + if err != nil { + return nil, err + } + + return syncServiceReplicaOverrides(prod, currentSvc, candidateSvc, nil) +} diff --git a/pkg/microservice/aslan/core/environment/service/replica_sync.go b/pkg/microservice/aslan/core/environment/service/replica_sync.go new file mode 100644 index 0000000000..697131bfa7 --- /dev/null +++ b/pkg/microservice/aslan/core/environment/service/replica_sync.go @@ -0,0 +1,247 @@ +package service + +import ( + "fmt" + "reflect" + "sort" + "strings" + + commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + templatemodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models/template" + commonrepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/service/kube" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/service/repository" + commontypes "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/types" + "github.com/koderover/zadig/v2/pkg/setting" + "github.com/koderover/zadig/v2/pkg/util" +) + +func cloneWorkLoads(workLoads []*commonmodels.WorkLoad) []*commonmodels.WorkLoad { + ret := make([]*commonmodels.WorkLoad, 0, len(workLoads)) + for _, item := range workLoads { + if item == nil { + continue + } + copied := *item + ret = append(ret, 
&copied)
	}
	return ret
}

// cloneRenderVariableKVs returns a copy of the given render variable KVs:
// each KV struct is copied by value and its Options slice is duplicated so
// mutations on the clone do not leak back into the source. Nil entries are
// dropped.
func cloneRenderVariableKVs(kvs []*commontypes.RenderVariableKV) []*commontypes.RenderVariableKV {
	ret := make([]*commontypes.RenderVariableKV, 0, len(kvs))
	for _, kv := range kvs {
		if kv == nil {
			continue
		}
		copied := *kv
		copied.Options = append([]string{}, kv.Options...)
		ret = append(ret, &copied)
	}
	return ret
}

// cloneProductService returns a copy of the product service that is safe to
// mutate at the top level: the Containers/Resources slices are re-allocated
// (NOTE(review): their *elements* are still shared pointers — confirm callers
// never mutate individual containers/resources through a clone), and the
// workloads and render config are deep-copied.
func cloneProductService(service *commonmodels.ProductService) *commonmodels.ProductService {
	if service == nil {
		return nil
	}
	copied := *service
	copied.Containers = append([]*commonmodels.Container{}, service.Containers...)
	copied.Resources = append([]*commonmodels.ServiceResource{}, service.Resources...)
	copied.WorkLoads = cloneWorkLoads(service.WorkLoads)
	copied.Render = cloneServiceRender(service.GetServiceRender())
	return &copied
}

// cloneServiceRender deep-copies a service render config, including its
// override yaml and render variable KVs.
func cloneServiceRender(render *templatemodels.ServiceRender) *templatemodels.ServiceRender {
	if render == nil {
		return nil
	}
	copied := *render
	if render.OverrideYaml != nil {
		overrideCopied := *render.OverrideYaml
		overrideCopied.RenderVariableKVs = cloneRenderVariableKVs(render.OverrideYaml.RenderVariableKVs)
		copied.OverrideYaml = &overrideCopied
	}
	return &copied
}

// loadServiceTemplateByRevision queries the service template matching the
// product service's exact name/project/type/revision.
func loadServiceTemplateByRevision(service *commonmodels.ProductService, production bool) (*commonmodels.Service, error) {
	if service == nil {
		return nil, fmt.Errorf("service is nil")
	}
	return repository.QueryTemplateService(&commonrepo.ServiceFindOption{
		ServiceName: service.ServiceName,
		ProductName: service.ProductName,
		Type:        service.Type,
		Revision:    service.Revision,
	}, production)
}

// mergeServiceRenderVariableKVs merges the template's variable KVs with any
// number of render-variable KV groups (later groups take precedence per
// MergeRenderVariableKVs). Input groups are cloned before merging so callers'
// slices are never mutated.
func mergeServiceRenderVariableKVs(templateVars []*commontypes.ServiceVariableKV, kvGroups ...[]*commontypes.RenderVariableKV) ([]*commontypes.RenderVariableKV, error) {
	mergedInput := [][]*commontypes.RenderVariableKV{commontypes.ServiceToRenderVariableKVs(templateVars)}
	for _, group := range kvGroups {
		mergedInput = append(mergedInput, cloneRenderVariableKVs(group))
	}
	_, ret, err := commontypes.MergeRenderVariableKVs(mergedInput...)
	if err != nil {
		return nil, err
	}
	return ret, nil
}

// renderServiceWithOverrides renders the service yaml with the supplied
// replica overrides applied, working on a clone so the input service is left
// untouched. Pass nil overrides to render the service's own declared state.
func renderServiceWithOverrides(prod *commonmodels.Product, service *commonmodels.ProductService, tmpl *commonmodels.Service, overrides []*commonmodels.WorkLoad) (string, error) {
	serviceCopy := cloneProductService(service)
	if serviceCopy == nil {
		return "", fmt.Errorf("service is nil")
	}
	serviceCopy.WorkLoads = cloneWorkLoads(overrides)
	return kube.RenderEnvServiceWithTempl(prod, serviceCopy.GetServiceRender(), serviceCopy, tmpl)
}

// serviceReplicaStateChanged reports whether the replica-relevant state
// (render variable KVs, override yaml content, or workload overrides) differs
// between the current and candidate service.
func serviceReplicaStateChanged(currentSvc, candidateSvc *commonmodels.ProductService) bool {
	if currentSvc == nil && candidateSvc == nil {
		return false
	}
	if currentSvc == nil || candidateSvc == nil {
		return true
	}
	currentRender := currentSvc.GetServiceRender().OverrideYaml
	candidateRender := candidateSvc.GetServiceRender().OverrideYaml
	// Fix: the previous version dereferenced both OverrideYaml pointers
	// unconditionally, panicking when a service carries no override yaml.
	if currentRender == nil || candidateRender == nil {
		return currentRender != candidateRender ||
			!reflect.DeepEqual(currentSvc.WorkLoads, candidateSvc.WorkLoads)
	}
	return !reflect.DeepEqual(currentRender.RenderVariableKVs, candidateRender.RenderVariableKVs) ||
		currentRender.YamlContent != candidateRender.YamlContent ||
		!reflect.DeepEqual(currentSvc.WorkLoads, candidateSvc.WorkLoads)
}

// syncServiceReplicaOverrides recomputes the replica overrides that should be
// kept/updated by comparing the rendered current and candidate services.
// preloadedServiceTemplate lets callers reuse an already-fetched template and
// skip a repeated query when the revision matches.
func syncServiceReplicaOverrides(prod *commonmodels.Product, currentSvc, candidateSvc *commonmodels.ProductService, preloadedServiceTemplate *commonmodels.Service) ([]*commonmodels.WorkLoad, error) {
	if candidateSvc == nil {
		return nil, fmt.Errorf("candidate service is nil")
	}
	if currentSvc == nil {
		// Nothing to reconcile against: adopt the candidate's overrides as-is.
		return cloneWorkLoads(candidateSvc.WorkLoads), nil
	}

	// Work on clones so rendering never mutates the callers' services.
	currentSvc = cloneProductService(currentSvc)
	candidateSvc = cloneProductService(candidateSvc)

	resolveTemplate := func(svc *commonmodels.ProductService, role string) (*commonmodels.Service, error) {
		if preloadedServiceTemplate != nil && preloadedServiceTemplate.Revision == svc.Revision {
			return preloadedServiceTemplate, nil
		}
		tmpl, err := loadServiceTemplateByRevision(svc, prod.Production)
		if err != nil {
			return nil, fmt.Errorf("failed to get %s service template %s: %w", role, svc.ServiceName, err)
		}
		return tmpl, nil
	}

	currentTmpl, err := resolveTemplate(currentSvc, "current")
	if err != nil {
		return nil, err
	}
	// Reuse the current template unless the candidate points at a different
	// revision.
	candidateTmpl := currentTmpl
	if candidateSvc.Revision != currentSvc.Revision {
		candidateTmpl, err = resolveTemplate(candidateSvc, "candidate")
		if err != nil {
			return nil, err
		}
	}

	return reconcileReplicaOverrides(prod, currentSvc, currentTmpl, candidateSvc, candidateTmpl)
}

// syncUpdatedProductReplicaOverrides realigns replica overrides for every k8s
// service in the updated product (including services whose revision is being
// bumped), comparing each against its pre-update snapshot.
func syncUpdatedProductReplicaOverrides(prod *commonmodels.Product, currentSvcSnapshotMap map[string]*commonmodels.ProductService, serviceRevisionMap map[string]*SvcRevision, updateRevisionSvcs []string, latestServiceMap map[string]*commonmodels.Service) error {
	for _, prodServiceGroup := range prod.Services {
		for _, prodService := range prodServiceGroup {
			if prodService == nil || prodService.Type != setting.K8SDeployType {
				continue
			}

			serviceKey := serviceNameTypeKey(prodService.ServiceName, prodService.Type)
			currentSvcSnapshot := currentSvcSnapshotMap[serviceKey]
			if currentSvcSnapshot == nil {
				// Newly added service: no snapshot to reconcile against.
				continue
			}

			candidateSvc := cloneProductService(prodService)
			if util.InStringArray(candidateSvc.ServiceName, updateRevisionSvcs) {
				if svcRev, ok := serviceRevisionMap[serviceKey]; ok {
					if svcRev.NextRevision > 0 {
						candidateSvc.Revision = svcRev.NextRevision
					}
					candidateSvc.Containers = svcRev.Containers
				}
			}

			overrides, err := syncServiceReplicaOverrides(prod, currentSvcSnapshot, candidateSvc, latestServiceMap[serviceKey])
			if err != nil {
				return fmt.Errorf("failed to reconcile replica overrides for service %s: %w", prodService.ServiceName, err)
			}
			prodService.WorkLoads = overrides
		}
	}
	return nil
}

// reconcileReplicaOverrides keeps existing overrides for workloads whose
// replica count is unchanged between the current and candidate render, and
// upserts an override only for workloads whose replicas differ.
func reconcileReplicaOverrides(prod *commonmodels.Product, currentSvc *commonmodels.ProductService, currentTmpl *commonmodels.Service, candidateSvc *commonmodels.ProductService, candidateTmpl *commonmodels.Service) ([]*commonmodels.WorkLoad, error) {
	// Replica counts currently in effect, keyed "type/name"; empty when there
	// is no current service to render.
	currentReplicaMap := map[string]int32{}
	if currentSvc != nil && currentTmpl != nil {
		currentYaml, err := renderServiceWithOverrides(prod, currentSvc, currentTmpl, currentSvc.WorkLoads)
		if err != nil {
			return nil, err
		}
		currentReplicaMap, err = kube.ExtractWorkloadReplicas(currentYaml)
		if err != nil {
			return nil, err
		}
	}

	// The candidate is rendered WITHOUT overrides: these are the replica
	// counts the service yaml itself declares.
	candidateYaml, err := renderServiceWithOverrides(prod, candidateSvc, candidateTmpl, nil)
	if err != nil {
		return nil, err
	}
	candidateReplicaMap, err := kube.ExtractWorkloadReplicas(candidateYaml)
	if err != nil {
		return nil, err
	}

	// Start from the current overrides when available so unchanged workloads
	// keep their recorded replica counts.
	baseOverrides := cloneWorkLoads(candidateSvc.WorkLoads)
	if currentSvc != nil {
		baseOverrides = cloneWorkLoads(currentSvc.WorkLoads)
	}

	// Iterate in sorted key order so the resulting override list is
	// deterministic.
	keys := make([]string, 0, len(candidateReplicaMap))
	for key := range candidateReplicaMap {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	for _, key := range keys {
		candidateReplica := candidateReplicaMap[key]
		if currentReplica, ok := currentReplicaMap[key]; ok && currentReplica == candidateReplica {
			continue
		}
		workloadType, workloadName := "", key
		if parts := strings.SplitN(key, "/", 2); len(parts) == 2 {
			workloadType = kube.NormalizeReplicaWorkloadType(parts[0])
			workloadName = parts[1]
		}
		baseOverrides, err = kube.UpsertWorkLoadsReplicas(baseOverrides, workloadType, workloadName, candidateReplica)
		if err != nil {
			return nil, err
		}
	}

	return baseOverrides, nil
}
// Key helpers shared by the environment service package.

const updateMultipleProductLockKeyFormat = "update_multiple_product:%s"

// updateMultipleProductLockKey builds the distributed-lock key used to
// serialize concurrent updates to environments of a single product.
func updateMultipleProductLockKey(productName string) string {
	return fmt.Sprintf(updateMultipleProductLockKeyFormat, productName)
}

// serviceNameTypeKey builds the composite "<name>:<type>" key that uniquely
// identifies a service by name and deployment type within one environment.
func serviceNameTypeKey(serviceName, serviceType string) string {
	return serviceName + ":" + serviceType
}
b/pkg/microservice/aslan/core/environment/service/service_scale.go @@ -0,0 +1,260 @@ +package service + +import ( + "context" + "fmt" + "time" + + "github.com/koderover/zadig/v2/pkg/tool/clientmanager" + "github.com/openkruise/kruise-api/apps/v1alpha1" + "go.uber.org/zap" + appsv1 "k8s.io/api/apps/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" + commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" + commonrepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb" + templaterepo "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/mongodb/template" + "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/service/kube" + commontypes "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/types" + commonutil "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/util" + "github.com/koderover/zadig/v2/pkg/setting" + "github.com/koderover/zadig/v2/pkg/tool/cache" + e "github.com/koderover/zadig/v2/pkg/tool/errors" + "github.com/koderover/zadig/v2/pkg/tool/kube/updater" + mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo" +) + +// Scale 先校验副本来源,再执行集群副本变更,最后在需要时持久化环境状态与版本记录。 +func Scale(args *ScaleArgs, updateBy string, logger *zap.SugaredLogger) error { + opt := &commonrepo.ProductFindOptions{ + Name: args.ProductName, + EnvName: args.EnvName, + Production: &args.Production, + } + prod, err := commonrepo.NewProductColl().Find(opt) + if err != nil { + return e.ErrScaleService.AddErr(err) + } + if prod.IsSleeping() { + return e.ErrScaleService.AddErr(fmt.Errorf("environment is sleeping")) + } + + project, err := templaterepo.NewProductColl().Find(args.ProductName) + if err != nil { + return e.ErrScaleService.AddErr(err) + } + + kubeClient, err := clientmanager.NewKubeClientManager().GetControllerRuntimeClient(prod.ClusterID) + if err != nil { + return 
e.ErrScaleService.AddErr(err) + } + + if !project.IsK8sYamlProduct() { + return scaleWorkload(prod.Namespace, args.Type, args.Name, args.Number, kubeClient, logger) + } + + mutexAutoUpdate := cache.NewRedisLock(updateMultipleProductLockKey(args.ProductName)) + if err := mutexAutoUpdate.Lock(); err != nil { + return e.ErrScaleService.AddErr(fmt.Errorf("failed to acquire lock, err: %s", err)) + } + defer func() { + mutexAutoUpdate.Unlock() + }() + + currentSvc, currentTmpl, err := findScaleTargetService(prod, args.ServiceName, args.Type, args.Name) + if err != nil { + return e.ErrScaleService.AddErr(err) + } + + source, err := resolveScaleReplicaSource(prod, currentSvc, currentTmpl, args.Type, args.Name) + if err != nil { + return e.ErrScaleService.AddErr(err) + } + if source.Kind == replicaSourceGlobal { + return e.ErrScaleService.AddErr(fmt.Errorf("replicas of workload %s/%s is sourced from environment global variables and cannot be updated by scale", args.Type, args.Name)) + } + + liveReplica, err := getWorkloadLiveReplica(prod.Namespace, args.Type, args.Name, kubeClient) + if err != nil { + return e.ErrScaleService.AddErr(err) + } + + candidateSvc := cloneProductService(currentSvc) + switch source.Kind { + case replicaSourceLiteral: + case replicaSourceService: + mergedRenderKVs, err := mergeServiceRenderVariableKVs(currentTmpl.ServiceVariableKVs, currentSvc.GetServiceRender().OverrideYaml.RenderVariableKVs) + if err != nil { + return e.ErrScaleService.AddErr(err) + } + updatedRenderKVs, err := updateRenderVariableReplicaValue(mergedRenderKVs, source.RootKey, source.SubPath, args.Number) + if err != nil { + return e.ErrScaleService.AddErr(err) + } + candidateSvc.GetServiceRender().OverrideYaml.RenderVariableKVs = updatedRenderKVs + candidateSvc.GetServiceRender().OverrideYaml.YamlContent, err = commontypes.RenderVariableKVToYaml(updatedRenderKVs, true) + if err != nil { + return e.ErrScaleService.AddErr(err) + } + default: + return 
e.ErrScaleService.AddErr(fmt.Errorf("unsupported replicas source for workload %s/%s", args.Type, args.Name)) + } + candidateSvc.WorkLoads, err = kube.UpsertWorkLoadsReplicas(candidateSvc.WorkLoads, args.Type, args.Name, int32(args.Number)) + if err != nil { + return e.ErrScaleService.AddErr(err) + } + candidateSvc.UpdateTime = time.Now().Unix() + + envStateChanged := serviceReplicaStateChanged(currentSvc, candidateSvc) + targetReplica := int32(args.Number) + if liveReplica == targetReplica && !envStateChanged { + return nil + } + + if liveReplica != targetReplica { + if err := scaleWorkload(prod.Namespace, args.Type, args.Name, args.Number, kubeClient, logger); err != nil { + return e.ErrScaleService.AddErr(err) + } + } + + if !envStateChanged { + return nil + } + + if err := updateEnvService(prod, candidateSvc); err != nil { + return e.ErrScaleService.AddErr(err) + } + + session := mongotool.Session() + defer session.EndSession(context.Background()) + if err := mongotool.StartTransaction(session); err != nil { + return e.ErrScaleService.AddErr(err) + } + + productColl := commonrepo.NewProductCollWithSession(session) + if err := productColl.Update(prod); err != nil { + mongotool.AbortTransaction(session) + return e.ErrScaleService.AddErr(err) + } + + if err := commonutil.CreateEnvServiceVersion(prod, candidateSvc, updateBy, config.EnvOperationDefault, "", session, logger); err != nil { + mongotool.AbortTransaction(session) + return e.ErrScaleService.AddErr(err) + } + + if err := mongotool.CommitTransaction(session); err != nil { + return e.ErrScaleService.AddErr(err) + } + + return nil +} + +func OpenAPIScale(req *OpenAPIScaleServiceReq, updateBy string, logger *zap.SugaredLogger) error { + args := &ScaleArgs{ + Type: req.WorkloadType, + ProductName: req.ProjectKey, + EnvName: req.EnvName, + ServiceName: req.ServiceName, + Name: req.WorkloadName, + Number: req.TargetReplicas, + Production: false, + } + + return Scale(args, updateBy, logger) +} + +func 
scaleWorkload(namespace, workloadType, workloadName string, replicas int, kubeClient client.Client, logger *zap.SugaredLogger) error { + switch kube.NormalizeReplicaWorkloadType(workloadType) { + case setting.Deployment: + if err := updater.ScaleDeployment(namespace, workloadName, replicas, kubeClient); err != nil { + logger.Errorf("failed to scale %s/deployment/%s to %d", namespace, workloadName, replicas) + return err + } + case setting.StatefulSet: + if err := updater.ScaleStatefulSet(namespace, workloadName, replicas, kubeClient); err != nil { + logger.Errorf("failed to scale %s/statefulset/%s to %d", namespace, workloadName, replicas) + return err + } + case setting.CloneSet: + if err := updater.ScaleCloneSet(namespace, workloadName, replicas, kubeClient); err != nil { + logger.Errorf("failed to scale %s/cloneset/%s to %d", namespace, workloadName, replicas) + return err + } + default: + return fmt.Errorf("unsupported workload type: %s", workloadType) + } + return nil +} + +func getWorkloadLiveReplica(namespace, workloadType, workloadName string, kubeClient client.Client) (int32, error) { + switch kube.NormalizeReplicaWorkloadType(workloadType) { + case setting.Deployment: + obj := &appsv1.Deployment{} + if err := kubeClient.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: workloadName}, obj); err != nil { + return 0, err + } + if obj.Spec.Replicas == nil { + return 1, nil + } + return *obj.Spec.Replicas, nil + case setting.StatefulSet: + obj := &appsv1.StatefulSet{} + if err := kubeClient.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: workloadName}, obj); err != nil { + return 0, err + } + if obj.Spec.Replicas == nil { + return 1, nil + } + return *obj.Spec.Replicas, nil + case setting.CloneSet: + obj := &v1alpha1.CloneSet{} + if err := kubeClient.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: workloadName}, obj); err != nil { + return 0, err + } + if obj.Spec.Replicas == nil { + return 1, 
nil + } + return *obj.Spec.Replicas, nil + default: + return 0, fmt.Errorf("unsupported workload type: %s", workloadType) + } +} + +// findScaleTargetService 根据 serviceName 解析目标服务及模板,并校验目标 workload 确实属于该服务。 +func findScaleTargetService(prod *commonmodels.Product, serviceName, workloadType, workloadName string) (*commonmodels.ProductService, *commonmodels.Service, error) { + service := prod.GetServiceMap()[serviceName] + if service == nil || service.Type != setting.K8SDeployType { + return nil, nil, fmt.Errorf("failed to find k8s service %s in env %s", serviceName, prod.EnvName) + } + + tmpl, err := loadServiceTemplateByRevision(service, prod.Production) + if err != nil { + return nil, nil, err + } + + renderedYaml, err := renderServiceWithOverrides(prod, service, tmpl, service.WorkLoads) + if err != nil { + return nil, nil, err + } + _, found, err := kube.GetWorkloadReplica(renderedYaml, workloadType, workloadName) + if err != nil { + return nil, nil, err + } + if !found { + return nil, nil, fmt.Errorf("workload %s/%s does not belong to service %s", workloadType, workloadName, serviceName) + } + return service, tmpl, nil +} + +func updateEnvService(prod *commonmodels.Product, service *commonmodels.ProductService) error { + for _, group := range prod.Services { + for index, current := range group { + if current.ServiceName == service.ServiceName && current.Type == service.Type { + group[index] = service + return nil + } + } + } + return fmt.Errorf("failed to replace service %s in env %s", service.ServiceName, prod.EnvName) +} diff --git a/pkg/microservice/aslan/core/environment/service/types.go b/pkg/microservice/aslan/core/environment/service/types.go index daca0df5b3..f9f1decd08 100644 --- a/pkg/microservice/aslan/core/environment/service/types.go +++ b/pkg/microservice/aslan/core/environment/service/types.go @@ -244,6 +244,7 @@ type MatchedEnv struct { type OpenAPIScaleServiceReq struct { ProjectKey string `json:"project_key"` EnvName string `json:"env_key"` + 
ServiceName string `json:"service_name"` WorkloadName string `json:"workload_name"` WorkloadType string `json:"workload_type"` TargetReplicas int `json:"target_replicas"` @@ -256,6 +257,9 @@ func (req *OpenAPIScaleServiceReq) Validate() error { if req.EnvName == "" { return fmt.Errorf("env_key is required") } + if req.ServiceName == "" { + return fmt.Errorf("service_name is required") + } if req.WorkloadName == "" { return fmt.Errorf("workload_name is required") } @@ -264,7 +268,7 @@ func (req *OpenAPIScaleServiceReq) Validate() error { } switch req.WorkloadType { - case setting.Deployment, setting.StatefulSet: + case setting.Deployment, setting.StatefulSet, setting.CloneSet: default: return fmt.Errorf("unsupported workload type: %s", req.WorkloadType) } diff --git a/pkg/util/yaml.go b/pkg/util/yaml.go index ea45283b03..1a250862fa 100644 --- a/pkg/util/yaml.go +++ b/pkg/util/yaml.go @@ -17,6 +17,7 @@ limitations under the License. package util import ( + "sort" "strings" "github.com/koderover/zadig/v2/pkg/setting" @@ -30,11 +31,18 @@ func CombineManifests(yamls []string) string { return strings.Join(yamls, separator) } +// SplitManifests 按原始文件顺序返回 manifest 列表。 +// Helm 的 SplitManifests 返回 map,这里显式排序 key,避免顺序不稳定。 func SplitManifests(content string) []string { - var res []string + res := make([]string, 0) manifests := releaseutil.SplitManifests(content) - for _, m := range manifests { - res = append(res, m) + manifestKeys := make([]string, 0, len(manifests)) + for key := range manifests { + manifestKeys = append(manifestKeys, key) + } + sort.Sort(releaseutil.BySplitManifestsOrder(manifestKeys)) + for _, key := range manifestKeys { + res = append(res, manifests[key]) } return res