From 5b6706de36860aa1d1165a64e0836a81a5936f8a Mon Sep 17 00:00:00 2001
From: Boovan Rajendran
Date: Mon, 1 Apr 2024 03:28:34 -0400
Subject: [PATCH] Identify platform pods based on pod or namespace labels

Pods in the 'kube-system' namespace, or labeled with
'app.starlingx.io/component=platform' (on the pod or on its namespace),
are identified as 'platform' pods. These are given an isolated CPU
affinity cpuset when the cpu-manager 'static' policy is configured.

Signed-off-by: Boovan Rajendran
---
 pkg/kubelet/cm/cpumanager/policy_static.go    |  96 ++++++++++--
 .../cm/cpumanager/policy_static_test.go       | 148 +++++++++++++++++-
 .../cm/cpumanager/topology_hints_test.go      |   4 +
 3 files changed, 235 insertions(+), 13 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index 9d67f4bb68a..c6fb56b5544 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -17,14 +17,20 @@ limitations under the License.
 package cpumanager
 
 import (
+    "context"
     "fmt"
     "strconv"
 
+    k8sclient "k8s.io/client-go/kubernetes"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    restclient "k8s.io/client-go/rest"
     v1 "k8s.io/api/core/v1"
+    "k8s.io/client-go/tools/clientcmd"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/klog/v2"
     podutil "k8s.io/kubernetes/pkg/api/v1/pod"
     v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
+    "k8s.io/kubernetes/cmd/kubeadm/app/constants"
     "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
@@ -45,6 +51,23 @@ const (
     ErrorSMTAlignment = "SMTAlignmentError"
 )
 
+// Declared as variables so that they can be more easily
+// overridden during testing
+type getPodNamespace func(string) (*v1.Namespace, error)
+type buildFromConfigFlag func(masterUrl string, kubeconfigPath string) (*restclient.Config, error)
+type isKubeInfraFunc func(pod *v1.Pod) bool
+
+var varGetNamespaceObject getPodNamespace
+var varBuildConfigFromFlags buildFromConfigFlag
+var varIsKubeInfra isKubeInfraFunc
+
+func init() {
+    varIsKubeInfra = isKubeInfra
+    varGetNamespaceObject = getPodNamespaceObject
+    varBuildConfigFromFlags = clientcmd.BuildConfigFromFlags
+}
+
+
 // SMTAlignmentError represents an error due to SMT alignment
 type SMTAlignmentError struct {
     RequestedCPUs int
@@ -64,11 +87,6 @@ func (e SMTAlignmentError) Type() string {
     return ErrorSMTAlignment
 }
 
-// Define namespaces used by platform infrastructure pods
-var infraNamespaces = [...]string{
-    "kube-system", "armada", "cert-manager", "platform-deployment-manager", "portieris", "vault", "notification", "flux-helm", "metrics-server",
-}
-
 // staticPolicy is a CPU manager policy that does not change CPU
 // assignments for exclusively pinned guaranteed containers after the main
 // container process starts.
@@ -324,7 +342,7 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
 
 func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
     // Process infra pods before guaranteed pods
-    if isKubeInfra(pod) {
+    if varIsKubeInfra(pod) {
         // Container belongs in reserved pool.
         // We don't want to fall through to the p.guaranteedCPUs() clause below so return either nil or error.
         if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
@@ -522,7 +540,7 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int
         return 0
     }
     // Infrastructure pods use reserved CPUs even if they're in the Guaranteed QoS class
-    if isKubeInfra(pod) {
+    if varIsKubeInfra(pod) {
         return 0
     }
     // Safe downcast to do for all systems with < 2.1 billion CPUs.
@@ -743,14 +761,68 @@ func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, reu
     return hints
 }
 
-// check if a given pod is in a platform infrastructure namespace
+func getPodNamespaceObject(podNamespaceName string) (*v1.Namespace, error) {
+
+    kubeConfigPath := constants.GetKubeletKubeConfigPath()
+    cfg, err := varBuildConfigFromFlags("", kubeConfigPath)
+    if err != nil {
+        klog.Error("Failed to build client config from ", kubeConfigPath, err.Error())
+        return nil, err
+    }
+
+    clientset, err := k8sclient.NewForConfig(cfg)
+    if err != nil {
+        klog.Error("Failed to get clientset for KUBECONFIG ", kubeConfigPath, err.Error())
+        return nil, err
+    }
+
+    namespaceObj, err := clientset.CoreV1().Namespaces().Get(context.TODO(), podNamespaceName, metav1.GetOptions{})
+    if err != nil {
+        klog.Error("Error getting namespace object: ", err.Error())
+        return nil, err
+    }
+
+    return namespaceObj, nil
+
+}
+
+// check if a given pod is labelled as a platform pod or
+// is in a namespace labelled as a platform namespace
 func isKubeInfra(pod *v1.Pod) bool {
-    for _, namespace := range infraNamespaces {
-        if namespace == pod.Namespace {
-            return true
-        }
+
+    podName := pod.GetName()
+    podNamespaceName := pod.GetNamespace()
+
+    if podNamespaceName == "kube-system" {
+        klog.Infof("Pod %s is in the %s namespace. Treating as platform pod.", podName, podNamespaceName)
+        return true
+    }
+
+    klog.InfoS("Checking pod for the 'app.starlingx.io/component=platform' label", "pod", podName)
+    podLabels := pod.GetLabels()
+    val, ok := podLabels["app.starlingx.io/component"]
+    if ok && val == "platform" {
+        klog.InfoS("Pod has the 'app.starlingx.io/component=platform' label. Treating as platform pod.", "pod", podName)
+        return true
     }
+
+    klog.V(4).InfoS("Pod does not have the 'app.starlingx.io/component=platform' label. Checking its namespace.", "pod", podName)
+
+    namespaceObj, err := varGetNamespaceObject(podNamespaceName)
+    if err != nil {
+        return false
+    }
+
+    namespaceLabels := namespaceObj.GetLabels()
+    val, ok = namespaceLabels["app.starlingx.io/component"]
+    if ok && val == "platform" {
+        klog.InfoS("Namespace has the 'app.starlingx.io/component=platform' label. Treating pod as platform pod.", "pod", podName, "namespace", podNamespaceName)
+        return true
+    }
+
+    klog.InfoS("Neither the pod nor its namespace has the 'app.starlingx.io/component=platform' label. Not treating as platform pod.", "pod", podName, "namespace", podNamespaceName)
     return false
+
 }
 
 // get the isolated CPUs (if any) from the devices associated with a specific container
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index c25ee484a94..0f9b428bc18 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -17,10 +17,13 @@ limitations under the License.
 package cpumanager
 
 import (
+    "errors"
     "fmt"
     "reflect"
     "testing"
 
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    restclient "k8s.io/client-go/rest"
     v1 "k8s.io/api/core/v1"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -926,6 +929,7 @@ type staticPolicyTestWithResvList struct {
     stAssignments   state.ContainerCPUAssignments
     stDefaultCPUSet cpuset.CPUSet
     pod             *v1.Pod
+    isKubeInfraPodfunc isKubeInfraFunc
     expErr          error
     expNewErr       error
     expCPUAlloc     bool
@@ -998,6 +1002,14 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
     }
 }
 
+func fakeIsKubeInfraTrue(pod *v1.Pod) bool {
+    return true
+}
+
+func fakeIsKubeInfraFalse(pod *v1.Pod) bool {
+    return false
+}
+
 func TestStaticPolicyAddWithResvList(t *testing.T) {
     infraPod := makePod("fakePod", "fakeContainer2", "200m", "200m")
     infraPod.Namespace = "kube-system"
@@ -1011,6 +1023,7 @@
             stAssignments:   state.ContainerCPUAssignments{},
             stDefaultCPUSet: cpuset.New(1, 2, 3, 4, 5, 6, 7),
             pod:             makePod("fakePod", "fakeContainer2", "8000m", "8000m"),
+            isKubeInfraPodfunc: fakeIsKubeInfraFalse,
             expErr:          fmt.Errorf("not enough cpus available to satisfy request"),
             expCPUAlloc:     false,
             expCSet:         cpuset.New(),
@@ -1024,6 +1037,7 @@
             stAssignments:   state.ContainerCPUAssignments{},
             stDefaultCPUSet: cpuset.New(2, 3, 4, 5, 6, 7),
             pod:             makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
+            isKubeInfraPodfunc: fakeIsKubeInfraFalse,
             expErr:          nil,
             expCPUAlloc:     true,
             expCSet:         cpuset.New(4), // expect sibling of partial core
@@ -1041,6 +1055,7 @@
             },
             stDefaultCPUSet: cpuset.New(0, 1, 4, 5),
             pod:             makePod("fakePod", "fakeContainer3", "2000m", "2000m"),
+            isKubeInfraPodfunc: fakeIsKubeInfraFalse,
             expErr:          nil,
             expCPUAlloc:     true,
             expCSet:         cpuset.New(4, 5),
@@ -1058,6 +1073,7 @@
             },
             stDefaultCPUSet: cpuset.New(4, 5),
             pod:             infraPod,
+            isKubeInfraPodfunc: fakeIsKubeInfraTrue,
             expErr:          nil,
             expCPUAlloc:     true,
             expCSet:         cpuset.New(0, 1),
@@ -1075,6 +1091,7 @@
             },
             stDefaultCPUSet: cpuset.New(4, 5),
             pod:             infraPod,
+            isKubeInfraPodfunc: fakeIsKubeInfraTrue,
             expErr:          nil,
             expCPUAlloc:     true,
             expCSet:         cpuset.New(0),
@@ -1090,7 +1107,7 @@
             assignments:   testCase.stAssignments,
             defaultCPUSet: testCase.stDefaultCPUSet,
         }
-
+        varIsKubeInfra = testCase.isKubeInfraPodfunc
         container := &testCase.pod.Spec.Containers[0]
         err := policy.Allocate(st, testCase.pod, container)
         if !reflect.DeepEqual(err, testCase.expErr) {
@@ -1215,3 +1232,132 @@ func newCPUSetPtr(cpus ...int) *cpuset.CPUSet {
     ret := cpuset.New(cpus...)
     return &ret
 }
+
+func makePodWithLabels(podLabels map[string]string) *v1.Pod {
+    return &v1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "test-pod",
+            Namespace: "test-namespace",
+            Labels:    podLabels,
+        },
+    }
+}
+
+func fakeBuildConfigFromFlags(masterUrl string, kubeconfigPath string) (*restclient.Config, error) {
+
+    return &restclient.Config{}, nil
+}
+
+func fakeBuildConfigFromFlagsError(masterUrl string, kubeconfigPath string) (*restclient.Config, error) {
+
+    errString := fmt.Sprintf("%s file not found", kubeconfigPath)
+    return nil, errors.New(errString)
+
+}
+
+func getFakeInfraPodNamespace(_ string) (*v1.Namespace, error) {
+
+    return &v1.Namespace{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "test-namespace",
+            Labels: map[string]string{
+                "app.starlingx.io/component": "platform",
+            },
+        }}, nil
+}
+
+func getFakeNonInfraPodNamespace(_ string) (*v1.Namespace, error) {
+
+    return &v1.Namespace{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "test-namespace",
+            Labels: map[string]string{
+                "fake": "label",
+            }}}, nil
+
+}
+
+type kubeInfraPodTestCase struct {
+    description   string
+    pod           *v1.Pod
+    namespaceFunc getPodNamespace
+    expectedValue bool
+}
+
+func TestKubeInfraPod(t *testing.T) {
+    testCases := []kubeInfraPodTestCase{
+        {
+            description: "Pod with platform label and namespace with platform label",
+            pod: makePodWithLabels(map[string]string{
+                "app.starlingx.io/component": "platform",
+            }),
+            namespaceFunc: getFakeInfraPodNamespace,
+            expectedValue: true,
+        },
+        {
+            description: "Pod with platform label and namespace without platform label",
+            pod: makePodWithLabels(map[string]string{
+                "app.starlingx.io/component": "platform",
+            }),
+            namespaceFunc: getFakeNonInfraPodNamespace,
+            expectedValue: true,
+
+        },
+        {
+            description: "Pod without platform label and namespace with platform label",
+            pod: makePodWithLabels(map[string]string{
+                "test": "label",
+            }),
+            namespaceFunc: getFakeInfraPodNamespace,
+            expectedValue: true,
+        },
+        {
+            description: "Pod without platform label and namespace without platform label",
+            pod: makePodWithLabels(map[string]string{
+                "test": "namespace",
+            }),
+            namespaceFunc: getFakeNonInfraPodNamespace,
+            expectedValue: false,
+        },
+
+    }
+
+    for _, testCase := range testCases {
+        t.Run(testCase.description, func(t *testing.T) {
+
+            varGetNamespaceObject = testCase.namespaceFunc
+            varBuildConfigFromFlags = fakeBuildConfigFromFlags
+            gotValue := isKubeInfra(testCase.pod)
+
+            if gotValue != testCase.expectedValue {
+                t.Errorf("StaticPolicy isKubeInfraPod() error %v. expected value %v actual value %v",
+                    testCase.description, testCase.expectedValue, gotValue)
+            } else {
+                fmt.Printf("StaticPolicy isKubeInfraPod() test successful: %v ", testCase.description)
+            }
+
+        })
+    }
+
+    test := kubeInfraPodTestCase{
+        description: "Failure reading kubeconfig file",
+        pod: makePodWithLabels(map[string]string{
+            "test": "namespace",
+        }),
+        namespaceFunc: getFakeNonInfraPodNamespace,
+        expectedValue: false,
+    }
+
+    varGetNamespaceObject = getPodNamespaceObject
+    varBuildConfigFromFlags = fakeBuildConfigFromFlagsError
+
+    gotValue := isKubeInfra(test.pod)
+
+    if gotValue != test.expectedValue {
+        t.Errorf("StaticPolicy isKubeInfraPod() error %v. expected value %v actual value %v",
+            test.description, test.expectedValue, gotValue)
+    } else {
+        fmt.Printf("StaticPolicy isKubeInfraPod() test successful: %v ", test.description)
+    }
+
+}
diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
index 3cd5c85740b..e1303c90418 100644
--- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
@@ -145,6 +145,7 @@ func TestPodGuaranteedCPUs(t *testing.T) {
             expectedCPU: 6,
         },
     }
+    varIsKubeInfra = fakeIsKubeInfraFalse
     for _, tc := range tcases {
         requestedCPU := p.podGuaranteedCPUs(tc.pod)
 
@@ -187,6 +188,7 @@ func TestGetTopologyHints(t *testing.T) {
             sourcesReady:      &sourcesReadyStub{},
         }
+        varIsKubeInfra = fakeIsKubeInfraFalse
         hints := m.GetTopologyHints(&tc.pod, &tc.container)[string(v1.ResourceCPU)]
         if len(tc.expectedHints) == 0 && len(hints) == 0 {
             continue
         }
@@ -240,6 +242,7 @@ func TestGetPodTopologyHints(t *testing.T) {
             sourcesReady:      &sourcesReadyStub{},
         }
 
+        varIsKubeInfra = fakeIsKubeInfraFalse
         podHints := m.GetPodTopologyHints(&tc.pod)[string(v1.ResourceCPU)]
         if len(tc.expectedHints) == 0 && len(podHints) == 0 {
             continue
@@ -423,6 +426,7 @@ func TestGetPodTopologyHintsWithPolicyOptions(t *testing.T) {
             sourcesReady:      &sourcesReadyStub{},
         }
 
+        varIsKubeInfra = fakeIsKubeInfraFalse
         podHints := m.GetPodTopologyHints(&testCase.pod)[string(v1.ResourceCPU)]
         sort.SliceStable(podHints, func(i, j int) bool {
             return podHints[i].LessThan(podHints[j])
-- 
2.25.1
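
Illustration (not part of the patch above): with this change, a pod is treated as a platform pod when it, or its namespace, carries the 'app.starlingx.io/component=platform' label. The sketch below shows one way an operator could apply that label to an existing namespace with standard client-go calls, so that pods admitted in it afterwards land on the reserved platform cpuset. The namespace name "monitoring" and the kubeconfig path are assumptions for the example, not values taken from the patch.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Assumed kubeconfig location; adjust for the target environment.
    cfg, err := clientcmd.BuildConfigFromFlags("", "/etc/kubernetes/admin.conf")
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // Fetch the namespace, add the platform label, and write it back.
    ns, err := clientset.CoreV1().Namespaces().Get(context.TODO(), "monitoring", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }
    if ns.Labels == nil {
        ns.Labels = map[string]string{}
    }
    ns.Labels["app.starlingx.io/component"] = "platform"
    if _, err := clientset.CoreV1().Namespaces().Update(context.TODO(), ns, metav1.UpdateOptions{}); err != nil {
        panic(err)
    }
    fmt.Println("namespace labelled as platform")
}

The same effect can be had with 'kubectl label namespace monitoring app.starlingx.io/component=platform'. Because the label is consulted in Allocate()/guaranteedCPUs() when a container is admitted, it takes effect for containers created after the label is present; already-running pods keep their current CPU assignment until they are recreated.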