Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type ProductService struct {
Type string `bson:"type" json:"type"`
Revision int64 `bson:"revision" json:"revision"`
Containers []*Container `bson:"containers" json:"containers,omitempty"`
WorkLoads []*WorkLoad `bson:"workloads" json:"workloads"`
Error string `bson:"error,omitempty" json:"error,omitempty"`
Resources []*ServiceResource `bson:"resources,omitempty" json:"resources,omitempty"`
UpdateTime int64 `bson:"update_time" json:"update_time"`
Expand Down Expand Up @@ -228,6 +229,12 @@ func (svc *ProductService) GetContainerImageMap() map[string]string {
return resp
}

type WorkLoad struct {
WorkloadType string `bson:"workload_type" json:"workload_type"`
WorkloadName string `bson:"workload_name" json:"workload_name"`
Replicas int32 `bson:"replicas" json:"replicas"`
}

type ServiceConfig struct {
ConfigName string `bson:"config_name" json:"config_name"`
Revision int64 `bson:"revision" json:"revision"`
Expand Down
51 changes: 41 additions & 10 deletions pkg/microservice/aslan/core/common/service/kube/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ import (
)

type GeneSvcYamlOption struct {
ProductName string
EnvName string
ServiceName string
UpdateServiceRevision bool
VariableYaml string
VariableKVs []*commontypes.RenderVariableKV
UnInstall bool
Containers []*models.Container
ProductName string
EnvName string
ServiceName string
UpdateServiceRevision bool
VariableYaml string
VariableKVs []*commontypes.RenderVariableKV
ReplicaOverrides []*commonmodels.WorkLoad
IgnoreCurrentReplicaOverrides bool
UnInstall bool
Containers []*models.Container
}

type WorkloadResource struct {
Expand Down Expand Up @@ -404,7 +406,13 @@ func FetchCurrentAppliedYaml(option *GeneSvcYamlOption) (string, int, error) {
fullRenderedYaml = ParseSysKeys(productInfo.Namespace, productInfo.EnvName, option.ProductName, option.ServiceName, fullRenderedYaml)
mergedContainers := mergeContainers(prodSvcTemplate.Containers, curProductSvc.Containers)
fullRenderedYaml, _, err = ReplaceWorkloadImages(fullRenderedYaml, mergedContainers)
return fullRenderedYaml, 0, nil
if err != nil {
return "", 0, err
}

replicaOverrides := resolveReplicaOverrides(curProductSvc.WorkLoads, option.ReplicaOverrides, option.IgnoreCurrentReplicaOverrides)
fullRenderedYaml, err = ApplyReplicaOverrides(fullRenderedYaml, replicaOverrides)
return fullRenderedYaml, 0, err
}

func FetchImportedManifests(option *GeneSvcYamlOption, productInfo *models.Product, serviceTmp *models.Service, svcRender *template.ServiceRender) (string, []*WorkloadResource, error) {
Expand Down Expand Up @@ -580,9 +588,29 @@ func GenerateRenderedYaml(option *GeneSvcYamlOption) (string, int, []*WorkloadRe

mergedContainers := mergeContainers(curContainers, latestSvcTemplate.Containers, svcContainersInProduct, option.Containers)
fullRenderedYaml, workloadResource, err := ReplaceWorkloadImages(fullRenderedYaml, mergedContainers)
if err != nil {
return "", 0, nil, err
}

var currentReplicaOverrides []*commonmodels.WorkLoad
if curProductSvc != nil {
currentReplicaOverrides = curProductSvc.WorkLoads
}
replicaOverrides := resolveReplicaOverrides(currentReplicaOverrides, option.ReplicaOverrides, option.IgnoreCurrentReplicaOverrides)
fullRenderedYaml, err = ApplyReplicaOverrides(fullRenderedYaml, replicaOverrides)
return fullRenderedYaml, int(latestSvcTemplate.Revision), workloadResource, err
}

func resolveReplicaOverrides(currentOverrides, optionOverrides []*commonmodels.WorkLoad, ignoreCurrent bool) []*commonmodels.WorkLoad {
if optionOverrides != nil {
return optionOverrides
}
if ignoreCurrent {
return nil
}
return currentOverrides
}

func RenderServiceYaml(originYaml, productName, serviceName string, svcRender *template.ServiceRender) (string, error) {
if svcRender == nil {
originYaml = strings.ReplaceAll(originYaml, setting.TemplateVariableProduct, productName)
Expand Down Expand Up @@ -618,5 +646,8 @@ func RenderEnvServiceWithTempl(prod *commonmodels.Product, serviceRender *templa
}
parsedYaml = ParseSysKeys(prod.Namespace, prod.EnvName, prod.ProductName, service.ServiceName, parsedYaml)
parsedYaml, _, err = ReplaceWorkloadImages(parsedYaml, service.Containers)
return parsedYaml, err
if err != nil {
return "", err
}
return ApplyReplicaOverrides(parsedYaml, service.WorkLoads)
}
164 changes: 164 additions & 0 deletions pkg/microservice/aslan/core/common/service/kube/replica_override.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package kube

import (
"fmt"
"strings"

commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models"
"github.com/koderover/zadig/v2/pkg/setting"
"github.com/koderover/zadig/v2/pkg/tool/kube/serializer"
"github.com/koderover/zadig/v2/pkg/util"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/yaml"
)

var supportedReplicaWorkloadTypes = map[string]string{
strings.ToLower(setting.Deployment): setting.Deployment,
strings.ToLower(setting.StatefulSet): setting.StatefulSet,
strings.ToLower(setting.CloneSet): setting.CloneSet,
}

func ReplicaOverrideKey(workloadType, workloadName string) string {
return strings.ToLower(workloadType) + "/" + workloadName
}

func NormalizeReplicaWorkloadType(workloadType string) string {
if normalizedType, ok := supportedReplicaWorkloadTypes[strings.ToLower(workloadType)]; ok {
return normalizedType
}
return workloadType
}

func ValidateReplicaWorkloadType(workloadType string) error {
if _, ok := supportedReplicaWorkloadTypes[strings.ToLower(workloadType)]; ok {
return nil
}
return fmt.Errorf("unsupported replica workload type %q", workloadType)
}

func normalizeReplicaOverrideTarget(workloadType, workloadName string) (string, string, error) {
if err := ValidateReplicaWorkloadType(workloadType); err != nil {
return "", "", err
}
workloadName = strings.TrimSpace(workloadName)
if workloadName == "" {
return "", "", fmt.Errorf("workload name is empty")
}
return NormalizeReplicaWorkloadType(workloadType), workloadName, nil
}

func UpsertWorkLoadsReplicas(origins []*commonmodels.WorkLoad, workloadType, workloadName string, replicas int32) ([]*commonmodels.WorkLoad, error) {
normalizedType, workloadName, err := normalizeReplicaOverrideTarget(workloadType, workloadName)
if err != nil {
return nil, err
}
for _, item := range origins {
if item == nil {
continue
}
if NormalizeReplicaWorkloadType(item.WorkloadType) == normalizedType && item.WorkloadName == workloadName {
item.WorkloadType = normalizedType
item.WorkloadName = workloadName
item.Replicas = replicas
return origins, nil
}
}

return append(origins, &commonmodels.WorkLoad{
WorkloadType: normalizedType,
WorkloadName: workloadName,
Replicas: replicas,
}), nil
}

// ApplyReplicaOverrides 按归一化后的 workload 目标,把 override 写入渲染结果中的 spec.replicas。
func ApplyReplicaOverrides(renderedYaml string, overrides []*commonmodels.WorkLoad) (string, error) {
if len(overrides) == 0 || len(strings.TrimSpace(renderedYaml)) == 0 {
return renderedYaml, nil
}

overrideMap := make(map[string]int32, len(overrides))
for _, item := range overrides {
if item == nil {
continue
}
normalizedType, workloadName, err := normalizeReplicaOverrideTarget(item.WorkloadType, item.WorkloadName)
if err != nil {
return "", err
}
overrideMap[ReplicaOverrideKey(normalizedType, workloadName)] = item.Replicas
}

manifests := util.SplitManifests(renderedYaml)
updated := make([]string, 0, len(manifests))
for _, manifest := range manifests {
if len(strings.TrimSpace(manifest)) == 0 {
continue
}

u, err := serializer.NewDecoder().YamlToUnstructured([]byte(manifest))
if err != nil {
return "", err
}

switch NormalizeReplicaWorkloadType(u.GetKind()) {
case setting.Deployment, setting.StatefulSet, setting.CloneSet:
if replicas, ok := overrideMap[ReplicaOverrideKey(u.GetKind(), u.GetName())]; ok {
if err := unstructured.SetNestedField(u.Object, int64(replicas), "spec", "replicas"); err != nil {
return "", fmt.Errorf("failed to set replicas for %s/%s: %w", u.GetKind(), u.GetName(), err)
}
}
}

yamlBytes, err := yaml.Marshal(u.Object)
if err != nil {
return "", fmt.Errorf("failed to marshal %s/%s: %w", u.GetKind(), u.GetName(), err)
}
updated = append(updated, string(yamlBytes))
}

return util.JoinYamls(updated), nil
}

// ExtractWorkloadReplicas 提取支持类型 workload 的 spec.replicas,并返回标准化的 "type/name -> replicas" 映射。
func ExtractWorkloadReplicas(renderedYaml string) (map[string]int32, error) {
ret := make(map[string]int32)
if len(strings.TrimSpace(renderedYaml)) == 0 {
return ret, nil
}

for _, manifest := range util.SplitManifests(renderedYaml) {
if len(strings.TrimSpace(manifest)) == 0 {
continue
}

u, err := serializer.NewDecoder().YamlToUnstructured([]byte(manifest))
if err != nil {
return nil, err
}

switch NormalizeReplicaWorkloadType(u.GetKind()) {
case setting.Deployment, setting.StatefulSet, setting.CloneSet:
replicas, found, err := unstructured.NestedInt64(u.Object, "spec", "replicas")
if err != nil {
return nil, fmt.Errorf("failed to get replicas from %s/%s: %w", u.GetKind(), u.GetName(), err)
}
if !found {
continue
}
ret[ReplicaOverrideKey(u.GetKind(), u.GetName())] = int32(replicas)
}
}

return ret, nil
}

func GetWorkloadReplica(renderedYaml, workloadType, workloadName string) (int32, bool, error) {
replicaMap, err := ExtractWorkloadReplicas(renderedYaml)
if err != nil {
return 0, false, err
}

replicas, found := replicaMap[ReplicaOverrideKey(workloadType, workloadName)]
return replicas, found, nil
}
6 changes: 3 additions & 3 deletions pkg/microservice/aslan/core/environment/handler/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func OpenAPIScaleWorkloads(c *gin.Context) {
ctx.Logger.Errorf("CreateProductTemplate json.Unmarshal err : %v", err)
}

detail := fmt.Sprintf("环境名称:%s,%s:%s", req.EnvName, req.WorkloadType, req.WorkloadName)
detailEn := fmt.Sprintf("Environment name: %s,%s,%s", req.EnvName, req.WorkloadType, req.WorkloadName)
detail := fmt.Sprintf("环境名称:%s,服务名称:%s,%s:%s", req.EnvName, req.ServiceName, req.WorkloadType, req.WorkloadName)
detailEn := fmt.Sprintf("Environment name: %s, service name: %s,%s,%s", req.EnvName, req.ServiceName, req.WorkloadType, req.WorkloadName)
internalhandler.InsertDetailedOperationLog(
c, ctx.UserName+"(openAPI)",
req.ProjectKey, setting.OperationSceneEnv,
Expand Down Expand Up @@ -119,7 +119,7 @@ func OpenAPIScaleWorkloads(c *gin.Context) {
}
}

ctx.RespErr = service.OpenAPIScale(req, ctx.Logger)
ctx.RespErr = service.OpenAPIScale(req, ctx.UserName+"(openAPI)", ctx.Logger)
}

// @Summary 获取测试环境中k8s服务yaml
Expand Down
5 changes: 1 addition & 4 deletions pkg/microservice/aslan/core/environment/handler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,9 +513,6 @@ func ScaleNewService(c *gin.Context) {
return
}

args := new(service.ScaleArgs)
args.Type = setting.Deployment

projectKey := c.Query("projectName")
serviceName := c.Param("serviceName")
envName := c.Param("name")
Expand Down Expand Up @@ -580,7 +577,7 @@ func ScaleNewService(c *gin.Context) {
Name: name,
Number: number,
Production: production,
}, ctx.Logger)
}, ctx.UserName, ctx.Logger)
}

type OpenAPIGetServiceResponse struct {
Expand Down
15 changes: 9 additions & 6 deletions pkg/microservice/aslan/core/environment/service/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ type UpdateEnv struct {
}

func UpdateMultipleK8sEnv(args []*UpdateEnv, envNames []string, productName, requestID string, force, production bool, username string, log *zap.SugaredLogger) ([]*EnvStatus, error) {
mutexAutoUpdate := cache.NewRedisLock(fmt.Sprintf("update_multiple_product:%s", productName))
mutexAutoUpdate := cache.NewRedisLock(updateMultipleProductLockKey(productName))
err := mutexAutoUpdate.Lock()
if err != nil {
return nil, e.ErrUpdateEnv.AddErr(fmt.Errorf("failed to acquire lock, err: %s", err))
Expand Down Expand Up @@ -758,16 +758,19 @@ func updateProductImpl(updateRevisionSvcs []string, deployStrategy map[string]st
Revision: prodService.Revision,
Render: prodService.Render,
Containers: prodService.Containers,
WorkLoads: prodService.WorkLoads,
}

// need update service revision
if util.InStringArray(prodService.ServiceName, updateRevisionSvcs) {
svcRev, ok := serviceRevisionMap[prodService.ServiceName+prodService.Type]
svcRev, ok := serviceRevisionMap[serviceNameTypeKey(prodService.ServiceName, prodService.Type)]
if !ok {
groupSvcs = append(groupSvcs, prodService)
continue
}
service.Revision = svcRev.NextRevision
if svcRev.NextRevision > 0 {
service.Revision = svcRev.NextRevision
}
service.Containers = svcRev.Containers
service.UpdateTime = time.Now().Unix()
}
Expand Down Expand Up @@ -2237,7 +2240,7 @@ func updateHelmProductVariable(productResp *commonmodels.Product, userName, requ
}

func UpdateMultipleHelmEnv(requestID, userName string, args *UpdateMultiHelmProductArg, production bool, log *zap.SugaredLogger) ([]*EnvStatus, error) {
mutexAutoUpdate := cache.NewRedisLock(fmt.Sprintf("update_multiple_product:%s", args.ProductName))
mutexAutoUpdate := cache.NewRedisLock(updateMultipleProductLockKey(args.ProductName))
err := mutexAutoUpdate.Lock()
if err != nil {
return nil, e.ErrUpdateEnv.AddErr(fmt.Errorf("failed to acquire lock, err: %s", err))
Expand Down Expand Up @@ -2314,7 +2317,7 @@ func UpdateMultipleHelmEnv(requestID, userName string, args *UpdateMultiHelmProd
}

func UpdateMultipleHelmChartEnv(requestID, userName string, args *UpdateMultiHelmProductArg, production bool, log *zap.SugaredLogger) ([]*EnvStatus, error) {
mutexUpdateMultiHelm := cache.NewRedisLock(fmt.Sprintf("update_multiple_product:%s", args.ProductName))
mutexUpdateMultiHelm := cache.NewRedisLock(updateMultipleProductLockKey(args.ProductName))

err := mutexUpdateMultiHelm.Lock()
if err != nil {
Expand Down Expand Up @@ -3116,7 +3119,7 @@ func installProductHelmCharts(user, requestID string, args *commonmodels.Product
func getServiceRevisionMap(serviceRevisionList []*SvcRevision) map[string]*SvcRevision {
serviceRevisionMap := make(map[string]*SvcRevision)
for _, revision := range serviceRevisionList {
serviceRevisionMap[revision.ServiceName+revision.Type] = revision
serviceRevisionMap[serviceNameTypeKey(revision.ServiceName, revision.Type)] = revision
}
return serviceRevisionMap
}
Expand Down
Loading
Loading