From 0c439328d27c2223f01da6f3d3b7e0697b517437 Mon Sep 17 00:00:00 2001
From: Boovan Rajendran
Date: Wed, 6 Mar 2024 03:50:23 -0500
Subject: [PATCH] kubelet cpumanager keep normal containers off reserved CPUs

When starting the kubelet process, two separate sets of reserved CPUs
may be specified. With this change, CPUs reserved via
'--system-reserved=cpu' or '--kube-reserved=cpu' will be ignored by
Kubernetes itself. A small tweak to the default CPU affinity ensures
that "normal" Kubernetes pods won't run on the reserved CPUs.

Co-authored-by: Jim Gauld
Signed-off-by: Sachin Gopala Krishna
Signed-off-by: Ramesh Kumar Sivanandam
Signed-off-by: Boovan Rajendran
---
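Note (illustrative only, not part of the applied diff): the intended effect
on the default cpuset can be sketched with the k8s.io/utils/cpuset API that
this patch itself uses; the standalone main() driver below is a hypothetical
demonstration of the set arithmetic, not kubelet code.

	package main

	import (
		"fmt"

		"k8s.io/utils/cpuset"
	)

	func main() {
		// An 8-CPU node where CPUs 0-1 are reserved via
		// --system-reserved=cpu or --kube-reserved=cpu.
		allCPUs := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7)
		reserved := cpuset.New(0, 1)

		// With excludeReserved=true, validateState() seeds the default
		// cpuset with allCPUs minus the reserved set, so "normal"
		// containers never land on the reserved CPUs.
		defaultSet := allCPUs.Difference(reserved)
		fmt.Println(defaultSet) // prints "2-7"
	}

This mirrors the s.SetDefaultCPUSet(allCPUs.Difference(p.reservedCPUs)) call
added to validateState() below.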
 pkg/kubelet/cm/cpumanager/cpu_manager.go      |  6 ++-
 pkg/kubelet/cm/cpumanager/cpu_manager_test.go | 19 ++++++---
 pkg/kubelet/cm/cpumanager/policy_static.go    | 30 +++++++++++---
 .../cm/cpumanager/policy_static_test.go       | 40 ++++++++++++++-----
 4 files changed, 72 insertions(+), 23 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index 2d1b6eefebe..e0a359932b7 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -192,7 +192,11 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
 		// exclusively allocated.
 		reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
 		numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
-		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions)
+		// NOTE: Set excludeReserved unconditionally to exclude reserved CPUs from default cpuset.
+		// This variable is primarily to make testing easier.
+		excludeReserved := true
+		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions, excludeReserved)
+
 		if err != nil {
 			return nil, fmt.Errorf("new static policy error: %w", err)
 		}
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index f6563bbca23..e5f5d07a2ad 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -263,6 +263,7 @@ func makeMultiContainerPodWithOptions(initCPUs, appCPUs []*containerOptions) *v1
 }
 
 func TestCPUManagerAdd(t *testing.T) {
+	testExcl := false
 	testPolicy, _ := NewStaticPolicy(
 		&topology.CPUTopology{
 			NumCPUs:    4,
@@ -278,7 +279,8 @@ func TestCPUManagerAdd(t *testing.T) {
 		0,
 		cpuset.New(),
 		topologymanager.NewFakeManager(),
-		nil)
+		nil,
+		testExcl)
 	testCases := []struct {
 		description string
 		updateErr   error
@@ -527,8 +529,9 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 		},
 	}
 
+	testExcl := false
 	for _, testCase := range testCases {
-		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, testExcl)
 
 		mockState := &mockState{
 			assignments:   testCase.stAssignments,
@@ -753,6 +756,7 @@ func TestCPUManagerRemove(t *testing.T) {
 }
 
 func TestReconcileState(t *testing.T) {
+	testExcl := false
 	testPolicy, _ := NewStaticPolicy(
 		&topology.CPUTopology{
 			NumCPUs:    8,
@@ -772,7 +776,8 @@ func TestReconcileState(t *testing.T) {
 		0,
 		cpuset.New(),
 		topologymanager.NewFakeManager(),
-		nil)
+		nil,
+		testExcl)
 
 	testCases := []struct {
 		description string
@@ -1276,6 +1281,7 @@ func TestReconcileState(t *testing.T) {
 // above test cases are without kubelet --reserved-cpus cmd option
 // the following tests are with --reserved-cpus configured
 func TestCPUManagerAddWithResvList(t *testing.T) {
+	testExcl := false
 	testPolicy, _ := NewStaticPolicy(
 		&topology.CPUTopology{
 			NumCPUs:    4,
@@ -1291,7 +1297,8 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
 		1,
 		cpuset.New(0),
 		topologymanager.NewFakeManager(),
-		nil)
+		nil,
+		testExcl)
 	testCases := []struct {
 		description string
 		updateErr   error
@@ -1416,6 +1423,7 @@ func TestCPUManagerHandlePolicyOptions(t *testing.T) {
 }
 
 func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
+	testExcl := false
 	nonePolicy, _ := NewNonePolicy(nil)
 	staticPolicy, _ := NewStaticPolicy(
 		&topology.CPUTopology{
@@ -1432,7 +1440,8 @@ func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
 		1,
 		cpuset.New(0),
 		topologymanager.NewFakeManager(),
-		nil)
+		nil,
+		testExcl)
 
 	testCases := []struct {
 		description string
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index d22a6a64d5e..9690eedab58 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -106,6 +106,8 @@ type staticPolicy struct {
 	topology *topology.CPUTopology
 	// set of CPUs that is not available for exclusive assignment
 	reservedCPUs cpuset.CPUSet
+	// If true, default CPUSet should exclude reserved CPUs
+	excludeReserved bool
 	// Superset of reservedCPUs. It includes not just the reservedCPUs themselves,
 	// but also any siblings of those reservedCPUs on the same physical die.
 	// NOTE: If the reserved set includes full physical CPUs from the beginning
@@ -126,7 +128,7 @@ var _ Policy = &staticPolicy{}
 // NewStaticPolicy returns a CPU manager policy that does not change CPU
 // assignments for exclusively pinned guaranteed containers after the main
 // container process starts.
-func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string) (Policy, error) {
+func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string, excludeReserved bool) (Policy, error) {
 	opts, err := NewStaticPolicyOptions(cpuPolicyOptions)
 	if err != nil {
 		return nil, err
@@ -141,6 +143,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 	policy := &staticPolicy{
 		topology:    topology,
 		affinity:    affinity,
+		excludeReserved: excludeReserved,
 		cpusToReuse: make(map[string]cpuset.CPUSet),
 		options:     opts,
 	}
@@ -202,7 +205,15 @@ func (p *staticPolicy) validateState(s state.State) error {
 		}
 		// state is empty initialize
 		allCPUs := p.topology.CPUDetails.CPUs()
-		s.SetDefaultCPUSet(allCPUs)
+		if p.excludeReserved {
+			// Exclude reserved CPUs from the default CPUSet to keep containers off them
+			// unless explicitly affined.
+			s.SetDefaultCPUSet(allCPUs.Difference(p.reservedCPUs))
+		} else {
+			s.SetDefaultCPUSet(allCPUs)
+		}
+		klog.Infof("[cpumanager] static policy: CPUSet: allCPUs:%v, reserved:%v, default:%v\n",
+			allCPUs, p.reservedCPUs, s.GetDefaultCPUSet())
 		return nil
 	}
 
@@ -210,11 +221,12 @@ func (p *staticPolicy) validateState(s state.State) error {
 	// 1. Check if the reserved cpuset is not part of default cpuset because:
 	// - kube/system reserved have changed (increased) - may lead to some containers not being able to start
 	// - user tampered with file
-	if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
-		return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
-			p.reservedCPUs.String(), tmpDefaultCPUset.String())
+	if !p.excludeReserved {
+		if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
+			return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
+				p.reservedCPUs.String(), tmpDefaultCPUset.String())
+		}
 	}
-
 	// 2. Check if state for static policy is consistent
 	for pod := range tmpAssignments {
 		for container, cset := range tmpAssignments[pod] {
@@ -241,6 +253,9 @@ func (p *staticPolicy) validateState(s state.State) error {
 		}
 	}
 	totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...)
+	if p.excludeReserved {
+		totalKnownCPUs = totalKnownCPUs.Union(p.reservedCPUs)
+	}
 	if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) {
 		return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
 			p.topology.CPUDetails.CPUs().String(), totalKnownCPUs.String())
@@ -381,6 +396,9 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
 	cpusInUse := getAssignedCPUsOfSiblings(s, podUID, containerName)
 	if toRelease, ok := s.GetCPUSet(podUID, containerName); ok {
 		s.Delete(podUID, containerName)
+		if p.excludeReserved {
+			toRelease = toRelease.Difference(p.reservedCPUs)
+		}
 		// Mutate the shared pool, adding released cpus.
 		toRelease = toRelease.Difference(cpusInUse)
 		s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index 8cc3fc2be74..b060201e156 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -36,6 +36,7 @@ type staticPolicyTest struct {
 	description     string
 	topo            *topology.CPUTopology
 	numReservedCPUs int
+	excludeReserved bool
 	reservedCPUs    *cpuset.CPUSet
 	podUID          string
 	options         map[string]string
@@ -69,7 +70,8 @@ func (spt staticPolicyTest) PseudoClone() staticPolicyTest {
 }
 
 func TestStaticPolicyName(t *testing.T) {
-	policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil)
+	testExcl := false
+	policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil, testExcl)
 
 	policyName := policy.Name()
 	if policyName != "static" {
@@ -99,6 +101,15 @@ func TestStaticPolicyStart(t *testing.T) {
 			stDefaultCPUSet: cpuset.New(),
 			expCSet:         cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
 		},
+		{
+			description:     "empty cpuset exclude reserved",
+			topo:            topoDualSocketHT,
+			numReservedCPUs: 2,
+			excludeReserved: true,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.New(),
+			expCSet:         cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+		},
 		{
 			description: "reserved cores 0 & 6 are not present in available cpuset",
 			topo:        topoDualSocketHT,
@@ -145,7 +156,8 @@ func TestStaticPolicyStart(t *testing.T) {
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
-			p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+			p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
+
 			policy := p.(*staticPolicy)
 			st := &mockState{
 				assignments:   testCase.stAssignments,
@@ -614,7 +626,8 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
 	if testCase.reservedCPUs != nil {
 		cpus = testCase.reservedCPUs.Clone()
 	}
-	policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options)
+	testExcl := false
+	policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options, testExcl)
 
 	st := &mockState{
 		assignments:   testCase.stAssignments,
@@ -685,7 +698,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
 	}
 
 	for _, testCase := range testCases {
-		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
 
 		st := &mockState{
 			assignments:   testCase.stAssignments,
@@ -738,7 +751,7 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
 	}
 
 	for _, testCase := range testCases {
-		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
 
 		st := &mockState{
 			assignments:   testCase.stAssignments,
@@ -762,6 +775,7 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
 }
 
 func TestStaticPolicyRemove(t *testing.T) {
+	excludeReserved := false
 	testCases := []staticPolicyTest{
 		{
 			description: "SingleSocketHT, DeAllocOneContainer",
@@ -820,7 +834,7 @@ func TestStaticPolicyRemove(t *testing.T) {
 	}
 
 	for _, testCase := range testCases {
-		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, excludeReserved)
 
 		st := &mockState{
 			assignments:   testCase.stAssignments,
@@ -842,6 +856,7 @@ func TestStaticPolicyRemove(t *testing.T) {
 }
 
 func TestTopologyAwareAllocateCPUs(t *testing.T) {
+	excludeReserved := false
 	testCases := []struct {
 		description string
 		topo        *topology.CPUTopology
@@ -910,7 +925,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 		},
 	}
 	for _, tc := range testCases {
-		p, _ := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		p, _ := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil, excludeReserved)
 		policy := p.(*staticPolicy)
 		st := &mockState{
 			assignments: tc.stAssignments,
@@ -982,9 +997,11 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
 			expNewErr:       fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
 		},
 	}
+	testExcl := false
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
-			p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
+			p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil, testExcl)
+
 			if !reflect.DeepEqual(err, testCase.expNewErr) {
 				t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
 					testCase.description, testCase.expNewErr, err)
@@ -1024,7 +1041,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
 			numReservedCPUs: 1,
 			reserved:        cpuset.New(0),
 			stAssignments:   state.ContainerCPUAssignments{},
-			stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
+			stDefaultCPUSet: cpuset.New(1, 2, 3, 4, 5, 6, 7),
 			pod:             makePod("fakePod", "fakeContainer2", "8000m", "8000m"),
 			expErr:          fmt.Errorf("not enough cpus available to satisfy request: requested=8, available=7"),
 			expCPUAlloc:     false,
@@ -1036,7 +1053,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
 			numReservedCPUs: 2,
 			reserved:        cpuset.New(0, 1),
 			stAssignments:   state.ContainerCPUAssignments{},
-			stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
+			stDefaultCPUSet: cpuset.New(2, 3, 4, 5, 6, 7),
 			pod:             makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
 			expErr:          nil,
 			expCPUAlloc:     true,
@@ -1060,8 +1077,9 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
 		},
 	}
 
+	testExcl := true
 	for _, testCase := range testCases {
-		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
+		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil, testExcl)
 
 		st := &mockState{
 			assignments:   testCase.stAssignments,
-- 
2.25.1
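Usage note (illustrative, mirroring the updated tests rather than quoting
them): with the new trailing excludeReserved parameter, a caller constructs
the static policy as sketched below; the topo value is a hypothetical
*topology.CPUTopology stand-in.

	// excludeReserved=true keeps the reserved CPUs (here 0-1) out of the
	// shared/default pool, so only explicitly pinned containers can use them.
	policy, err := NewStaticPolicy(topo, 2, cpuset.New(0, 1),
		topologymanager.NewFakeManager(), nil, true)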