integ/kubernetes/kubernetes-1.26.1/debian/deb_folder/patches/kubelet-cpumanager-introduc...

987 lines
39 KiB
Diff

From 36170864cc9ebb2183e6301cb745e49238d21397 Mon Sep 17 00:00:00 2001
From: Ramesh Kumar Sivanandam <rameshkumar.sivanandam@windriver.com>
Date: Mon, 7 Nov 2022 13:33:03 -0500
Subject: [PATCH] kubelet cpumanager introduce concept of isolated CPUs
This introduces the concept of "isolated CPUs", which are CPUs that
have been isolated at the kernel level via the "isolcpus" kernel boot
parameter.
When starting the kubelet process, two separate sets of reserved CPUs
may be specified. With this change CPUs reserved via
'--system-reserved=cpu' will be used for infrastructure pods while the
isolated CPUs should be reserved via '--kube-reserved=cpu' to cause
kubelet to skip over them for "normal" CPU resource tracking. The
kubelet code will double-check that the specified isolated CPUs match
what the kernel exposes in "/sys/devices/system/cpu/isolated".
A plugin (outside the scope of this commit) will expose the isolated
CPUs to kubelet via the device plugin API.
If a pod specifies some number of "isolcpus" resources, the device
manager will allocate them. In this code we check whether such
resources have been allocated, and if so we set the container cpuset to
the isolated CPUs. This does mean that it really only makes sense to
specify "isolcpus" resources for best-effort or burstable pods, not for
guaranteed ones since that would throw off the accounting code. In
order to ensure the accounting still works as designed, if "isolcpus"
are specified for guaranteed pods, the affinity will be set to the
non-isolated CPUs.
This patch was refactored in 1.21.3 due to upstream API change
node: podresources: make GetDevices() consistent
(commit ad68f9588c72d6477b5a290c548a9031063ac659).
The routine podIsolCPUs() was refactored in 1.21.3 since the API
p.deviceManager.GetDevices() is returning multiple devices with
a device per cpu. The resultant cpuset needs to be the aggregate.
The routine NewStaticPolicy was refactored in 1.22.5, adding a new argument
to its signature: cpuPolicyOptions map[string]string. This change implies
shifting the new arguments (deviceManager, excludeReserved) one position
to the right.
Co-authored-by: Jim Gauld <james.gauld@windriver.com>
Co-authored-by: Chris Friesen <chris.friesen@windriver.com>
Signed-off-by: Gleb Aronsky <gleb.aronsky@windriver.com>
Signed-off-by: Ramesh Kumar Sivanandam <rameshkumar.sivanandam@windriver.com>
Signed-off-by: Sachin Gopala Krishna <saching.krishna@windriver.com>
Signed-off-by: Kaustubh Dhokte <kaustubh.dhokte@windriver.com>
---
pkg/kubelet/cm/container_manager_linux.go | 1 +
pkg/kubelet/cm/cpumanager/cpu_manager.go | 35 +++-
pkg/kubelet/cm/cpumanager/cpu_manager_test.go | 23 ++-
pkg/kubelet/cm/cpumanager/policy_static.go | 157 +++++++++++++--
.../cm/cpumanager/policy_static_test.go | 178 +++++++++++++++++-
pkg/kubelet/cm/devicemanager/manager_stub.go | 99 ++++++++++
6 files changed, 463 insertions(+), 30 deletions(-)
create mode 100644 pkg/kubelet/cm/devicemanager/manager_stub.go
diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index 332e99e72ee..5ac571d9abd 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -331,6 +331,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
+ cm.deviceManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize cpu manager")
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index e2c89efeb2e..95a4246e840 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -19,7 +19,9 @@ package cpumanager
import (
"context"
"fmt"
+ "io/ioutil"
"math"
+ "strings"
"sync"
"time"
@@ -33,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -51,6 +54,25 @@ type policyName string
// cpuManagerStateFileName is the file name where cpu manager stores its state
const cpuManagerStateFileName = "cpu_manager_state"
+// getIsolcpus returns the system-level isolated CPUs as listed in
+// /sys/devices/system/cpu/isolated. If the file cannot be read, or its
+// content cannot be parsed as a cpuset, the failure is logged together with
+// its cause and an empty set is returned.
+func getIsolcpus() cpuset.CPUSet {
+	const isolatedPath = "/sys/devices/system/cpu/isolated"
+
+	dat, err := ioutil.ReadFile(isolatedPath)
+	if err != nil {
+		klog.ErrorS(err, "[cpumanager] unable to read sysfs isolated CPUs file", "path", isolatedPath)
+		return cpuset.NewCPUSet()
+	}
+
+	// The isolated cpus string ends in a newline
+	cpustring := strings.TrimSuffix(string(dat), "\n")
+	cset, err := cpuset.Parse(cpustring)
+	if err != nil {
+		klog.ErrorS(err, "[cpumanager] unable to parse sysfs isolcpus string to cpuset", "path", isolatedPath, "content", cpustring)
+		return cpuset.NewCPUSet()
+	}
+
+	return cset
+}
+
// Manager interface provides methods for Kubelet to manage pod cpus.
type Manager interface {
// Start is called during Kubelet initialization.
@@ -154,7 +176,8 @@ func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true }
// NewManager creates new cpu manager based on provided policy
-func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
+func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store, deviceManager devicemanager.Manager) (Manager, error) {
+
var topo *topology.CPUTopology
var policy Policy
var err error
@@ -195,7 +218,15 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
// NOTE: Set excludeReserved unconditionally to exclude reserved CPUs from default cpuset.
// This variable is primarily to make testing easier.
excludeReserved := true
- policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions, excludeReserved)
+
+		// isolCPUs is the set of kernel-isolated CPUs. They should be a subset of specificCPUs or
+		// of the CPUs that NewStaticPolicy() will pick if numReservedCPUs is set. It's only in the
+		// argument list here for ease of testing, it's really internal to the policy.
+		// No error check is added here: the unchanged check that immediately follows
+		// already handles err and wraps it with %w.
+		isolCPUs := getIsolcpus()
+		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, isolCPUs, affinity, cpuPolicyOptions, deviceManager, excludeReserved)
if err != nil {
return nil, fmt.Errorf("new static policy error: %w", err)
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index e7c74453472..78b4ada1a73 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
@@ -215,6 +216,7 @@ func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string })
}
func TestCPUManagerAdd(t *testing.T) {
+ testDM, _ := devicemanager.NewManagerStub()
testExcl := false
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
@@ -230,8 +232,10 @@ func TestCPUManagerAdd(t *testing.T) {
},
0,
cpuset.NewCPUSet(),
+ cpuset.NewCPUSet(),
topologymanager.NewFakeManager(),
nil,
+ testDM,
testExcl)
testCases := []struct {
description string
@@ -482,8 +486,9 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
}
testExcl := false
+ testDM, _ := devicemanager.NewManagerStub()
for _, testCase := range testCases {
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testExcl)
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, testExcl)
mockState := &mockState{
assignments: testCase.stAssignments,
@@ -638,7 +643,9 @@ func TestCPUManagerGenerate(t *testing.T) {
}
defer os.RemoveAll(sDir)
- mgr, err := NewManager(testCase.cpuPolicyName, nil, 5*time.Second, machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
+ testDM, err := devicemanager.NewManagerStub()
+ mgr, err := NewManager(testCase.cpuPolicyName, nil, 5*time.Second, machineInfo, cpuset.NewCPUSet(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager(), testDM)
+
if testCase.expectedError != nil {
if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())
@@ -709,6 +716,7 @@ func TestCPUManagerRemove(t *testing.T) {
func TestReconcileState(t *testing.T) {
testExcl := false
+ testDM, _ := devicemanager.NewManagerStub()
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 8,
@@ -727,8 +735,10 @@ func TestReconcileState(t *testing.T) {
},
0,
cpuset.NewCPUSet(),
+ cpuset.NewCPUSet(),
topologymanager.NewFakeManager(),
nil,
+ testDM,
testExcl)
testCases := []struct {
@@ -1234,6 +1244,7 @@ func TestReconcileState(t *testing.T) {
// the following tests are with --reserved-cpus configured
func TestCPUManagerAddWithResvList(t *testing.T) {
testExcl := false
+ testDM, _ := devicemanager.NewManagerStub()
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 4,
@@ -1248,8 +1259,10 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
},
1,
cpuset.NewCPUSet(0),
+ cpuset.NewCPUSet(),
topologymanager.NewFakeManager(),
nil,
+ testDM,
testExcl)
testCases := []struct {
description string
@@ -1362,7 +1375,8 @@ func TestCPUManagerHandlePolicyOptions(t *testing.T) {
}
defer os.RemoveAll(sDir)
- _, err = NewManager(testCase.cpuPolicyName, testCase.cpuPolicyOptions, 5*time.Second, machineInfo, cpuset.NewCPUSet(), nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
+ testDM, err := devicemanager.NewManagerStub()
+ _, err = NewManager(testCase.cpuPolicyName, testCase.cpuPolicyOptions, 5*time.Second, machineInfo, cpuset.NewCPUSet(), nodeAllocatableReservation, sDir, topologymanager.NewFakeManager(), testDM)
if err == nil {
t.Errorf("Expected error, but NewManager succeeded")
}
@@ -1376,6 +1390,7 @@ func TestCPUManagerHandlePolicyOptions(t *testing.T) {
func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
testExcl := false
+ testDm, _ := devicemanager.NewManagerStub()
nonePolicy, _ := NewNonePolicy(nil)
staticPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
@@ -1391,8 +1406,10 @@ func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
},
1,
cpuset.NewCPUSet(0),
+ cpuset.NewCPUSet(),
topologymanager.NewFakeManager(),
nil,
+ testDm,
testExcl)
testCases := []struct {
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index 26eb400cee6..13bcad5a531 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -17,15 +17,22 @@ limitations under the License.
package cpumanager
import (
+ "context"
"fmt"
+ "strconv"
+ k8sclient "k8s.io/client-go/kubernetes"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ restclient "k8s.io/client-go/rest"
v1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/metrics"
@@ -41,6 +48,12 @@ const (
ErrorSMTAlignment = "SMTAlignmentError"
)
+// getPodNamespace looks up a Namespace object by namespace name.
+type getPodNamespace func(string) (*v1.Namespace, error)
+
+// buildFromConfigFlag builds a REST client configuration from a master URL
+// and a kubeconfig path.
+type buildFromConfigFlag func(masterUrl string, kubeconfigPath string) (*restclient.Config, error)
+
+// varGetNamespaceObject and varBuildConfigFromFlags are the indirection
+// points through which isKubeInfra and getPodNamespaceObject reach the API
+// server. They are initialised to the real implementations at declaration so
+// that they are never nil, even if they are used before Allocate (which
+// assigns these same values) has run.
+var varGetNamespaceObject getPodNamespace = getPodNamespaceObject
+var varBuildConfigFromFlags buildFromConfigFlag = clientcmd.BuildConfigFromFlags
+
// SMTAlignmentError represents an error due to SMT alignment
type SMTAlignmentError struct {
RequestedCPUs int
@@ -56,11 +69,6 @@ func (e SMTAlignmentError) Type() string {
return ErrorSMTAlignment
}
-// Define namespaces used by platform infrastructure pods
-var infraNamespaces = [...]string{
- "kube-system", "armada", "cert-manager", "platform-deployment-manager", "portieris", "vault", "notification", "flux-helm", "metrics-server", "node-feature-discovery", "intel-power", "power-metrics", "sriov-fec-system",
-}
-
// staticPolicy is a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
@@ -104,6 +112,10 @@ type staticPolicy struct {
topology *topology.CPUTopology
// set of CPUs that is not available for exclusive assignment
reserved cpuset.CPUSet
+ // subset of reserved CPUs with isolcpus attribute
+ isolcpus cpuset.CPUSet
+ // parent containerManager, used to get device list
+ deviceManager devicemanager.Manager
// If true, default CPUSet should exclude reserved CPUs
excludeReserved bool
// topology manager reference to get container Topology affinity
@@ -120,7 +132,8 @@ var _ Policy = &staticPolicy{}
// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
-func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string, excludeReserved bool) (Policy, error) {
+func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, isolCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string, deviceManager devicemanager.Manager, excludeReserved bool) (Policy, error) {
+
opts, err := NewStaticPolicyOptions(cpuPolicyOptions)
if err != nil {
return nil, err
@@ -135,6 +148,8 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
policy := &staticPolicy{
topology: topology,
affinity: affinity,
+ isolcpus: isolCPUs,
+ deviceManager: deviceManager,
excludeReserved: excludeReserved,
cpusToReuse: make(map[string]cpuset.CPUSet),
options: opts,
@@ -161,6 +176,12 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved)
policy.reserved = reserved
+	// The kernel-isolated CPUs are expected to be part of the reserved set.
+	// If they are not, force them in so the policy treats them as reserved.
+	if !isolCPUs.IsSubsetOf(reserved) {
+		klog.Errorf("[cpumanager] isolCPUs %v is not a subset of reserved %v", isolCPUs, reserved)
+		reserved = reserved.Union(isolCPUs)
+		// policy.reserved was assigned before this check, so it must be
+		// updated as well; otherwise the forced union is silently discarded.
+		policy.reserved = reserved
+		klog.Warningf("[cpumanager] mismatch isolCPUs %v, force reserved %v", isolCPUs, reserved)
+	}
+
return policy, nil
}
@@ -194,8 +215,9 @@ func (p *staticPolicy) validateState(s state.State) error {
} else {
s.SetDefaultCPUSet(allCPUs)
}
- klog.Infof("[cpumanager] static policy: CPUSet: allCPUs:%v, reserved:%v, default:%v\n",
- allCPUs, p.reserved, s.GetDefaultCPUSet())
+ klog.Infof("[cpumanager] static policy: CPUSet: allCPUs:%v, reserved:%v, isolcpus:%v, default:%v\n",
+ allCPUs, p.reserved, p.isolcpus, s.GetDefaultCPUSet())
+
return nil
}
@@ -281,6 +303,9 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
}
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
+
+ varGetNamespaceObject = getPodNamespaceObject
+ varBuildConfigFromFlags = clientcmd.BuildConfigFromFlags
// Process infra pods before guaranteed pods
if isKubeInfra(pod) {
// Container belongs in reserved pool.
@@ -290,16 +315,39 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
return nil
}
- cpuset := p.reserved
+ cpuset := p.reserved.Clone().Difference(p.isolcpus)
if cpuset.IsEmpty() {
// If this happens then someone messed up.
- return fmt.Errorf("[cpumanager] static policy: reserved container unable to allocate cpus (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v, reserved:%v", pod.Namespace, string(pod.UID), pod.Name, container.Name, cpuset, p.reserved)
+ return fmt.Errorf("[cpumanager] static policy: reserved container unable to allocate cpus (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v, reserved:%v, isolcpus:%v", pod.Namespace, string(pod.UID), pod.Name, container.Name, cpuset, p.reserved, p.isolcpus)
+
}
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
klog.Infof("[cpumanager] static policy: reserved: AddContainer (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v", pod.Namespace, string(pod.UID), pod.Name, container.Name, cpuset)
return nil
}
+	if isolcpus := p.podIsolCPUs(pod, container); isolcpus.Size() > 0 {
+		// container has requested isolated CPUs
+		if set, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+			if set.Equals(isolcpus) {
+				klog.Infof("[cpumanager] isolcpus container already present in state, skipping (namespace: %s, pod UID: %s, pod: %s, container: %s)",
+					pod.Namespace, string(pod.UID), pod.Name, container.Name)
+				return nil
+			}
+			// The recorded cpuset differs from the isolated CPUs reported for
+			// the container: log what the state has (set) and what it should
+			// be (isolcpus), then fall through to overwrite it.
+			klog.Infof("[cpumanager] isolcpus container state has cpus %v, should be %v (namespace: %s, pod UID: %s, pod: %s, container: %s)",
+				set, isolcpus, pod.Namespace, string(pod.UID), pod.Name, container.Name)
+		}
+		// Note that we do not do anything about init containers here.
+		// It looks like devices are allocated per-pod based on effective requests/limits
+		// and extra devices from initContainers are not freed up when the regular containers start.
+		// TODO: confirm this is still true for 1.20
+		s.SetCPUSet(string(pod.UID), container.Name, isolcpus)
+		klog.Infof("[cpumanager] isolcpus: AddContainer (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v",
+			pod.Namespace, string(pod.UID), pod.Name, container.Name, isolcpus)
+		return nil
+	}
+
numCPUs := p.guaranteedCPUs(pod, container)
if numCPUs == 0 {
// container belongs in the shared pool (nothing to do; use default cpuset)
@@ -348,7 +396,9 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
}
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
p.updateCPUsToReuse(pod, container, cpuset)
-
+ klog.Infof("[cpumanager] guaranteed: AddContainer "+
+ "(namespace: %s, pod UID: %s, pod: %s, container: %s); numCPUS=%d, cpuset=%v",
+ pod.Namespace, string(pod.UID), pod.Name, container.Name, numCPUs, cpuset)
return nil
}
@@ -647,14 +697,91 @@ func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, reu
return hints
}
+// getPodNamespaceObject fetches the Namespace object named podNamespaceName
+// from the API server. A client is built from /etc/kubernetes/kubelet.conf on
+// every call. Any failure is logged here and the underlying error is returned
+// unchanged.
+func getPodNamespaceObject(podNamespaceName string) (*v1.Namespace, error) {
+	const kubeconfigPath = "/etc/kubernetes/kubelet.conf"
+
+	// varBuildConfigFromFlags may not have been assigned yet; fall back to
+	// the real builder rather than calling a nil function.
+	buildConfig := varBuildConfigFromFlags
+	if buildConfig == nil {
+		buildConfig = clientcmd.BuildConfigFromFlags
+	}
+
+	cfg, err := buildConfig("", kubeconfigPath)
+	if err != nil {
+		klog.ErrorS(err, "Failed to build client config", "kubeconfig", kubeconfigPath)
+		return nil, err
+	}
+
+	clientset, err := k8sclient.NewForConfig(cfg)
+	if err != nil {
+		klog.ErrorS(err, "Failed to get clientset", "kubeconfig", kubeconfigPath)
+		return nil, err
+	}
+
+	namespaceObj, err := clientset.CoreV1().Namespaces().Get(context.TODO(), podNamespaceName, metav1.GetOptions{})
+	if err != nil {
+		klog.ErrorS(err, "Error getting namespace object", "namespace", podNamespaceName)
+		return nil, err
+	}
+
+	return namespaceObj, nil
+}
+
// check if a given pod is in a platform infrastructure namespace
+//
+// A pod is treated as platform infrastructure when either the pod itself or
+// its namespace carries the label app.starlingx.io/component=platform. If the
+// namespace lookup fails, the pod is treated as non-infrastructure.
func isKubeInfra(pod *v1.Pod) bool {
- for _, namespace := range infraNamespaces {
- if namespace == pod.Namespace {
- return true
- }
+	const (
+		componentLabel = "app.starlingx.io/component"
+		platformValue  = "platform"
+	)
+
+	podName := pod.GetName()
+	podNamespaceName := pod.GetNamespace()
+
+	// klog.InfoS takes one message followed by key/value pairs, so the pod
+	// and namespace names are passed as values rather than message fragments.
+	klog.InfoS("Checking pod for label 'app.starlingx.io/component=platform'.", "pod", podName)
+	if val, ok := pod.GetLabels()[componentLabel]; ok && val == platformValue {
+		klog.InfoS("Pod has 'app.starlingx.io/component=platform' label. Assigning platform CPUs.", "pod", podName)
+		return true
}
+
+	klog.InfoS("Pod does not have 'app.starlingx.io/component=platform' label. Checking its namespace information...", "pod", podName)
+
+	// varGetNamespaceObject may not have been assigned yet; fall back to the
+	// real lookup rather than calling a nil function.
+	getNamespace := varGetNamespaceObject
+	if getNamespace == nil {
+		getNamespace = getPodNamespaceObject
+	}
+
+	namespaceObj, err := getNamespace(podNamespaceName)
+	if err != nil {
+		return false
+	}
+
+	if val, ok := namespaceObj.GetLabels()[componentLabel]; ok && val == platformValue {
+		klog.InfoS("Namespace of pod has 'app.starlingx.io/component=platform' label. Assigning platform CPUs.", "pod", podName, "namespace", podNamespaceName)
+		return true
+	}
+
+	klog.InfoS("Neither pod nor its namespace has 'app.starlingx.io/component=platform' label. Not assigning platform CPUs.", "pod", podName, "namespace", podNamespaceName)
return false
+}
+
+// podIsolCPUs returns the isolated CPUs (if any) from the devices associated
+// with a specific container. Only devices of the "windriver.com/isolcpus"
+// resource are considered; each device ID is parsed as a decimal CPU number
+// and the result is the union of all of them. An empty set is returned when
+// the pod has no UID or the container holds no such devices.
+func (p *staticPolicy) podIsolCPUs(pod *v1.Pod, container *v1.Container) cpuset.CPUSet {
+	// NOTE: This is required for TestStaticPolicyAdd() since makePod() does
+	// not create UID. We also need a way to properly stub devicemanager.
+	if len(string(pod.UID)) == 0 {
+		return cpuset.NewCPUSet()
+	}
+	resContDevices := p.deviceManager.GetDevices(string(pod.UID), container.Name)
+	cpuSet := cpuset.NewCPUSet()
+	for resourceName, resourceDevs := range resContDevices {
+		// this resource name needs to match the isolcpus device plugin
+		if resourceName != "windriver.com/isolcpus" {
+			continue
+		}
+		for devID := range resourceDevs {
+			cpu, err := strconv.Atoi(devID)
+			if err != nil {
+				// A malformed device ID must not crash the kubelet: report
+				// it and skip that device instead of panicking.
+				klog.ErrorS(err, "[cpumanager] ignoring isolcpus device with non-numeric ID",
+					"deviceID", devID, "podUID", string(pod.UID), "containerName", container.Name)
+				continue
+			}
+			cpuSet = cpuSet.Union(cpuset.NewCPUSet(cpu))
+		}
+	}
+	return cpuSet
}
// isHintSocketAligned function return true if numa nodes in hint are socket aligned.
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index 414e5ce144c..f79c23accb4 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -17,10 +17,13 @@ limitations under the License.
package cpumanager
import (
+ "errors"
"fmt"
"reflect"
"testing"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ restclient "k8s.io/client-go/rest"
v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -28,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
)
@@ -69,8 +73,9 @@ func (spt staticPolicyTest) PseudoClone() staticPolicyTest {
}
func TestStaticPolicyName(t *testing.T) {
+ testDM, _ := devicemanager.NewManagerStub()
testExcl := false
- policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testExcl)
+ policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, testExcl)
policyName := policy.Name()
if policyName != "static" {
@@ -80,6 +85,7 @@ func TestStaticPolicyName(t *testing.T) {
}
func TestStaticPolicyStart(t *testing.T) {
+ testDM, _ := devicemanager.NewManagerStub()
testCases := []staticPolicyTest{
{
description: "non-corrupted state",
@@ -155,7 +161,7 @@ func TestStaticPolicyStart(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
- p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
+ p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, testCase.excludeReserved)
policy := p.(*staticPolicy)
st := &mockState{
@@ -203,7 +209,6 @@ func TestStaticPolicyAdd(t *testing.T) {
largeTopoCPUSet := largeTopoBuilder.Result()
largeTopoSock0CPUSet := largeTopoSock0Builder.Result()
largeTopoSock1CPUSet := largeTopoSock1Builder.Result()
-
// these are the cases which must behave the same regardless the policy options.
// So we will permutate the options to ensure this holds true.
@@ -577,8 +582,10 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
if testCase.topologyHint != nil {
tm = topologymanager.NewFakeManagerWithHint(testCase.topologyHint)
}
+ testDM, _ := devicemanager.NewManagerStub()
testExcl := false
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), tm, testCase.options, testExcl)
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), cpuset.NewCPUSet(), tm, testCase.options, testDM, testExcl)
+
st := &mockState{
assignments: testCase.stAssignments,
@@ -625,6 +632,8 @@ func runStaticPolicyTestCaseWithFeatureGate(t *testing.T, testCase staticPolicyT
}
func TestStaticPolicyReuseCPUs(t *testing.T) {
+ excludeReserved := false
+ testDM, _ := devicemanager.NewManagerStub()
testCases := []struct {
staticPolicyTest
expCSetAfterAlloc cpuset.CPUSet
@@ -649,7 +658,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
}
for _, testCase := range testCases {
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, excludeReserved)
st := &mockState{
assignments: testCase.stAssignments,
@@ -682,6 +691,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
func TestStaticPolicyRemove(t *testing.T) {
excludeReserved := false
+ testDM, _ := devicemanager.NewManagerStub()
testCases := []staticPolicyTest{
{
description: "SingleSocketHT, DeAllocOneContainer",
@@ -740,7 +750,7 @@ func TestStaticPolicyRemove(t *testing.T) {
}
for _, testCase := range testCases {
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, excludeReserved)
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, excludeReserved)
st := &mockState{
assignments: testCase.stAssignments,
@@ -763,6 +773,7 @@ func TestStaticPolicyRemove(t *testing.T) {
func TestTopologyAwareAllocateCPUs(t *testing.T) {
excludeReserved := false
+ testDM, _ := devicemanager.NewManagerStub()
testCases := []struct {
description string
topo *topology.CPUTopology
@@ -831,7 +842,8 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
},
}
for _, tc := range testCases {
- p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, excludeReserved)
+ p, _ := NewStaticPolicy(tc.topo, 0, cpuset.NewCPUSet(), cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, excludeReserved)
+
policy := p.(*staticPolicy)
st := &mockState{
assignments: tc.stAssignments,
@@ -864,6 +876,7 @@ type staticPolicyTestWithResvList struct {
topo *topology.CPUTopology
numReservedCPUs int
reserved cpuset.CPUSet
+ isolcpus cpuset.CPUSet
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
@@ -874,6 +887,8 @@ type staticPolicyTestWithResvList struct {
}
func TestStaticPolicyStartWithResvList(t *testing.T) {
+ testDM, _ := devicemanager.NewManagerStub()
+ testExcl := false
testCases := []staticPolicyTestWithResvList{
{
description: "empty cpuset",
@@ -903,11 +918,10 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
expNewErr: fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
},
}
- testExcl := false
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
- p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil, testExcl)
+ p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), nil, testDM, testExcl)
if !reflect.DeepEqual(err, testCase.expNewErr) {
t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expNewErr, err)
@@ -947,6 +961,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 1,
reserved: cpuset.NewCPUSet(0),
+ isolcpus: cpuset.NewCPUSet(),
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "8000m", "8000m"),
@@ -959,6 +974,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 2,
reserved: cpuset.NewCPUSet(0, 1),
+ isolcpus: cpuset.NewCPUSet(),
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
@@ -971,6 +987,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 2,
reserved: cpuset.NewCPUSet(0, 1),
+ isolcpus: cpuset.NewCPUSet(),
stAssignments: state.ContainerCPUAssignments{
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(2, 3, 6, 7),
@@ -987,6 +1004,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 2,
reserved: cpuset.NewCPUSet(0, 1),
+ isolcpus: cpuset.NewCPUSet(),
stAssignments: state.ContainerCPUAssignments{
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.NewCPUSet(2, 3, 6, 7),
@@ -998,11 +1016,29 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(0, 1),
},
+ {
+ description: "InfraPod, SingleSocketHT, Isolcpus, ExpectAllocReserved",
+ topo: topoSingleSocketHT,
+ numReservedCPUs: 2,
+ reserved: cpuset.NewCPUSet(0, 1),
+ isolcpus: cpuset.NewCPUSet(1),
+ stAssignments: state.ContainerCPUAssignments{
+ "fakePod": map[string]cpuset.CPUSet{
+ "fakeContainer100": cpuset.NewCPUSet(2, 3, 6, 7),
+ },
+ },
+ stDefaultCPUSet: cpuset.NewCPUSet(4, 5),
+ pod: infraPod,
+ expErr: nil,
+ expCPUAlloc: true,
+ expCSet: cpuset.NewCPUSet(0),
+ },
}
testExcl := true
+ testDM, _ := devicemanager.NewManagerStub()
for _, testCase := range testCases {
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil, testExcl)
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, testCase.isolcpus, topologymanager.NewFakeManager(), nil, testDM, testExcl)
st := &mockState{
assignments: testCase.stAssignments,
@@ -1128,3 +1164,125 @@ func TestStaticPolicyOptions(t *testing.T) {
})
}
}
+
+// makePodWithLabels returns a "test-pod" in "test-namespace" carrying podLabels.
+func makePodWithLabels(podLabels map[string]string) *v1.Pod {
+	meta := metav1.ObjectMeta{
+		Name:      "test-pod",
+		Namespace: "test-namespace",
+		Labels:    podLabels,
+	}
+	return &v1.Pod{ObjectMeta: meta}
+}
+
+// fakeBuildConfigFromFlags ignores both arguments and always succeeds with an empty config.
+func fakeBuildConfigFromFlags(masterUrl string, kubeconfigPath string) (*restclient.Config, error) {
+	return new(restclient.Config), nil
+}
+
+// fakeBuildConfigFromFlagsError never produces a config: it always fails with
+// an error of the form "<kubeconfigPath> file not found".
+func fakeBuildConfigFromFlagsError(masterUrl string, kubeconfigPath string) (*restclient.Config, error) {
+	notFound := errors.New(fmt.Sprintf("%s file not found", kubeconfigPath))
+	return nil, notFound
+}
+
+// getFakeInfraPodNamespace ignores the requested name and returns a
+// "test-namespace" namespace whose "app.starlingx.io/component" label is
+// "platform". The error is always nil.
+func getFakeInfraPodNamespace(_ string) (*v1.Namespace, error) {
+	platformLabels := map[string]string{"app.starlingx.io/component": "platform"}
+	ns := &v1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: "test-namespace", Labels: platformLabels},
+	}
+	return ns, nil
+}
+
+// getFakeNonInfraPodNamespace ignores the requested name and returns a
+// "test-namespace" namespace whose only label is "fake": "label", i.e. one
+// without the platform component label. The error is always nil.
+func getFakeNonInfraPodNamespace(_ string) (*v1.Namespace, error) {
+	otherLabels := map[string]string{"fake": "label"}
+	ns := &v1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: "test-namespace", Labels: otherLabels},
+	}
+	return ns, nil
+}
+
+type kubeInfraPodTestCase struct { // one table entry for TestKubeInfraPod
+ description string // sub-test name, also echoed in failure messages
+ pod *v1.Pod // pod handed to isKubeInfra
+ namespaceFunc getPodNamespace // namespace lookup stub installed in varGetNamespaceObject for the case
+ expectedValue bool // result isKubeInfra is expected to return for pod
+}
+
+// TestKubeInfraPod checks isKubeInfra against pod labels, namespace labels and
+// a failing kubeconfig lookup, using stubs for the package-level hooks.
+func TestKubeInfraPod(t *testing.T) {
+	// The test swaps these package-level hooks for fakes; put the original
+	// values back on exit so the fakes do not leak into other tests.
+	origGetNamespaceObject, origBuildConfigFromFlags := varGetNamespaceObject, varBuildConfigFromFlags
+	defer func() {
+		varGetNamespaceObject, varBuildConfigFromFlags = origGetNamespaceObject, origBuildConfigFromFlags
+	}()
+
+	testCases := []kubeInfraPodTestCase{
+		{
+			description: "Pod with platform label and namespace without platform label",
+			pod: makePodWithLabels(map[string]string{
+				"app.starlingx.io/component": "platform",
+			}),
+			namespaceFunc: getFakeNonInfraPodNamespace,
+			expectedValue: true,
+		},
+		{
+			description: "Pod without platform label and namespace with platform label",
+			pod: makePodWithLabels(map[string]string{
+				"test": "label",
+			}),
+			namespaceFunc: getFakeInfraPodNamespace,
+			expectedValue: true,
+		},
+		{
+			description: "Pod without platform label and namespace without platform label",
+			pod: makePodWithLabels(map[string]string{
+				"test": "namespace",
+			}),
+			namespaceFunc: getFakeNonInfraPodNamespace,
+			expectedValue: false,
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.description, func(t *testing.T) {
+			varGetNamespaceObject = testCase.namespaceFunc
+			varBuildConfigFromFlags = fakeBuildConfigFromFlags
+
+			gotValue := isKubeInfra(testCase.pod)
+			if gotValue != testCase.expectedValue {
+				t.Errorf("StaticPolicy isKubeInfraPod() error %v. expected value %v actual value %v",
+					testCase.description, testCase.expectedValue, gotValue)
+			}
+		})
+	}
+
+	// Final case: the config builder always fails; isKubeInfra is expected to report false.
+	test := kubeInfraPodTestCase{
+		description: "Failure reading kubeconfig file",
+		pod: makePodWithLabels(map[string]string{
+			"test": "namespace",
+		}),
+		namespaceFunc: getFakeNonInfraPodNamespace,
+		expectedValue: false,
+	}
+
+	varGetNamespaceObject = getPodNamespaceObject
+	varBuildConfigFromFlags = fakeBuildConfigFromFlagsError
+
+	gotValue := isKubeInfra(test.pod)
+	if gotValue != test.expectedValue {
+		t.Errorf("StaticPolicy isKubeInfraPod() error %v. expected value %v actual value %v",
+			test.description, test.expectedValue, gotValue)
+	}
+}
+
diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go
new file mode 100644
index 00000000000..e6874f88d8a
--- /dev/null
+++ b/pkg/kubelet/cm/devicemanager/manager_stub.go
@@ -0,0 +1,99 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package devicemanager
+
+import (
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
+ "k8s.io/kubernetes/pkg/kubelet/config"
+ "k8s.io/kubernetes/pkg/kubelet/lifecycle"
+ "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
+ schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+// ManagerStub is a do-nothing Device Manager: its methods ignore their arguments and return nil, empty or false results.
+type ManagerStub struct{}
+
+// NewManagerStub creates a ManagerStub; the returned error is always nil.
+func NewManagerStub() (*ManagerStub, error) {
+	return &ManagerStub{}, nil
+}
+
+// Start does nothing and returns nil.
+func (h *ManagerStub) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
+	return nil
+}
+
+// Stop does nothing and returns nil.
+func (h *ManagerStub) Stop() error {
+	return nil
+}
+
+// Allocate performs no allocation and returns nil.
+func (h *ManagerStub) Allocate(pod *v1.Pod, container *v1.Container) error {
+	return nil
+}
+
+// UpdatePluginResources leaves node and attrs untouched and returns nil.
+func (h *ManagerStub) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+	return nil
+}
+
+// GetDeviceRunContainerOptions returns nil options and a nil error.
+func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
+	return nil, nil
+}
+
+// GetCapacity returns nil for both resource lists and an empty, non-nil removed resource list.
+func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
+	return nil, nil, []string{}
+}
+
+// GetWatcherHandler returns nil: the stub provides no plugin watcher handler.
+func (h *ManagerStub) GetWatcherHandler() cache.PluginHandler {
+	return nil
+}
+
+// GetTopologyHints returns an empty, non-nil TopologyHint map.
+func (h *ManagerStub) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
+	return map[string][]topologymanager.TopologyHint{}
+}
+
+// GetPodTopologyHints returns an empty, non-nil TopologyHint map.
+func (h *ManagerStub) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+	return map[string][]topologymanager.TopologyHint{}
+}
+
+// GetDevices returns nil regardless of its arguments.
+func (h *ManagerStub) GetDevices(_, _ string) ResourceDeviceInstances {
+	return nil
+}
+
+// GetAllocatableDevices returns nil.
+func (h *ManagerStub) GetAllocatableDevices() ResourceDeviceInstances {
+	return nil
+}
+
+// ShouldResetExtendedResourceCapacity returns false.
+func (h *ManagerStub) ShouldResetExtendedResourceCapacity() bool {
+	return false
+}
+
+// UpdateAllocatedDevices is a no-op.
+func (h *ManagerStub) UpdateAllocatedDevices() {
+	// Nothing to update: ManagerStub holds no state.
+}
--
2.25.1