803 lines
32 KiB
Diff
803 lines
32 KiB
Diff
From 66130d332c561ab95853e1277f5076f6070c3002 Mon Sep 17 00:00:00 2001
|
|
From: Gleb Aronsky <gleb.aronsky@windriver.com>
|
|
Date: Tue, 25 Jan 2022 13:27:25 -0500
|
|
Subject: [PATCH] kubelet cpumanager introduce concept of isolated CPUs
|
|
|
|
This introduces the concept of "isolated CPUs", which are CPUs that
|
|
have been isolated at the kernel level via the "isolcpus" kernel boot
|
|
parameter.
|
|
|
|
When starting the kubelet process, two separate sets of reserved CPUs
|
|
may be specified. With this change CPUs reserved via
|
|
'--system-reserved=cpu' will be used for infrastructure pods while the
|
|
isolated CPUs should be reserved via '--kube-reserved=cpu' to cause
|
|
kubelet to skip over them for "normal" CPU resource tracking. The
|
|
kubelet code will double-check that the specified isolated CPUs match
|
|
what the kernel exposes in "/sys/devices/system/cpu/isolated".
|
|
|
|
A plugin (outside the scope of this commit) will expose the isolated
|
|
CPUs to kubelet via the device plugin API.
|
|
|
|
If a pod specifies some number of "isolcpus" resources, the device
|
|
manager will allocate them. In this code we check whether such
|
|
resources have been allocated, and if so we set the container cpuset to
|
|
the isolated CPUs. This does mean that it really only makes sense to
|
|
specify "isolcpus" resources for best-effort or burstable pods, not for
|
|
guaranteed ones, since that would throw off the accounting code. In
|
|
order to ensure the accounting still works as designed, if "isolcpus"
|
|
are specified for guaranteed pods, the affinity will be set to the
|
|
non-isolated CPUs.
|
|
|
|
This patch was refactored in 1.21.3 due to the upstream API change
|
|
node: podresources: make GetDevices() consistent
|
|
(commit ad68f9588c72d6477b5a290c548a9031063ac659).
|
|
|
|
The routine podIsolCPUs() was refactored in 1.21.3 since the API
|
|
p.deviceManager.GetDevices() returns multiple devices, with
|
|
one device per CPU. The resulting cpuset must be the union of all of them.
|
|
|
|
The routine NewStaticPolicy was refactored in 1.22.5, adding a new argument
|
|
in its signature: cpuPolicyOptions map[string]string. This change implies
|
|
shifting the new arguments (deviceManager, excludeReserved) by one position
|
|
to the right.
|
|
|
|
Co-authored-by: Jim Gauld <james.gauld@windriver.com>
|
|
Co-authored-by: Chris Friesen <chris.friesen@windriver.com>
|
|
Signed-off-by: Gleb Aronsky <gleb.aronsky@windriver.com>
|
|
Signed-off-by: Kaustubh Dhokte <kaustubh.dhokte@windriver.com>
|
|
---
|
|
pkg/kubelet/cm/container_manager_linux.go | 1 +
|
|
pkg/kubelet/cm/cpumanager/cpu_manager.go | 35 +++-
|
|
pkg/kubelet/cm/cpumanager/cpu_manager_test.go | 14 +-
|
|
pkg/kubelet/cm/cpumanager/policy_static.go | 157 ++++++++++++++--
|
|
.../cm/cpumanager/policy_static_test.go | 176 +++++++++++++++++-
|
|
5 files changed, 355 insertions(+), 28 deletions(-)
|
|
|
|
diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
|
|
index 0f09f3eb331..770922ca55d 100644
|
|
--- a/pkg/kubelet/cm/container_manager_linux.go
|
|
+++ b/pkg/kubelet/cm/container_manager_linux.go
|
|
@@ -321,6 +321,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
|
|
cm.GetNodeAllocatableReservation(),
|
|
nodeConfig.KubeletRootDir,
|
|
cm.topologyManager,
|
|
+ cm.deviceManager,
|
|
)
|
|
if err != nil {
|
|
klog.ErrorS(err, "Failed to initialize cpu manager")
|
|
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
|
|
index 884c7323a79..ea466dbcd37 100644
|
|
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
|
|
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
|
|
@@ -18,7 +18,9 @@ package cpumanager
|
|
|
|
import (
|
|
"fmt"
|
|
+ "io/ioutil"
|
|
"math"
|
|
+ "strings"
|
|
"sync"
|
|
"time"
|
|
|
|
@@ -32,6 +34,7 @@ import (
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
|
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
|
|
"k8s.io/kubernetes/pkg/kubelet/config"
|
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
|
@@ -50,6 +53,25 @@ type policyName string
|
|
// cpuManagerStateFileName is the file name where cpu manager stores its state
|
|
const cpuManagerStateFileName = "cpu_manager_state"
|
|
|
|
+// get the system-level isolated CPUs
|
|
+func getIsolcpus() cpuset.CPUSet {
|
|
+ dat, err := ioutil.ReadFile("/sys/devices/system/cpu/isolated")
|
|
+ if err != nil {
|
|
+ klog.Errorf("[cpumanager] unable to read sysfs isolcpus subdir")
|
|
+ return cpuset.NewCPUSet()
|
|
+ }
|
|
+
|
|
+ // The isolated cpus string ends in a newline
|
|
+ cpustring := strings.TrimSuffix(string(dat), "\n")
|
|
+ cset, err := cpuset.Parse(cpustring)
|
|
+ if err != nil {
|
|
+ klog.Errorf("[cpumanager] unable to parse sysfs isolcpus string to cpuset")
|
|
+ return cpuset.NewCPUSet()
|
|
+ }
|
|
+
|
|
+ return cset
|
|
+}
|
|
+
|
|
// Manager interface provides methods for Kubelet to manage pod cpus.
|
|
type Manager interface {
|
|
// Start is called during Kubelet initialization.
|
|
@@ -153,7 +175,8 @@ func (s *sourcesReadyStub) AddSource(source string) {}
|
|
func (s *sourcesReadyStub) AllReady() bool { return true }
|
|
|
|
// NewManager creates new cpu manager based on provided policy
|
|
-func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
|
|
+func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store, deviceManager devicemanager.Manager) (Manager, error) {
|
|
+
|
|
var topo *topology.CPUTopology
|
|
var policy Policy
|
|
var err error
|
|
@@ -194,7 +217,15 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
|
|
// NOTE: Set excludeReserved unconditionally to exclude reserved CPUs from default cpuset.
|
|
// This variable is primarily to make testing easier.
|
|
excludeReserved := true
|
|
- policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions, excludeReserved)
|
|
+
|
|
+ // isolCPUs is the set of kernel-isolated CPUs. They should be a subset of specificCPUs or
|
|
+ // of the CPUs that NewStaticPolicy() will pick if numReservedCPUs is set. It's only in the
|
|
+ // argument list here for ease of testing, it's really internal to the policy.
|
|
+ isolCPUs := getIsolcpus()
|
|
+ policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, isolCPUs, affinity, cpuPolicyOptions, deviceManager, excludeReserved)
|
|
+ if err != nil {
|
|
+ return nil, fmt.Errorf("new static policy error: %v", err)
|
|
+ }
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("new static policy error: %w", err)
|
|
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
|
|
index 2c8349662c4..31e4d0585fb 100644
|
|
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
|
|
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
|
|
@@ -37,6 +37,7 @@ import (
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
|
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
|
|
)
|
|
|
|
@@ -215,6 +216,7 @@ func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string })
|
|
}
|
|
|
|
func TestCPUManagerAdd(t *testing.T) {
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
testExcl := false
|
|
testPolicy, _ := NewStaticPolicy(
|
|
&topology.CPUTopology{
|
|
@@ -230,8 +232,10 @@ func TestCPUManagerAdd(t *testing.T) {
|
|
},
|
|
0,
|
|
cpuset.NewCPUSet(),
|
|
+ cpuset.NewCPUSet(),
|
|
topologymanager.NewFakeManager(),
|
|
nil,
|
|
+ testDM,
|
|
testExcl)
|
|
testCases := []struct {
|
|
description string
|
|
@@ -482,8 +486,9 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
|
|
}
|
|
|
|
testExcl := false
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
for _, testCase := range testCases {
|
|
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testExcl)
|
|
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, testExcl)
|
|
|
|
mockState := &mockState{
|
|
assignments: testCase.stAssignments,
|
|
@@ -638,7 +643,9 @@ func TestCPUManagerGenerate(t *testing.T) {
|
|
}
|
|
defer os.RemoveAll(sDir)
|
|
|
|
- mgr, err := NewManager(testCase.cpuPolicyName, nil, 5*time.Second, machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
|
|
+ testDM, err := devicemanager.NewManagerStub()
|
|
+ mgr, err := NewManager(testCase.cpuPolicyName, nil, 5*time.Second, machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager(), testDM)
|
|
+
|
|
if testCase.expectedError != nil {
|
|
if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
|
|
t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())
|
|
@@ -1232,6 +1239,7 @@ func TestReconcileState(t *testing.T) {
|
|
// the following tests are with --reserved-cpus configured
|
|
func TestCPUManagerAddWithResvList(t *testing.T) {
|
|
testExcl := false
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
testPolicy, _ := NewStaticPolicy(
|
|
&topology.CPUTopology{
|
|
NumCPUs: 4,
|
|
@@ -1246,8 +1254,10 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
|
|
},
|
|
1,
|
|
cpuset.NewCPUSet(0),
|
|
+ cpuset.NewCPUSet(),
|
|
topologymanager.NewFakeManager(),
|
|
nil,
|
|
+ testDM,
|
|
testExcl)
|
|
testCases := []struct {
|
|
description string
|
|
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
|
|
index 216b6ce9bf8..30c0afaca32 100644
|
|
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
|
|
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
|
|
@@ -17,14 +17,21 @@ limitations under the License.
|
|
package cpumanager
|
|
|
|
import (
|
|
+ "context"
|
|
"fmt"
|
|
+ "strconv"
|
|
|
|
+ k8sclient "k8s.io/client-go/kubernetes"
|
|
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
+ restclient "k8s.io/client-go/rest"
|
|
v1 "k8s.io/api/core/v1"
|
|
+ "k8s.io/client-go/tools/clientcmd"
|
|
"k8s.io/klog/v2"
|
|
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
|
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
|
|
)
|
|
@@ -39,6 +46,12 @@ const (
|
|
ErrorSMTAlignment = "SMTAlignmentError"
|
|
)
|
|
|
|
+type getPodNamespace func(string) (*v1.Namespace, error)
|
|
+type buildFromConfigFlag func(masterUrl string, kubeconfigPath string) (*restclient.Config, error)
|
|
+
|
|
+var varGetNamespaceObject getPodNamespace
|
|
+var varBuildConfigFromFlags buildFromConfigFlag
|
|
+
|
|
// SMTAlignmentError represents an error due to SMT alignment
|
|
type SMTAlignmentError struct {
|
|
RequestedCPUs int
|
|
@@ -53,11 +66,6 @@ func (e SMTAlignmentError) Type() string {
|
|
return ErrorSMTAlignment
|
|
}
|
|
|
|
-// Define namespaces used by platform infrastructure pods
|
|
-var infraNamespaces = [...]string{
|
|
- "kube-system", "armada", "cert-manager", "platform-deployment-manager", "portieris", "vault", "notification", "flux-helm", "metrics-server", "node-feature-discovery", "intel-power", "power-metrics", "sriov-fec-system",
|
|
-}
|
|
-
|
|
// staticPolicy is a CPU manager policy that does not change CPU
|
|
// assignments for exclusively pinned guaranteed containers after the main
|
|
// container process starts.
|
|
@@ -101,6 +109,10 @@ type staticPolicy struct {
|
|
topology *topology.CPUTopology
|
|
// set of CPUs that is not available for exclusive assignment
|
|
reserved cpuset.CPUSet
|
|
+ // subset of reserved CPUs with isolcpus attribute
|
|
+ isolcpus cpuset.CPUSet
|
|
+ // parent containerManager, used to get device list
|
|
+ deviceManager devicemanager.Manager
|
|
// If true, default CPUSet should exclude reserved CPUs
|
|
excludeReserved bool
|
|
// topology manager reference to get container Topology affinity
|
|
@@ -117,7 +129,8 @@ var _ Policy = &staticPolicy{}
|
|
// NewStaticPolicy returns a CPU manager policy that does not change CPU
|
|
// assignments for exclusively pinned guaranteed containers after the main
|
|
// container process starts.
|
|
-func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string, excludeReserved bool) (Policy, error) {
|
|
+func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, isolCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string, deviceManager devicemanager.Manager, excludeReserved bool) (Policy, error) {
|
|
+
|
|
opts, err := NewStaticPolicyOptions(cpuPolicyOptions)
|
|
if err != nil {
|
|
return nil, err
|
|
@@ -128,6 +141,8 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
|
|
policy := &staticPolicy{
|
|
topology: topology,
|
|
affinity: affinity,
|
|
+ isolcpus: isolCPUs,
|
|
+ deviceManager: deviceManager,
|
|
excludeReserved: excludeReserved,
|
|
cpusToReuse: make(map[string]cpuset.CPUSet),
|
|
options: opts,
|
|
@@ -154,6 +169,12 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
|
|
klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved)
|
|
policy.reserved = reserved
|
|
|
|
+ if !isolCPUs.IsSubsetOf(reserved) {
|
|
+ klog.Errorf("[cpumanager] isolCPUs %v is not a subset of reserved %v", isolCPUs, reserved)
|
|
+ reserved = reserved.Union(isolCPUs)
|
|
+ klog.Warningf("[cpumanager] mismatch isolCPUs %v, force reserved %v", isolCPUs, reserved)
|
|
+ }
|
|
+
|
|
return policy, nil
|
|
}
|
|
|
|
@@ -187,8 +208,9 @@ func (p *staticPolicy) validateState(s state.State) error {
|
|
} else {
|
|
s.SetDefaultCPUSet(allCPUs)
|
|
}
|
|
- klog.Infof("[cpumanager] static policy: CPUSet: allCPUs:%v, reserved:%v, default:%v\n",
|
|
- allCPUs, p.reserved, s.GetDefaultCPUSet())
|
|
+ klog.Infof("[cpumanager] static policy: CPUSet: allCPUs:%v, reserved:%v, isolcpus:%v, default:%v\n",
|
|
+ allCPUs, p.reserved, p.isolcpus, s.GetDefaultCPUSet())
|
|
+
|
|
return nil
|
|
}
|
|
|
|
@@ -269,6 +291,9 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
|
|
}
|
|
|
|
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
|
|
+
|
|
+ varGetNamespaceObject = getPodNamespaceObject
|
|
+ varBuildConfigFromFlags = clientcmd.BuildConfigFromFlags
|
|
// Process infra pods before guaranteed pods
|
|
if isKubeInfra(pod) {
|
|
// Container belongs in reserved pool.
|
|
@@ -278,10 +303,11 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
|
|
return nil
|
|
}
|
|
|
|
- cpuset := p.reserved
|
|
+ cpuset := p.reserved.Clone().Difference(p.isolcpus)
|
|
if cpuset.IsEmpty() {
|
|
// If this happens then someone messed up.
|
|
- return fmt.Errorf("[cpumanager] static policy: reserved container unable to allocate cpus (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v, reserved:%v", pod.Namespace, string(pod.UID), pod.Name, container.Name, cpuset, p.reserved)
|
|
+ return fmt.Errorf("[cpumanager] static policy: reserved container unable to allocate cpus (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v, reserved:%v, isolcpus:%v", pod.Namespace, string(pod.UID), pod.Name, container.Name, cpuset, p.reserved, p.isolcpus)
|
|
+
|
|
}
|
|
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
|
|
klog.Infof("[cpumanager] static policy: reserved: AddContainer (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v", pod.Namespace, string(pod.UID), pod.Name, container.Name, cpuset)
|
|
@@ -325,8 +351,34 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
|
|
}
|
|
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
|
|
p.updateCPUsToReuse(pod, container, cpuset)
|
|
+ klog.Infof("[cpumanager] guaranteed: AddContainer "+
|
|
+ "(namespace: %s, pod UID: %s, pod: %s, container: %s); numCPUS=%d, cpuset=%v",
|
|
+ pod.Namespace, string(pod.UID), pod.Name, container.Name, numCPUs, cpuset)
|
|
+ return nil
|
|
+ }
|
|
|
|
+ if isolcpus := p.podIsolCPUs(pod, container); isolcpus.Size() > 0 {
|
|
+ // container has requested isolated CPUs
|
|
+ if set, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
|
|
+ if set.Equals(isolcpus) {
|
|
+ klog.Infof("[cpumanager] isolcpus container already present in state, skipping (namespace: %s, pod UID: %s, pod: %s, container: %s)",
|
|
+ pod.Namespace, string(pod.UID), pod.Name, container.Name)
|
|
+ return nil
|
|
+ } else {
|
|
+ klog.Infof("[cpumanager] isolcpus container state has cpus %v, should be %v (namespace: %s, pod UID: %s, pod: %s, container: %s)",
|
|
+ isolcpus, set, pod.Namespace, string(pod.UID), pod.Name, container.Name)
|
|
+ }
|
|
+ }
|
|
+ // Note that we do not do anything about init containers here.
|
|
+ // It looks like devices are allocated per-pod based on effective requests/limits
|
|
+ // and extra devices from initContainers are not freed up when the regular containers start.
|
|
+ // TODO: confirm this is still true for 1.20
|
|
+ s.SetCPUSet(string(pod.UID), container.Name, isolcpus)
|
|
+ klog.Infof("[cpumanager] isolcpus: AddContainer (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v",
|
|
+ pod.Namespace, string(pod.UID), pod.Name, container.Name, isolcpus)
|
|
+ return nil
|
|
}
|
|
+
|
|
// container belongs in the shared pool (nothing to do; use default cpuset)
|
|
return nil
|
|
}
|
|
@@ -625,12 +677,89 @@ func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, reu
|
|
return hints
|
|
}
|
|
|
|
+func getPodNamespaceObject(podNamespaceName string) (*v1.Namespace, error) {
|
|
+
|
|
+ cfg, err := varBuildConfigFromFlags("", "/etc/kubernetes/kubelet.conf")
|
|
+ if err != nil {
|
|
+ klog.Error("Failed to build client config from /etc/kubernetes/kubelet.conf: ", err.Error())
|
|
+ return nil, err
|
|
+ }
|
|
+
|
|
+ clientset, err := k8sclient.NewForConfig(cfg)
|
|
+ if err != nil {
|
|
+ klog.Error("Failed to get clientset for KUBECONFIG /etc/kubernetes/kubelet.conf: ", err.Error())
|
|
+ return nil, err
|
|
+ }
|
|
+
|
|
+ namespaceObj, err := clientset.CoreV1().Namespaces().Get(context.TODO(), podNamespaceName, metav1.GetOptions{})
|
|
+ if err != nil {
|
|
+ klog.Error("Error getting namespace object:", err.Error())
|
|
+ return nil, err
|
|
+ }
|
|
+
|
|
+ return namespaceObj, nil
|
|
+
|
|
+}
|
|
+
|
|
// check if a given pod is in a platform infrastructure namespace
|
|
func isKubeInfra(pod *v1.Pod) bool {
|
|
- for _, namespace := range infraNamespaces {
|
|
- if namespace == pod.Namespace {
|
|
- return true
|
|
- }
|
|
+
|
|
+ podName := pod.GetName()
|
|
+ podNamespaceName := pod.GetNamespace()
|
|
+
|
|
+ klog.InfoS("Checking pod ", podName , " for label 'app.starlingx.io/component=platform'.")
|
|
+ podLabels := pod.GetLabels()
|
|
+ val, ok := podLabels["app.starlingx.io/component"]
|
|
+ if (ok && val == "platform") {
|
|
+ klog.InfoS("Pod ", podName, " has 'app.starlingx.io/component=platform' label. Assigning platform CPUs.")
|
|
+ return true
|
|
+ }
|
|
+
|
|
+ klog.InfoS("Pod ", pod.GetName(), " does not have 'app.starlingx.io/component=platform' label. Checking its namespace information...")
|
|
+
|
|
+ namespaceObj, err := varGetNamespaceObject(podNamespaceName)
|
|
+ if err != nil {
|
|
+ return false
|
|
+ }
|
|
+
|
|
+ namespaceLabels := namespaceObj.GetLabels()
|
|
+ val, ok = namespaceLabels["app.starlingx.io/component"]
|
|
+ if ok && val == "platform" {
|
|
+ klog.InfoS("For pod: ", podName, ", its Namespace ", podNamespaceName, " has 'app.starlingx.io/component=platform' label. Assigning platform CPUs.")
|
|
+ return true
|
|
}
|
|
+
|
|
+ klog.InfoS("Neither pod ", podName, " nor its namespace ", podNamespaceName, " has 'app.starlingx.io/component=platform' label. Not assigning platform CPUs.")
|
|
return false
|
|
+
|
|
+}
|
|
+
|
|
+// get the isolated CPUs (if any) from the devices associated with a specific container
|
|
+func (p *staticPolicy) podIsolCPUs(pod *v1.Pod, container *v1.Container) cpuset.CPUSet {
|
|
+ // NOTE: This is required for TestStaticPolicyAdd() since makePod() does
|
|
+ // not create UID. We also need a way to properly stub devicemanager.
|
|
+ if len(string(pod.UID)) == 0 {
|
|
+ return cpuset.NewCPUSet()
|
|
+ }
|
|
+ resContDevices := p.deviceManager.GetDevices(string(pod.UID), container.Name)
|
|
+ cpuSet := cpuset.NewCPUSet()
|
|
+ for resourceName, resourceDevs := range resContDevices {
|
|
+ // this resource name needs to match the isolcpus device plugin
|
|
+ if resourceName == "windriver.com/isolcpus" {
|
|
+ for devID, _ := range resourceDevs {
|
|
+ cpuStrList := []string{devID}
|
|
+ if len(cpuStrList) > 0 {
|
|
+ // loop over the list of strings, convert each one to int, add to cpuset
|
|
+ for _, cpuStr := range cpuStrList {
|
|
+ cpu, err := strconv.Atoi(cpuStr)
|
|
+ if err != nil {
|
|
+ panic(err)
|
|
+ }
|
|
+ cpuSet = cpuSet.Union(cpuset.NewCPUSet(cpu))
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ return cpuSet
|
|
}
|
|
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
|
|
index 7938f787a57..fa3e99fb2bb 100644
|
|
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
|
|
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
|
|
@@ -17,14 +17,19 @@ limitations under the License.
|
|
package cpumanager
|
|
|
|
import (
|
|
+ "errors"
|
|
"fmt"
|
|
"reflect"
|
|
"testing"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
+ restclient "k8s.io/client-go/rest"
|
|
+
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
|
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
|
|
)
|
|
@@ -65,8 +70,9 @@ func (spt staticPolicyTest) PseudoClone() staticPolicyTest {
|
|
}
|
|
|
|
func TestStaticPolicyName(t *testing.T) {
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
testExcl := false
|
|
- policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testExcl)
|
|
+ policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, testExcl)
|
|
|
|
policyName := policy.Name()
|
|
if policyName != "static" {
|
|
@@ -76,6 +82,7 @@ func TestStaticPolicyName(t *testing.T) {
|
|
}
|
|
|
|
func TestStaticPolicyStart(t *testing.T) {
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
testCases := []staticPolicyTest{
|
|
{
|
|
description: "non-corrupted state",
|
|
@@ -151,7 +158,7 @@ func TestStaticPolicyStart(t *testing.T) {
|
|
}
|
|
for _, testCase := range testCases {
|
|
t.Run(testCase.description, func(t *testing.T) {
|
|
- p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
|
|
+ p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, testCase.excludeReserved)
|
|
|
|
policy := p.(*staticPolicy)
|
|
st := &mockState{
|
|
@@ -199,7 +206,7 @@ func TestStaticPolicyAdd(t *testing.T) {
|
|
largeTopoCPUSet := largeTopoBuilder.Result()
|
|
largeTopoSock0CPUSet := largeTopoSock0Builder.Result()
|
|
largeTopoSock1CPUSet := largeTopoSock1Builder.Result()
|
|
-
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
// these are the cases which must behave the same regardless the policy options.
|
|
// So we will permutate the options to ensure this holds true.
|
|
optionsInsensitiveTestCases := []staticPolicyTest{
|
|
@@ -529,8 +536,9 @@ func TestStaticPolicyAdd(t *testing.T) {
|
|
}
|
|
|
|
func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
testExcl := false
|
|
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), testCase.options, testExcl)
|
|
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), testCase.options, testDM, testExcl)
|
|
|
|
st := &mockState{
|
|
assignments: testCase.stAssignments,
|
|
@@ -596,7 +604,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
|
|
}
|
|
|
|
for _, testCase := range testCases {
|
|
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
|
|
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, excludeReserved)
|
|
|
|
st := &mockState{
|
|
assignments: testCase.stAssignments,
|
|
@@ -629,6 +637,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
|
|
|
|
func TestStaticPolicyRemove(t *testing.T) {
|
|
excludeReserved := false
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
testCases := []staticPolicyTest{
|
|
{
|
|
description: "SingleSocketHT, DeAllocOneContainer",
|
|
@@ -710,6 +719,7 @@ func TestStaticPolicyRemove(t *testing.T) {
|
|
|
|
func TestTopologyAwareAllocateCPUs(t *testing.T) {
|
|
excludeReserved := false
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
testCases := []struct {
|
|
description string
|
|
topo *topology.CPUTopology
|
|
@@ -778,7 +788,8 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
|
|
},
|
|
}
|
|
for _, tc := range testCases {
|
|
- p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, excludeReserved)
|
|
+ p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, excludeReserved)
|
|
+
|
|
policy := p.(*staticPolicy)
|
|
st := &mockState{
|
|
assignments: tc.stAssignments,
|
|
@@ -811,6 +822,7 @@ type staticPolicyTestWithResvList struct {
|
|
topo *topology.CPUTopology
|
|
numReservedCPUs int
|
|
reserved cpuset.CPUSet
|
|
+ isolcpus cpuset.CPUSet
|
|
stAssignments state.ContainerCPUAssignments
|
|
stDefaultCPUSet cpuset.CPUSet
|
|
pod *v1.Pod
|
|
@@ -821,6 +833,8 @@ type staticPolicyTestWithResvList struct {
|
|
}
|
|
|
|
func TestStaticPolicyStartWithResvList(t *testing.T) {
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
+ testExcl := false
|
|
testCases := []staticPolicyTestWithResvList{
|
|
{
|
|
description: "empty cpuset",
|
|
@@ -850,11 +864,9 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
|
|
expNewErr: fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
|
|
},
|
|
}
|
|
- testExcl := false
|
|
for _, testCase := range testCases {
|
|
t.Run(testCase.description, func(t *testing.T) {
|
|
- p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil, testExcl)
|
|
-
|
|
+ p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, testExcl)
|
|
if !reflect.DeepEqual(err, testCase.expNewErr) {
|
|
t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
|
|
testCase.description, testCase.expNewErr, err)
|
|
@@ -894,6 +906,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
|
|
topo: topoSingleSocketHT,
|
|
numReservedCPUs: 1,
|
|
reserved: cpuset.NewCPUSet(0),
|
|
+ isolcpus: cpuset.NewCPUSet(),
|
|
stAssignments: state.ContainerCPUAssignments{},
|
|
stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
|
|
pod: makePod("fakePod", "fakeContainer2", "8000m", "8000m"),
|
|
@@ -906,6 +919,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
|
|
topo: topoSingleSocketHT,
|
|
numReservedCPUs: 2,
|
|
reserved: cpuset.NewCPUSet(0, 1),
|
|
+ isolcpus: cpuset.NewCPUSet(),
|
|
stAssignments: state.ContainerCPUAssignments{},
|
|
stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7),
|
|
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
|
|
@@ -918,6 +932,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
|
|
topo: topoSingleSocketHT,
|
|
numReservedCPUs: 2,
|
|
reserved: cpuset.NewCPUSet(0, 1),
|
|
+ isolcpus: cpuset.NewCPUSet(),
|
|
stAssignments: state.ContainerCPUAssignments{
|
|
"fakePod": map[string]cpuset.CPUSet{
|
|
"fakeContainer100": cpuset.NewCPUSet(2, 3, 6, 7),
|
|
@@ -934,6 +949,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
|
|
topo: topoSingleSocketHT,
|
|
numReservedCPUs: 2,
|
|
reserved: cpuset.NewCPUSet(0, 1),
|
|
+ isolcpus: cpuset.NewCPUSet(),
|
|
stAssignments: state.ContainerCPUAssignments{
|
|
"fakePod": map[string]cpuset.CPUSet{
|
|
"fakeContainer100": cpuset.NewCPUSet(2, 3, 6, 7),
|
|
@@ -945,11 +961,29 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
|
|
expCPUAlloc: true,
|
|
expCSet: cpuset.NewCPUSet(0, 1),
|
|
},
|
|
+ {
|
|
+ description: "InfraPod, SingleSocketHT, Isolcpus, ExpectAllocReserved",
|
|
+ topo: topoSingleSocketHT,
|
|
+ numReservedCPUs: 2,
|
|
+ reserved: cpuset.NewCPUSet(0, 1),
|
|
+ isolcpus: cpuset.NewCPUSet(1),
|
|
+ stAssignments: state.ContainerCPUAssignments{
|
|
+ "fakePod": map[string]cpuset.CPUSet{
|
|
+ "fakeContainer100": cpuset.NewCPUSet(2, 3, 6, 7),
|
|
+ },
|
|
+ },
|
|
+ stDefaultCPUSet: cpuset.NewCPUSet(4, 5),
|
|
+ pod: infraPod,
|
|
+ expErr: nil,
|
|
+ expCPUAlloc: true,
|
|
+ expCSet: cpuset.NewCPUSet(0),
|
|
+ },
|
|
}
|
|
|
|
testExcl := true
|
|
+ testDM, _ := devicemanager.NewManagerStub()
|
|
for _, testCase := range testCases {
|
|
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil, testExcl)
|
|
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, testCase.isolcpus, topologymanager.NewFakeManager(), nil, testDM, testExcl)
|
|
|
|
st := &mockState{
|
|
assignments: testCase.stAssignments,
|
|
@@ -1075,3 +1109,125 @@ func TestStaticPolicyOptions(t *testing.T) {
|
|
})
|
|
}
|
|
}
|
|
+
|
|
+func makePodWithLabels(podLabels map[string]string) *v1.Pod {
|
|
+ return &v1.Pod{
|
|
+ ObjectMeta: metav1.ObjectMeta{
|
|
+ Name: "test-pod",
|
|
+ Namespace: "test-namespace",
|
|
+ Labels: podLabels,
|
|
+ },
|
|
+ }
|
|
+}
|
|
+
|
|
+func fakeBuildConfigFromFlags(masterUrl string, kubeconfigPath string) (*restclient.Config, error) {
|
|
+
|
|
+ return &restclient.Config{}, nil
|
|
+}
|
|
+
|
|
+func fakeBuildConfigFromFlagsError(masterUrl string, kubeconfigPath string) (*restclient.Config, error) {
|
|
+
|
|
+ errString := fmt.Sprintf("%s file not found", kubeconfigPath)
|
|
+ return nil, errors.New(errString)
|
|
+
|
|
+}
|
|
+
|
|
+func getFakeInfraPodNamespace(_ string) (*v1.Namespace, error) {
|
|
+
|
|
+ return &v1.Namespace{
|
|
+ ObjectMeta: metav1.ObjectMeta{
|
|
+ Name: "test-namespace",
|
|
+ Labels: map[string]string{
|
|
+ "app.starlingx.io/component": "platform",
|
|
+ },
|
|
+ }}, nil
|
|
+}
|
|
+
|
|
+func getFakeNonInfraPodNamespace(_ string) (*v1.Namespace, error) {
|
|
+
|
|
+ return &v1.Namespace{
|
|
+ ObjectMeta: metav1.ObjectMeta{
|
|
+ Name: "test-namespace",
|
|
+ Labels: map[string]string{
|
|
+ "fake": "label",
|
|
+ }}}, nil
|
|
+
|
|
+}
|
|
+
|
|
+type kubeInfraPodTestCase struct {
|
|
+ description string
|
|
+ pod *v1.Pod
|
|
+ namespaceFunc getPodNamespace
|
|
+ expectedValue bool
|
|
+}
|
|
+
|
|
+func TestKubeInfraPod(t *testing.T) {
|
|
+ testCases := []kubeInfraPodTestCase{
|
|
+ {
|
|
+ description: "Pod with platform label and namespace without platform label",
|
|
+ pod: makePodWithLabels(map[string]string{
|
|
+ "app.starlingx.io/component": "platform",
|
|
+ }),
|
|
+ namespaceFunc: getFakeNonInfraPodNamespace,
|
|
+ expectedValue: true,
|
|
+
|
|
+ },
|
|
+ {
|
|
+ description: "Pod without platform label and namespace with platform label",
|
|
+ pod: makePodWithLabels(map[string]string{
|
|
+ "test": "label",
|
|
+ }),
|
|
+ namespaceFunc: getFakeInfraPodNamespace,
|
|
+ expectedValue: true,
|
|
+ },
|
|
+ {
|
|
+ description: "Pod without platform label and namespace without platform label",
|
|
+ pod: makePodWithLabels(map[string]string{
|
|
+ "test": "namespace",
|
|
+ }),
|
|
+ namespaceFunc: getFakeNonInfraPodNamespace,
|
|
+ expectedValue: false,
|
|
+ },
|
|
+
|
|
+ }
|
|
+
|
|
+ for _, testCase := range testCases {
|
|
+ t.Run(testCase.description, func(t *testing.T) {
|
|
+
|
|
+ varGetNamespaceObject = testCase.namespaceFunc
|
|
+ varBuildConfigFromFlags = fakeBuildConfigFromFlags
|
|
+ gotValue := isKubeInfra(testCase.pod)
|
|
+
|
|
+ if gotValue != testCase.expectedValue {
|
|
+ t.Errorf("StaticPolicy isKubeInfraPod() error %v. expected value %v actual value %v",
|
|
+ testCase.description, testCase.expectedValue, gotValue)
|
|
+ } else {
|
|
+ fmt.Printf("StaticPolicy isKubeInfraPod() test successful. : %v ", testCase.description)
|
|
+ }
|
|
+
|
|
+ })
|
|
+ }
|
|
+
|
|
+ test := kubeInfraPodTestCase{
|
|
+ description: "Failure reading kubeconfig file",
|
|
+ pod: makePodWithLabels(map[string]string{
|
|
+ "test": "namespace",
|
|
+ }),
|
|
+ namespaceFunc: getFakeNonInfraPodNamespace,
|
|
+ expectedValue: false,
|
|
+ }
|
|
+
|
|
+ varGetNamespaceObject = getPodNamespaceObject
|
|
+ varBuildConfigFromFlags = fakeBuildConfigFromFlagsError
|
|
+
|
|
+ gotValue := isKubeInfra(test.pod)
|
|
+
|
|
+ if gotValue != test.expectedValue {
|
|
+ t.Errorf("StaticPolicy isKubeInfraPod() error %v. expected value %v actual value %v",
|
|
+ test.description, test.expectedValue, gotValue)
|
|
+ } else {
|
|
+ fmt.Printf("StaticPolicy isKubeInfraPod() test successful. : %v ", test.description)
|
|
+ }
|
|
+
|
|
+}
|
|
+
|
|
--
|
|
2.25.1
|
|
|