Wait (with a timeout): after a "wait" verdict, a Pod stays in the Permit phase until a plugin approves it; if the timeout expires, "wait" becomes "deny", the Pod is returned to the scheduling queue, and the un-reserve plugins are triggered.
WaitingPod
The framework interface defines WaitingPod, which represents a pod that is waiting in the Permit phase.
// WaitingPod represents a pod currently waiting in the permit phase.
type WaitingPod interface {
    // GetPod returns a reference to the waiting pod.
    GetPod() *v1.Pod
    // GetPendingPlugins returns a list of pending permit plugin's name.
    GetPendingPlugins() []string
    // Allow declares the waiting pod is allowed to be scheduled by plugin pluginName.
    // If this is the last remaining plugin to allow, then a success signal is delivered
    // to unblock the pod.
    Allow(pluginName string)
    // Reject declares the waiting pod unschedulable.
    Reject(msg string)
}
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
    wp := &waitingPod{
        pod: pod,
        s:   make(chan *Status, 1),
    }

    wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
    // The time.AfterFunc calls wp.Reject which iterates through pendingPlugins map. Acquire the
    // lock here so that time.AfterFunc can only execute after newWaitingPod finishes.
    wp.mu.Lock()
    defer wp.mu.Unlock()
    // Arm one timer per permit plugin based on its max wait time; if any timer fires
    // before that plugin has allowed the pod, the pod is rejected.
    for k, v := range pluginsMaxWaitTime {
        plugin, waitTime := k, v
        wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
            msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
                waitTime, plugin)
            wp.Reject(msg)
        })
    }

    return wp
}
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) {
    waitingPod := f.waitingPods.get(pod.UID)
    if waitingPod == nil {
        return nil
    }

    defer f.waitingPods.remove(pod.UID)
    klog.V(4).Infof("pod %q waiting on permit", pod.Name)

    startTime := time.Now()
    s := <-waitingPod.s
    metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))

    if !s.IsSuccess() {
        if s.IsUnschedulable() {
            msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
            klog.V(4).Infof(msg)
            return NewStatus(s.Code(), msg)
        }
        msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
        klog.Error(msg)
        return NewStatus(Error, msg)
    }
    return nil
}
The CycleState also provides an API similar to context.WithValue that can be used to pass data between plugins at different extension points. Multiple plugins can share the state or communicate via this mechanism. The state is preserved only during a single scheduling context. It is worth noting that plugins are assumed to be trusted. The scheduler does not prevent one plugin from accessing or modifying another plugin’s state.
WARNING: The data available through a CycleState is not valid after a scheduling context ends, and plugins should not hold references to that data longer than necessary.
type CycleState struct {
    mx      sync.RWMutex
    storage map[StateKey]StateData
    // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
    recordPluginMetrics bool
}

// StateData is a generic type for arbitrary data stored in CycleState.
type StateData interface {
    // Clone is an interface to make a copy of StateData. For performance reasons,
    // clone should make shallow copies for members (e.g., slices or maps) that are not
    // impacted by PreFilter's optional AddPod/RemovePod methods.
    Clone() StateData
}
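The snippet below is a minimal sketch (not taken from the scheduler source) of how one plugin can pass data from PreFilter to Score through CycleState using its Write and Read methods. The ExamplePlugin type, the state key, and the exampleState type are made up for illustration, and the framework import path assumes the v1alpha1 package layout used in this version.

package example

import (
    "context"

    v1 "k8s.io/api/core/v1"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// Key under which ExamplePlugin stores its per-cycle data (hypothetical).
const exampleStateKey framework.StateKey = "example.com/prefilter-state"

// exampleState implements framework.StateData.
type exampleState struct {
    heavyResult int
}

func (s *exampleState) Clone() framework.StateData {
    c := *s
    return &c
}

// ExamplePlugin is a hypothetical plugin used only to illustrate CycleState.
type ExamplePlugin struct{}

func (pl *ExamplePlugin) Name() string { return "ExamplePlugin" }

// PreFilter computes something once per pod and stashes it in the CycleState.
func (pl *ExamplePlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
    state.Write(exampleStateKey, &exampleState{heavyResult: 42})
    return nil
}

// Score reads the data written by PreFilter within the same scheduling context.
func (pl *ExamplePlugin) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    data, err := state.Read(exampleStateKey)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, err.Error())
    }
    s, ok := data.(*exampleState)
    if !ok {
        return 0, framework.NewStatus(framework.Error, "unexpected state type")
    }
    return int64(s.heavyResult), nil
}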
While the CycleState provides APIs relevant to a single scheduling context, the FrameworkHandle provides APIs relevant to the lifetime of a plugin. This is how plugins can get a client (kubernetes.Interface) and SharedInformerFactory, or read data from the scheduler’s cache of cluster state. The handle will also provide APIs to list and approve or reject waiting pods.
type FrameworkHandle interface {
    SnapshotSharedLister() schedulerlisters.SharedLister
    // IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
    IterateOverWaitingPods(callback func(WaitingPod))
    // GetWaitingPod returns a waiting pod given its UID.
    GetWaitingPod(uid types.UID) WaitingPod
    // RejectWaitingPod rejects a waiting pod given its UID.
    RejectWaitingPod(uid types.UID)
    // ClientSet returns a kubernetes clientSet.
    ClientSet() clientset.Interface
    SharedInformerFactory() informers.SharedInformerFactory
    // VolumeBinder returns the volume binder used by scheduler.
    VolumeBinder() scheduling.SchedulerVolumeBinder
}
// NodeAffinity is a plugin that checks if a pod node selector matches the node label.
type NodeAffinity struct {
    handle framework.FrameworkHandle
}

// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
    return &NodeAffinity{handle: h}, nil
}
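To sketch how the waiting-pod APIs of the handle can be used, the helper below is hypothetical (not part of the real NodeAffinity plugin) and assumes the types import "k8s.io/apimachinery/pkg/types" alongside the framework import shown above.

// allowOrRejectWaitingPod is a hypothetical helper showing how a plugin can act,
// through the handle, on a pod currently parked in the Permit phase.
func (pl *NodeAffinity) allowOrRejectWaitingPod(uid types.UID, allow bool) {
    wp := pl.handle.GetWaitingPod(uid)
    if wp == nil {
        // Not waiting any more: the pod was already allowed, rejected, or timed out.
        return
    }
    if allow {
        // Allow on behalf of this plugin; the pod is unblocked once every pending
        // permit plugin has allowed it.
        wp.Allow("NodeAffinity")
    } else {
        // Reject sends the pod back to the scheduling queue and triggers un-reserve.
        wp.Reject("rejected by external signal")
    }
}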
type extensionPoint struct {
    // the set of plugins to be configured at this extension point.
    plugins *config.PluginSet
    // a pointer to the slice storing plugins implementations that will run at this
    // extension point.
    slicePtr interface{}
}
There are two steps to plugin initialization. First, plugins are registered. Second, the scheduler uses its configuration to decide which plugins to instantiate. If a plugin registers for multiple extension points, it is instantiated only once.
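As a rough sketch of the first step, registration amounts to putting a plugin factory into a registry keyed by plugin name. The snippet assumes Registry is the map of plugin name to factory used by this version, with factories matching the New signature shown above; Register is assumed to return an error on duplicate names.

// Step 1: record how to build the plugin; nothing is instantiated yet.
registry := framework.Registry{}
if err := registry.Register("NodeAffinity", New); err != nil {
    klog.Fatalf("cannot register plugin: %v", err)
}

// Step 2 happens when the scheduler builds a framework from its profile
// configuration: only plugins enabled at some extension point are instantiated,
// and a plugin enabled at several extension points is built exactly once.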
There are two types of concurrency that plugin writers should consider. A plugin might be invoked several times concurrently when evaluating multiple nodes, and a plugin may be called concurrently from different scheduling contexts.
Note: Within one scheduling context, each extension point is evaluated serially.
In the main thread of the scheduler, only one scheduling cycle is processed at a time. Any extension point up to and including reserve will be finished before the next scheduling cycle begins*. After the reserve phase, the binding cycle is executed asynchronously. This means that a plugin could be called concurrently from two different scheduling contexts, provided that at least one of the calls is to an extension point after reserve. Stateful plugins should take care to handle these situations.
Finally, un-reserve plugins may be called from either the Permit thread or the Bind thread, depending on how the pod was rejected.
* The queue sort extension point is a special case. It is not part of a scheduling context and may be called concurrently for many pod pairs.
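As a small illustration of the last point, here is a minimal sketch of a stateful plugin that guards its cross-pod state with a mutex, since Score runs concurrently for many nodes within one cycle and a binding cycle can overlap with the next scheduling cycle. The StatefulScore type is made up, and the framework import path assumes the v1alpha1 layout used in this version.

package example

import (
    "context"
    "sync"

    v1 "k8s.io/api/core/v1"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// StatefulScore is a hypothetical plugin that keeps state across invocations, so
// the shared map must be protected against concurrent access.
type StatefulScore struct {
    mu     sync.Mutex
    scored map[string]int64 // node name -> number of times this plugin scored it
}

func NewStatefulScore() *StatefulScore {
    return &StatefulScore{scored: map[string]int64{}}
}

func (pl *StatefulScore) Name() string { return "StatefulScore" }

func (pl *StatefulScore) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    pl.mu.Lock()
    defer pl.mu.Unlock()
    pl.scored[nodeName]++
    return pl.scored[nodeName], nil
}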
Scheduler core component flow
Scheduler initialization
Scheduler option initialization
var defaultSchedulerOptions = schedulerOptions{
    profiles: []schedulerapi.KubeSchedulerProfile{
        // Profiles' default plugins are set from the algorithm provider.
        {SchedulerName: v1.DefaultSchedulerName},
    },
    schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
        Provider: defaultAlgorithmSourceProviderName(),
    },
    disablePreemption:        false,
    percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
    bindTimeoutSeconds:       BindTimeoutSeconds,
    podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
    podMaxBackoffSeconds:     int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
}
Plugin factory registry initialization
Initialization of the plugin factory registry has two parts: in-tree plugins that ship with the current version, and out-of-tree plugins defined by the user, as in the sketch below.
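Roughly, in scheduler.New this looks like the following paraphrased snippet, where frameworkplugins refers to pkg/scheduler/framework/plugins and options.frameworkOutOfTreeRegistry holds the factories supplied by the user; error handling is shortened.

// Build the registry of in-tree plugin factories that ships with this version,
// then merge in any out-of-tree factories passed through the scheduler options.
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
    return nil, err
}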
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo {
    return func() *framework.PodInfo {
        podInfo, err := queue.Pop()
        if err == nil {
            klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
            return podInfo
        }
        klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
        return nil
    }
}
func (sched *Scheduler) profileForPod(pod *v1.Pod) (*profile.Profile, error) {
    prof, ok := sched.Profiles[pod.Spec.SchedulerName]
    if !ok {
        return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
    }
    return prof, nil
}
// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool {
    // Case 1: pod is being deleted.
    if pod.DeletionTimestamp != nil {
        prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return true
    }

    // Case 2: pod has been assumed and pod updates could be skipped.
    // An assumed pod can be added again to the scheduling queue if it got an update event
    // during its previous scheduling cycle but before getting assumed.
    if sched.skipPodUpdate(pod) {
        return true
    }

    return false
}
// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
//   - The pod has already been assumed, AND
//   - The pod has only its ResourceVersion, Spec.NodeName, Annotations,
//     ManagedFields, Finalizers and/or Conditions updated.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
    // Non-assumed pods should never be skipped.
    isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
        return false
    }
    if !isAssumed {
        return false
    }

    // Gets the assumed pod from the cache.
    assumedPod, err := sched.SchedulerCache.GetPod(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
        return false
    }

    // Compares the assumed pod in the cache with the pod update. If they are
    // equal (with certain fields excluded), this pod update will be skipped.
    f := func(pod *v1.Pod) *v1.Pod {
        p := pod.DeepCopy()
        // ResourceVersion must be excluded because each object update will
        // have a new resource version.
        p.ResourceVersion = ""
        // Spec.NodeName must be excluded because the pod assumed in the cache
        // is expected to have a node assigned while the pod update may or may
        // not have this field set.
        p.Spec.NodeName = ""
        // Annotations must be excluded for the reasons described in
        // https://github.com/kubernetes/kubernetes/issues/52914.
        p.Annotations = nil
        // Same as above, when annotations are modified with ServerSideApply,
        // ManagedFields may also change and must be excluded
        p.ManagedFields = nil
        // The following might be changed by external controllers, but they don't
        // affect scheduling decisions.
        p.Finalizers = nil
        p.Status.Conditions = nil
        return p
    }
    assumedPodCopy, podCopy := f(assumedPod), f(pod)
    if !reflect.DeepEqual(assumedPodCopy, podCopy) {
        return false
    }
    klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
    return true
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
    bindingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()

    // Block until every pending permit plugin has allowed the pod (or it is rejected).
    waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
    // ... (handling of a non-success waitOnPermitStatus is elided in this excerpt)

    // Bind volumes first before Pod
    if !allBound {
        err := sched.bindVolumes(assumedPod)
        // ... (error handling elided)
    }

    // Run "prebind" plugins.
    preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    // ... (handling of a non-success preBindStatus is elided)

    // Bind the pod to the suggested host.
    err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
    // ... (error handling elided)

    // Run "postbind" plugins.
    prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}()
Preemption flow
If filtering (the predicate phase) failed and preemption is enabled in the scheduler, preemption is performed via sched.preempt:
if fitError, ok := err.(*core.FitError); ok {
    // Filtering failed; attempt preemption if it is allowed.
    if sched.DisablePreemption {
        klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
            " No preemption is performed.")
    } else {
        preemptionStartTime := time.Now()
        // Run preemption scheduling.
        sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
        metrics.PreemptionAttempts.Inc()
        metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
        metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
    }
}
Fetching the preemptor
First, the latest Pod object of the pod that needs to preempt is fetched from the apiserver:
preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor)
if err != nil {
    klog.Errorf("Error getting the updated preemptor pod object: %v", err)
    return "", err
}
Selecting via the preemption algorithm
Preempt selects the node on which to preempt, the victim pods to be evicted, and the nominated pods whose nominations should be cleared:
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, state, preemptor, scheduleErr)
if err != nil {
    klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
    return "", err
}
for _, victim := range victims {
    // Delete the victim pod via the apiserver.
    if err := sched.podPreemptor.deletePod(victim); err != nil {
        klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
        return "", err
    }
    // If the victim is a WaitingPod, send a reject message to the PermitPlugin
    if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil {
        waitingPod.Reject("preempted")
    }
    sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
Clearing nominated nodes
For pods that had already been nominated to run on the chosen node, the nominated node name is cleared so that they go through scheduling again:
for _, p := range nominatedPodsToClear {
    // Clear the nominated node name of these nominated pods.
    rErr := sched.podPreemptor.removeNominatedNodeName(p)
    if rErr != nil {
        klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
        // We do not return as this error is not critical.
    }
}