diff --git a/Makefile b/Makefile index 296c78cd0..e9d4f54df 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,7 @@ ifeq ($(GIT_DIFF), 1) GIT_TREESTATE = "dirty" endif BUILDDATE = $(shell date -u +'%Y-%m-%dT%H:%M:%SZ') +BUILDMODE?="MANUAL" LDFLAGS = "-X github.com/gocrane/crane/pkg/version.gitTag=$(GIT_VERSION) \ -X github.com/gocrane/crane/pkg/version.gitCommit=$(GIT_COMMIT_HASH) \ @@ -119,19 +120,19 @@ images: image-craned image-crane-agent image-metric-adapter image-dashboard .PHONY: image-craned image-craned: ## Build docker image with the crane manager. - docker build --build-arg LDFLAGS=$(LDFLAGS) --build-arg PKGNAME=craned -t ${MANAGER_IMG} . + docker build --build-arg BUILD=$(BUILDMODE) --build-arg LDFLAGS=$(LDFLAGS) --build-arg PKGNAME=craned -t ${MANAGER_IMG} . .PHONY: image-dashboard image-dashboard: ## Build docker image with the crane dashboard. - docker build --build-arg LDFLAGS=$(LDFLAGS) --build-arg PKGNAME=web -t ${DASHBOARD_IMG} ./pkg/web + docker build --build-arg BUILD=$(BUILDMODE) --build-arg LDFLAGS=$(LDFLAGS) --build-arg PKGNAME=web -t ${DASHBOARD_IMG} ./pkg/web .PHONY: image-crane-agent image-crane-agent: ## Build docker image with the crane agent. - docker build --build-arg LDFLAGS=$(LDFLAGS) --build-arg PKGNAME=crane-agent -t ${AGENT_IMG} . + docker build --build-arg BUILD=$(BUILDMODE) --build-arg LDFLAGS=$(LDFLAGS) --build-arg PKGNAME=crane-agent -t ${AGENT_IMG} . .PHONY: image-metric-adapter image-metric-adapter: ## Build docker image with the metric adapter. - docker build --build-arg LDFLAGS=$(LDFLAGS) --build-arg PKGNAME=metric-adapter -t ${ADAPTER_IMG} . + docker build --build-arg BUILD=$(BUILDMODE) --build-arg LDFLAGS=$(LDFLAGS) --build-arg PKGNAME=metric-adapter -t ${ADAPTER_IMG} . 
.PHONY: push-images push-images: push-image-craned push-image-crane-agent push-image-metric-adapter push-image-dashboard diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index fa236c6a2..d1f6d8d1d 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -124,6 +124,7 @@ func NewAgent(ctx context.Context, if err != nil { return agent, err } + legacyregistry.CustomMustRegister(metrics.NewNodeResourceCollector(nodeName, nodeInformer.Lister(), nodeResourceManager.GetResource)) managers = appendManagerIfNotNil(managers, nodeResourceManager) } @@ -134,6 +135,8 @@ func NewAgent(ctx context.Context, agent.managers = managers + legacyregistry.CustomMustRegister(metrics.NewPodResourceCollector(podInformer.Lister())) + return agent, nil } diff --git a/pkg/ensurance/collector/cadvisor/cadvisor_linux.go b/pkg/ensurance/collector/cadvisor/cadvisor_linux.go index 8f331399c..c04be51e4 100644 --- a/pkg/ensurance/collector/cadvisor/cadvisor_linux.go +++ b/pkg/ensurance/collector/cadvisor/cadvisor_linux.go @@ -70,6 +70,7 @@ func NewCadvisorManager(cgroupDriver string) Manager { var includedMetrics = cadvisorcontainer.MetricSet{ cadvisorcontainer.CpuUsageMetrics: struct{}{}, cadvisorcontainer.ProcessSchedulerMetrics: struct{}{}, + cadvisorcontainer.MemoryUsageMetrics: struct{}{}, } allowDynamic := true @@ -162,11 +163,9 @@ func (c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) { klog.Errorf("ContainerInfoRequest failed: %v", err) continue } - if hasExtMemRes && v.Stats[0].Memory != nil { extResMemUse += float64(v.Stats[0].Memory.WorkingSet) } - hasExtRes := hasExtCpuRes || hasExtMemRes var containerLabels = GetContainerLabels(pod, containerId, containerName, hasExtRes) if v.Stats[0].Memory != nil { diff --git a/pkg/known/types.go b/pkg/known/types.go index 8a9ad35b7..a31d6471b 100644 --- a/pkg/known/types.go +++ b/pkg/known/types.go @@ -1,5 +1,7 @@ package known +import "k8s.io/apimachinery/pkg/api/resource" + type Module string const ( @@ -9,3 +11,16 @@ 
const ( ModuleNodeResourceManager Module = "ModuleNodeResourceManager" ModulePodResourceManager Module = "ModulePodResourceManager" ) + +type ResourceStatus struct { + CPUReserved *resource.Quantity + CPUUsage *resource.Quantity + CPUUsageOffline *resource.Quantity + CPUSetIdle *resource.Quantity + MemoryReserved *resource.Quantity + MemoryUsage *resource.Quantity + MemoryUsageOffline *resource.Quantity + + CPUReservedTSP *resource.Quantity + MemoryReservedTSP *resource.Quantity +} diff --git a/pkg/known/vars.go b/pkg/known/vars.go index 8cc470dfc..08621a53f 100644 --- a/pkg/known/vars.go +++ b/pkg/known/vars.go @@ -1,6 +1,10 @@ package known -import "os" +import ( + "os" + + corev1 "k8s.io/api/core/v1" +) var ( CraneSystemNamespace = "crane-system" @@ -11,3 +15,13 @@ func init() { CraneSystemNamespace = namespace } } + +const ( + // ElasticResourcePrefix is crane resource namespace prefix. + ElasticResourcePrefix = "gocrane.io/" +) + +var ( + ElasticCPU = ElasticResourcePrefix + corev1.ResourceCPU + ElasticMemory = ElasticResourcePrefix + corev1.ResourceMemory +) diff --git a/pkg/metrics/node.go b/pkg/metrics/node.go new file mode 100644 index 000000000..b5850abe9 --- /dev/null +++ b/pkg/metrics/node.go @@ -0,0 +1,221 @@ +package metrics + +import ( + "github.com/gocrane/crane/pkg/known" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/labels" + v1 "k8s.io/client-go/listers/core/v1" + k8smetrics "k8s.io/component-base/metrics" + "k8s.io/klog/v2" +) + +const ( + CraneNodeSubsystem = "node" + CranePodSubsystem = "pod" +) + +var ( + podElasticCPUDesc = k8smetrics.NewDesc("crane_pod_elastic_cpu_request", + "The elastic cpu requested by pod", + []string{"pod", "namespace"}, + nil, + k8smetrics.ALPHA, + "", + ) + podElasticMemoryDesc = k8smetrics.NewDesc("crane_pod_elastic_memory_request", + "The elastic memory requested by pod", + []string{"pod", "namespace"}, + nil, + k8smetrics.ALPHA, + "", + ) +) + +func
NewPodResourceCollector(podLister v1.PodLister) *PodResourceCollector { + return &PodResourceCollector{ + podLister: podLister, + } +} + +type PodResourceCollector struct { + k8smetrics.BaseStableCollector + podLister v1.PodLister +} + +func (n *PodResourceCollector) DescribeWithStability(descs chan<- *k8smetrics.Desc) { + descs <- podElasticCPUDesc + descs <- podElasticMemoryDesc +} + +func (n *PodResourceCollector) CollectWithStability(metrics chan<- k8smetrics.Metric) { + pods, err := n.podLister.List(labels.Everything()) + if err != nil { + klog.ErrorS(err, "list pods failed") + return + } + + for _, pod := range pods { + if pod.Status.Phase != corev1.PodRunning { + continue + } + eCPU, eMemory := resource.NewQuantity(0, resource.DecimalSI), resource.NewQuantity(0, resource.BinarySI) + for _, container := range pod.Spec.Containers { + eCPU.Add(*container.Resources.Requests.Name(known.ElasticCPU, resource.DecimalSI)) + eMemory.Add(*container.Resources.Requests.Name(known.ElasticMemory, resource.DecimalSI)) + } + if eCPU.IsZero() && eMemory.IsZero() { + continue + } + metrics <- k8smetrics.NewLazyConstMetric(podElasticCPUDesc, k8smetrics.GaugeValue, eCPU.AsApproximateFloat64(), pod.Name, pod.Namespace) + metrics <- k8smetrics.NewLazyConstMetric(podElasticMemoryDesc, k8smetrics.GaugeValue, eMemory.AsApproximateFloat64(), pod.Name, pod.Namespace) + } +} + +var ( + nodeElasticCPUDesc = k8smetrics.NewDesc("crane_node_elastic_cpu_allocatable", + "The elastic cpu of the node.", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeElasticMemoryDesc = k8smetrics.NewDesc("crane_node_elastic_memory_allocatable", + "The elastic memory requested by pod", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeCPUAllocatableDesc = k8smetrics.NewDesc("crane_node_cpu_allocatable", + "The cpu allocatable of the node.", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeCPUCapacityDesc = k8smetrics.NewDesc("crane_node_cpu_capacity", + "The cpu 
capacity of the node.", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeMemoryAllocatableDesc = k8smetrics.NewDesc("crane_node_memory_allocatable", + "The memory allocatable of the node.", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeMemoryCapacityDesc = k8smetrics.NewDesc("crane_node_memory_capacity", + "The memory capacity of the node.", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeCPUReservedDesc = k8smetrics.NewDesc("crane_node_cpu_reserved", + "The reserved cpu of node", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeCPUUsageOnlineDesc = k8smetrics.NewDesc("crane_node_cpu_usage_online", + "The online cpu usage of node", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeCPUUsageOfflineDesc = k8smetrics.NewDesc("crane_node_cpu_usage_offline", + "The offline cpu usage of node", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeMemoryReservedDesc = k8smetrics.NewDesc("crane_node_memory_reserved", + "The reserved memory of node", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeMemoryUsageOnlineDesc = k8smetrics.NewDesc("crane_node_memory_usage_online", + "The online memory usage of node", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) + nodeMemoryUsageOfflineDesc = k8smetrics.NewDesc("crane_node_memory_usage_offline", + "The offline memory usage of node", + []string{"node"}, + nil, + k8smetrics.ALPHA, + "", + ) +) + +type NodeResourceCollector struct { + k8smetrics.BaseStableCollector + nodeName string + nodeLister v1.NodeLister + nodeResourceGetter func() *known.ResourceStatus +} + +func NewNodeResourceCollector(nodeName string, nodeLister v1.NodeLister, nodeResourceGetter func() *known.ResourceStatus) *NodeResourceCollector { + return &NodeResourceCollector{ + nodeName: nodeName, + nodeLister: nodeLister, + nodeResourceGetter: nodeResourceGetter, + } +} + +func (n *NodeResourceCollector) DescribeWithStability(descs chan<- *k8smetrics.Desc) 
{ + // resource metrics from status of node + descs <- nodeElasticCPUDesc + descs <- nodeElasticMemoryDesc + descs <- nodeCPUAllocatableDesc + descs <- nodeCPUCapacityDesc + descs <- nodeMemoryAllocatableDesc + descs <- nodeMemoryCapacityDesc + + // usage metrics + descs <- nodeCPUReservedDesc + descs <- nodeCPUUsageOnlineDesc + descs <- nodeCPUUsageOfflineDesc + descs <- nodeMemoryReservedDesc + descs <- nodeMemoryUsageOnlineDesc + descs <- nodeMemoryUsageOfflineDesc +} + +func (n *NodeResourceCollector) CollectWithStability(metrics chan<- k8smetrics.Metric) { + node, err := n.nodeLister.Get(n.nodeName) + if err != nil { + klog.ErrorS(err, "get node failed") + return + } + metrics <- k8smetrics.NewLazyConstMetric(nodeElasticCPUDesc, k8smetrics.GaugeValue, node.Status.Allocatable.Name(known.ElasticCPU, resource.DecimalSI).AsApproximateFloat64(), node.Name) + metrics <- k8smetrics.NewLazyConstMetric(nodeElasticMemoryDesc, k8smetrics.GaugeValue, node.Status.Allocatable.Name(known.ElasticMemory, resource.BinarySI).AsApproximateFloat64(), node.Name) + metrics <- k8smetrics.NewLazyConstMetric(nodeCPUAllocatableDesc, k8smetrics.GaugeValue, node.Status.Allocatable.Cpu().AsApproximateFloat64(), node.Name) + metrics <- k8smetrics.NewLazyConstMetric(nodeMemoryAllocatableDesc, k8smetrics.GaugeValue, node.Status.Allocatable.Memory().AsApproximateFloat64(), node.Name) + metrics <- k8smetrics.NewLazyConstMetric(nodeCPUCapacityDesc, k8smetrics.GaugeValue, node.Status.Capacity.Cpu().AsApproximateFloat64(), node.Name) + metrics <- k8smetrics.NewLazyConstMetric(nodeMemoryCapacityDesc, k8smetrics.GaugeValue, node.Status.Capacity.Memory().AsApproximateFloat64(), node.Name) + + resourceStatus := n.nodeResourceGetter() + if resourceStatus == nil { + return + } + metrics <- k8smetrics.NewLazyConstMetric(nodeCPUReservedDesc, k8smetrics.GaugeValue, resourceStatus.CPUReserved.AsApproximateFloat64(), node.Name) + // TODO incorrect online define !! 
+ metrics <- k8smetrics.NewLazyConstMetric(nodeCPUUsageOnlineDesc, k8smetrics.GaugeValue, resourceStatus.CPUUsage.AsApproximateFloat64(), node.Name) + metrics <- k8smetrics.NewLazyConstMetric(nodeCPUUsageOfflineDesc, k8smetrics.GaugeValue, resourceStatus.CPUUsageOffline.AsApproximateFloat64(), node.Name) + + metrics <- k8smetrics.NewLazyConstMetric(nodeMemoryReservedDesc, k8smetrics.GaugeValue, resourceStatus.MemoryReserved.AsApproximateFloat64(), node.Name) + // TODO incorrect online define !! + metrics <- k8smetrics.NewLazyConstMetric(nodeMemoryUsageOnlineDesc, k8smetrics.GaugeValue, resourceStatus.MemoryUsage.AsApproximateFloat64(), node.Name) + metrics <- k8smetrics.NewLazyConstMetric(nodeMemoryUsageOfflineDesc, k8smetrics.GaugeValue, resourceStatus.MemoryUsageOffline.AsApproximateFloat64(), node.Name) +} diff --git a/pkg/resource/node_resource_manager.go b/pkg/resource/node_resource_manager.go index 130b3a5cb..7a60686f6 100644 --- a/pkg/resource/node_resource_manager.go +++ b/pkg/resource/node_resource_manager.go @@ -4,13 +4,14 @@ import ( "context" "fmt" "math" + "reflect" "strconv" + "strings" "time" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/json" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -33,7 +34,6 @@ import ( const ( MinDeltaRatio = 0.1 StateExpiration = 1 * time.Minute - TspUpdateInterval = 20 * time.Second NodeReserveResourcePercentageAnnotationPrefix = "reserve.node.gocrane.io/%s" ) @@ -43,32 +43,25 @@ var idToResourceMap = map[string]v1.ResourceName{ } // ReserveResource is the cpu and memory reserve configuration -type ReserveResource struct { - CpuPercent *float64 - MemPercent *float64 +type ReservedResource struct { + CpuPercent float64 + MemPercent float64 } type NodeResourceManager struct { - nodeName string - client 
clientset.Interface + nodeName string + tspName string + reservedResource ReservedResource + client clientset.Interface + recorder record.EventRecorder nodeLister corelisters.NodeLister nodeSynced cache.InformerSynced + tspLister predictionlisters.TimeSeriesPredictionLister + tspSynced cache.InformerSynced - tspLister predictionlisters.TimeSeriesPredictionLister - tspSynced cache.InformerSynced - - recorder record.EventRecorder - - stateChann chan map[string][]common.TimeSeries - - state map[string][]common.TimeSeries - // Updated when get new data from stateChann, used to determine whether state has expired - lastStateTime time.Time - - reserveResource ReserveResource - - tspName string + stateChann chan map[string][]common.TimeSeries + resourceStatus *known.ResourceStatus } func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeResourceReserved map[string]string, tspName string, nodeInformer coreinformers.NodeInformer, @@ -96,19 +89,15 @@ func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeRes tspSynced: tspInformer.Informer().HasSynced, recorder: recorder, stateChann: stateChann, - reserveResource: ReserveResource{ - CpuPercent: &reserveCpuPercent, - MemPercent: &reserveMemoryPercent, + reservedResource: ReservedResource{ + CpuPercent: reserveCpuPercent, + MemPercent: reserveMemoryPercent, }, tspName: tspName, } return o, nil } -func (o *NodeResourceManager) Name() string { - return "NodeResourceManager" -} - func (o *NodeResourceManager) Run(stop <-chan struct{}) { klog.Infof("Starting node resource manager.") @@ -122,16 +111,19 @@ func (o *NodeResourceManager) Run(stop <-chan struct{}) { } go func() { - tspUpdateTicker := time.NewTicker(TspUpdateInterval) - defer tspUpdateTicker.Stop() for { select { case state := <-o.stateChann: - o.state = state - o.lastStateTime = time.Now() start := time.Now() metrics.UpdateLastTime(string(known.ModuleNodeResourceManager), metrics.StepUpdateNodeResource, start) - 
o.UpdateNodeResource() + if err := o.computeResourceStatus(state); err != nil { + klog.ErrorS(err, "build resource status failed") + continue + } + if err := o.updateResource(); err != nil { + klog.ErrorS(err, "update resource failed") + continue + } metrics.UpdateDurationFromStart(string(known.ModuleNodeResourceManager), metrics.StepUpdateNodeResource, start) case <-stop: klog.Infof("node resource manager exit") @@ -143,133 +135,144 @@ func (o *NodeResourceManager) Run(stop <-chan struct{}) { return } -func (o *NodeResourceManager) UpdateNodeResource() { - node := o.getNode() - if len(node.Status.Addresses) == 0 { - klog.Error("Node addresses is empty") - return - } - nodeCopy := node.DeepCopy() - - resourcesFrom := o.BuildNodeStatus(nodeCopy) - if !equality.Semantic.DeepEqual(&node.Status, &nodeCopy.Status) { - nodeCopyBytes, err := json.Marshal(nodeCopy) - if err != nil { - klog.Errorf("Failed to marshal node %s extended resource, %v", nodeCopy.Name, err) - return - } - - if _, err = o.client.CoreV1().Nodes().PatchStatus(context.TODO(), node.Name, nodeCopyBytes); err != nil { - klog.Errorf("Failed to update node %s extended resource, %v", nodeCopy.Name, err) - return +func (o *NodeResourceManager) computeResourceStatus(tsm map[string][]common.TimeSeries) error { + rs := &known.ResourceStatus{} + transform := func(name types.MetricName) (int64, error) { + if series, ok := tsm[string(name)]; !ok { + return 0, fmt.Errorf("series %s missed", name) + } else { + return int64(series[0].Samples[0].Value), nil } - klog.V(2).Infof("Update node %s extended resource successfully", node.Name) - o.recorder.Event(node, v1.EventTypeNormal, "UpdateNode", generateUpdateEventMessage(resourcesFrom)) } -} + // 1. 
Get resource usage + if val, err := transform(types.MetricNameCpuTotalUsage); err != nil { + return err + } else { + rs.CPUUsage = resource.NewMilliQuantity(val, resource.DecimalSI) + } + if val, err := transform(types.MetricNameExtResContainerCpuTotalUsage); err != nil { + return err + } else { + rs.CPUUsageOffline = resource.NewQuantity(val, resource.DecimalSI) + } + if val, err := transform(types.MetricNameExclusiveCPUIdle); err != nil { + return err + } else { + rs.CPUSetIdle = resource.NewMilliQuantity(val, resource.DecimalSI) + } + if val, err := transform(types.MetricNameMemoryTotalUsage); err != nil { + return err + } else { + rs.MemoryUsage = resource.NewQuantity(val, resource.BinarySI) + } + if val, err := transform(types.MetricNameExtResContainerMemTotalUsage); err != nil { + return err + } else { + rs.MemoryUsageOffline = resource.NewQuantity(val, resource.BinarySI) + } -func (o *NodeResourceManager) getNode() *v1.Node { - node, err := o.nodeLister.Get(o.nodeName) + // 2. Get resource reserved + node, err := o.getNode() if err != nil { - klog.Errorf("Failed to get node: %v", err) - return nil + return err + } + reservedCPUPercent := o.reservedResource.CpuPercent + if nodeReserveCpuPercent, ok := getReserveResourcePercentFromNodeAnnotations(node.GetAnnotations(), v1.ResourceCPU.String()); ok { + reservedCPUPercent = nodeReserveCpuPercent } - return node + reservedMemoryPercent := o.reservedResource.MemPercent + if nodeReserveMemPercent, ok := getReserveResourcePercentFromNodeAnnotations(node.GetAnnotations(), v1.ResourceMemory.String()); ok { + reservedMemoryPercent = nodeReserveMemPercent + } + rs.CPUReserved = resource.NewQuantity(int64(node.Status.Allocatable.Cpu().AsApproximateFloat64()*reservedCPUPercent), resource.DecimalSI) + rs.MemoryReserved = resource.NewQuantity(int64(node.Status.Allocatable.Memory().AsApproximateFloat64()*reservedMemoryPercent), resource.DecimalSI) + + // 3. 
Get resource from TSP + onlineResourceFromTSP := o.GetOnlineResourceFromTsp(node) + rs.CPUReservedTSP = resource.NewMilliQuantity(int64(onlineResourceFromTSP[v1.ResourceCPU]), resource.DecimalSI) + rs.MemoryReservedTSP = resource.NewQuantity(int64(onlineResourceFromTSP[v1.ResourceMemory]), resource.BinarySI) + + o.resourceStatus = rs + return nil +} + +func (o *NodeResourceManager) Name() string { + return "NodeResourceManager" +} + +func (o *NodeResourceManager) getNode() (*v1.Node, error) { + return o.nodeLister.Get(o.nodeName) } -func (o *NodeResourceManager) FindTargetNode(tsp *predictionapi.TimeSeriesPrediction, addresses []v1.NodeAddress) (bool, error) { +func (o *NodeResourceManager) NodeExisted(tsp *predictionapi.TimeSeriesPrediction, addresses []v1.NodeAddress) error { address := tsp.Spec.TargetRef.Name if address == "" { - return false, fmt.Errorf("tsp %s target is not specified", tsp.Name) + return fmt.Errorf("tsp %s target is not specified", tsp.Name) } // the reason we use node ip instead of node name as the target name is // some monitoring system does not persist node name for _, addr := range addresses { if addr.Address == address { - return true, nil + return nil } } - klog.V(4).Infof("Target %s mismatch this node", address) - return false, nil + return fmt.Errorf("address %s of TSP %s mismatch this node", address, tsp.Name) } -func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[string]int64 { - tspCanNotBeReclaimedResource := o.GetCanNotBeReclaimedResourceFromTsp(node) - localCanNotBeReclaimedResource := o.GetCanNotBeReclaimedResourceFromLocal() - - reserveCpuPercent := o.reserveResource.CpuPercent - if nodeReserveCpuPercent, ok := getReserveResourcePercentFromNodeAnnotations(node.GetAnnotations(), v1.ResourceCPU.String()); ok { - reserveCpuPercent = &nodeReserveCpuPercent - } - - reserveMemPercent := o.reserveResource.MemPercent - if nodeReserveMemPercent, ok := 
getReserveResourcePercentFromNodeAnnotations(node.GetAnnotations(), v1.ResourceMemory.String()); ok { - reserveMemPercent = &nodeReserveMemPercent + origin, err := o.getNode() + if err != nil { + return err } - - extResourceFrom := map[string]int64{} - - for resourceName, value := range tspCanNotBeReclaimedResource { - klog.V(6).Infof("resourcename is %s", resourceName) - resourceFrom := "tsp" - maxUsage := value - if localCanNotBeReclaimedResource[resourceName] > maxUsage { - maxUsage = localCanNotBeReclaimedResource[resourceName] - resourceFrom = "local" - } - - var nextRecommendation float64 - switch resourceName { - case v1.ResourceCPU: - if *reserveCpuPercent != 0 { - nextRecommendation = float64(node.Status.Allocatable.Cpu().Value()) - float64(node.Status.Allocatable.Cpu().Value())*(*reserveCpuPercent) - maxUsage/1000 - } else { - nextRecommendation = float64(node.Status.Allocatable.Cpu().Value()) - maxUsage/1000 - } - case v1.ResourceMemory: - // unit of memory in prometheus is in Ki, need to be converted to byte - if *reserveMemPercent != 0 { - nextRecommendation = float64(node.Status.Allocatable.Memory().Value()) - float64(node.Status.Allocatable.Memory().Value())*(*reserveMemPercent) - maxUsage/1000 - } else { - klog.V(6).Infof("allocatable mem is %d, maxusage is %f", node.Status.Allocatable.Memory().Value(), maxUsage) - nextRecommendation = float64(node.Status.Allocatable.Memory().Value()) - maxUsage - } - default: - continue - } - if nextRecommendation < 0 { - nextRecommendation = 0 - } - metrics.UpdateNodeResourceRecommendedValue(metrics.SubComponentNodeResource, metrics.StepGetExtResourceRecommended, string(resourceName), resourceFrom, nextRecommendation) - extResourceName := fmt.Sprintf(utils.ExtResourcePrefixFormat, string(resourceName)) - resValue, exists := node.Status.Capacity[v1.ResourceName(extResourceName)] - if exists && resValue.Value() != 0 && - math.Abs(float64(resValue.Value())- - nextRecommendation)/float64(resValue.Value()) <= 
MinDeltaRatio { - continue - } - switch resourceName { - case v1.ResourceCPU: - node.Status.Capacity[v1.ResourceName(extResourceName)] = - *resource.NewQuantity(int64(nextRecommendation), resource.DecimalSI) - node.Status.Allocatable[v1.ResourceName(extResourceName)] = - *resource.NewQuantity(int64(nextRecommendation), resource.DecimalSI) - case v1.ResourceMemory: - node.Status.Capacity[v1.ResourceName(extResourceName)] = - *resource.NewQuantity(int64(nextRecommendation), resource.BinarySI) - node.Status.Allocatable[v1.ResourceName(extResourceName)] = - *resource.NewQuantity(int64(nextRecommendation), resource.BinarySI) + messages := []string{} + node := origin.DeepCopy() + updateIfNeed := func(name v1.ResourceName, next resource.Quantity) { + if existed := node.Status.Capacity[name]; math.Abs(existed.AsApproximateFloat64()-next.AsApproximateFloat64()) >= MinDeltaRatio*existed.AsApproximateFloat64() { + round := *resource.NewQuantity(next.Value(), next.Format) + node.Status.Capacity[name] = round + node.Status.Allocatable[name] = round + messages = append(messages, fmt.Sprintf("resource %s: %s -> %s (before round: %s)", name.String(), existed.String(), round.String(), next.String())) } + } - extResourceFrom[resourceFrom+"-"+resourceName.String()] = int64(nextRecommendation) + onlineCPU := o.resourceStatus.CPUUsage.DeepCopy() + onlineCPU.Sub(*o.resourceStatus.CPUUsageOffline) + onlineCPU.Add(*o.resourceStatus.CPUSetIdle) + if onlineCPU.Cmp(*o.resourceStatus.CPUReservedTSP) == -1 { + onlineCPU = o.resourceStatus.CPUReservedTSP.DeepCopy() } + onlineCPU.Add(*o.resourceStatus.CPUReserved) + // TODO should use allocatable CPU ??? 
+ elasticCPU := node.Status.Allocatable.Cpu().DeepCopy() + elasticCPU.Sub(onlineCPU) + updateIfNeed(known.ElasticCPU, elasticCPU) + + onlineMemory := o.resourceStatus.MemoryUsage.DeepCopy() + onlineMemory.Sub(*o.resourceStatus.MemoryUsageOffline) + if onlineMemory.Cmp(*o.resourceStatus.MemoryReservedTSP) == -1 { + onlineMemory = o.resourceStatus.MemoryReservedTSP.DeepCopy() + } + onlineMemory.Add(*o.resourceStatus.MemoryReserved) + // TODO should use allocatable memory? + elasticMemory := node.Status.Allocatable.Memory().DeepCopy() + elasticMemory.Sub(onlineMemory) + updateIfNeed(known.ElasticMemory, elasticMemory) - return extResourceFrom + if reflect.DeepEqual(node.Status, origin.Status) { + return nil + } + if _, err = o.client.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{}); err != nil { + return err + } + o.recorder.Event(node, v1.EventTypeNormal, "UpdateElasticResource", strings.Join(messages, ",")) + return nil } -func (o *NodeResourceManager) GetCanNotBeReclaimedResourceFromTsp(node *v1.Node) map[v1.ResourceName]float64 { - canNotBeReclaimedResource := map[v1.ResourceName]float64{ +// TODO GetOnlineResourceFromTsp should return error when get PredictionResource !!! 
+func (o *NodeResourceManager) GetOnlineResourceFromTsp(node *v1.Node) map[v1.ResourceName]float64 { + onlineResource := map[v1.ResourceName]float64{ v1.ResourceCPU: 0, v1.ResourceMemory: 0, } @@ -277,18 +280,12 @@ func (o *NodeResourceManager) GetCanNotBeReclaimedResourceFromTsp(node *v1.Node) tsp, err := o.tspLister.TimeSeriesPredictions(known.CraneSystemNamespace).Get(o.tspName) if err != nil { klog.Errorf("Failed to get tsp: %#v", err) - return canNotBeReclaimedResource - } - - tspMatched, err := o.FindTargetNode(tsp, node.Status.Addresses) - if err != nil { - klog.Error(err.Error()) - return canNotBeReclaimedResource + return onlineResource } - if !tspMatched { - klog.Errorf("Found tsp %s, but tsp not matched to node %s", o.tspName, node.Name) - return canNotBeReclaimedResource + if err := o.NodeExisted(tsp, node.Status.Addresses); err != nil { + klog.ErrorS(err, "match tsp and node failed") + return onlineResource } // build node status @@ -308,118 +305,30 @@ func (o *NodeResourceManager) GetCanNotBeReclaimedResourceFromTsp(node *v1.Node) continue } nextUsage = nextUsageFloat - if canNotBeReclaimedResource[resourceName] < nextUsage { - canNotBeReclaimedResource[resourceName] = nextUsage + if onlineResource[resourceName] < nextUsage { + onlineResource[resourceName] = nextUsage } } } } - return canNotBeReclaimedResource -} - -func (o *NodeResourceManager) GetCanNotBeReclaimedResourceFromLocal() map[v1.ResourceName]float64 { - return map[v1.ResourceName]float64{ - v1.ResourceCPU: o.GetCpuCoreCanNotBeReclaimedFromLocal(), - v1.ResourceMemory: o.GetMemCanNotBeReclaimedFromLocal(), - } -} - -func (o *NodeResourceManager) GetMemCanNotBeReclaimedFromLocal() float64 { - var memUsageTotal float64 - memUsage, ok := o.state[string(types.MetricNameMemoryTotalUsage)] - if ok { - memUsageTotal = memUsage[0].Samples[0].Value - klog.V(4).Infof("%s: %f", types.MetricNameMemoryTotalUsage, memUsageTotal) - - } else { - klog.V(4).Infof("Can't get %s from NodeResourceManager local 
state", types.MetricNameMemoryTotalUsage) - } - - var extResContainerMemUsageTotal float64 = 0 - extResContainerCpuUsageTotalTimeSeries, ok := o.state[string(types.MetricNameExtResContainerMemTotalUsage)] - if ok { - extResContainerMemUsageTotal = extResContainerCpuUsageTotalTimeSeries[0].Samples[0].Value - } else { - klog.V(4).Infof("Can't get %s from NodeResourceManager local state", types.MetricNameExtResContainerCpuTotalUsage) - } - - klog.V(6).Infof("nodeMemUsageTotal: %f, extResContainerMemUsageTotal: %f", memUsageTotal, extResContainerMemUsageTotal) - - // 1. Exclusive tethered CPU cannot be reclaimed even if the free part is free, so add the exclusive CPUIdle to the CanNotBeReclaimed CPU - // 2. The CPU used by extRes-container needs to be reclaimed, otherwise it will be double-counted due to the allotted mechanism of k8s, so the extResContainerCpuUsageTotal is subtracted from the CanNotBeReclaimedCpu - nodeMemCannotBeReclaimedSeconds := memUsageTotal - extResContainerMemUsageTotal - - metrics.UpdateNodeMemCannotBeReclaimedSeconds(nodeMemCannotBeReclaimedSeconds) - return nodeMemCannotBeReclaimedSeconds + return onlineResource } -func (o *NodeResourceManager) GetCpuCoreCanNotBeReclaimedFromLocal() float64 { - if o.lastStateTime.Before(time.Now().Add(-20 * time.Second)) { - klog.V(1).Infof("NodeResourceManager local state has expired") - return 0 - } - - nodeCpuUsageTotalTimeSeries, ok := o.state[string(types.MetricNameCpuTotalUsage)] - if !ok { - klog.V(4).Infof("Can't get %s from NodeResourceManager local state, please make sure cpu metrics collector is defined in NodeQOS.", types.MetricNameCpuTotalUsage) - return 0 - } - nodeCpuUsageTotal := nodeCpuUsageTotalTimeSeries[0].Samples[0].Value - - var extResContainerCpuUsageTotal float64 = 0 - extResContainerCpuUsageTotalTimeSeries, ok := o.state[string(types.MetricNameExtResContainerCpuTotalUsage)] - if ok { - extResContainerCpuUsageTotal = extResContainerCpuUsageTotalTimeSeries[0].Samples[0].Value * 1000 - } 
else { - klog.V(4).Infof("Can't get %s from NodeResourceManager local state", types.MetricNameExtResContainerCpuTotalUsage) - } - - var exclusiveCPUIdle float64 = 0 - exclusiveCPUIdleTimeSeries, ok := o.state[string(types.MetricNameExclusiveCPUIdle)] - if ok { - exclusiveCPUIdle = exclusiveCPUIdleTimeSeries[0].Samples[0].Value - } else { - klog.V(4).Infof("Can't get %s from NodeResourceManager local state", types.MetricNameExclusiveCPUIdle) - } - - klog.V(6).Infof("nodeCpuUsageTotal: %f, exclusiveCPUIdle: %f, extResContainerCpuUsageTotal: %f", nodeCpuUsageTotal, exclusiveCPUIdle, extResContainerCpuUsageTotal) - - // 1. Exclusive tethered CPU cannot be reclaimed even if the free part is free, so add the exclusive CPUIdle to the CanNotBeReclaimed CPU - // 2. The CPU used by extRes-container needs to be reclaimed, otherwise it will be double-counted due to the allotted mechanism of k8s, so the extResContainerCpuUsageTotal is subtracted from the CanNotBeReclaimedCpu - nodeCpuCannotBeReclaimedSeconds := nodeCpuUsageTotal + exclusiveCPUIdle - extResContainerCpuUsageTotal - metrics.UpdateNodeCpuCannotBeReclaimedSeconds(nodeCpuCannotBeReclaimedSeconds) - return nodeCpuCannotBeReclaimedSeconds +func (o *NodeResourceManager) GetResource() *known.ResourceStatus { + return o.resourceStatus } func getReserveResourcePercentFromNodeAnnotations(annotations map[string]string, resourceName string) (float64, bool) { if annotations == nil { return 0, false } - var reserveResourcePercentStr string - var ok = false - switch resourceName { - case v1.ResourceCPU.String(): - reserveResourcePercentStr, ok = annotations[fmt.Sprintf(NodeReserveResourcePercentageAnnotationPrefix, v1.ResourceCPU.String())] - case v1.ResourceMemory.String(): - reserveResourcePercentStr, ok = annotations[fmt.Sprintf(NodeReserveResourcePercentageAnnotationPrefix, v1.ResourceMemory.String())] - default: - } + reserveResourcePercentStr, ok := annotations[fmt.Sprintf(NodeReserveResourcePercentageAnnotationPrefix, 
resourceName)] if !ok { return 0, false } - reserveResourcePercent, err := utils.ParsePercentage(reserveResourcePercentStr) if err != nil { return 0, false } - return reserveResourcePercent, ok } - -func generateUpdateEventMessage(resourcesFrom map[string]int64) string { - message := "" - for k, v := range resourcesFrom { - message = message + fmt.Sprintf("Updating elastic resource %s with %d.", k, v) - } - return message -}