1. 扩展实现目标

Framework 的设计在官方文档中已有明确的描述,目前还没有进入 Stable 阶段。本文基于 1.18 版本,聊一聊官方描述之外的一些实现细节。

1.1 阶段扩展点



1.2 context与CycleState


1.3 Bind Permit


2. 核心源码实现

2.1 Framework核心数据结构


2.1.1 插件集合

插件集合会根据插件类型分类保存;此外还有一个保存插件权重的 map(pluginNameToWeightMap),目前只在优选(Score)阶段使用,后续可能会加入预选的优先级。

    // pluginNameToWeightMap records the weight of every constructed plugin;
    // a configured weight of 0 is stored as 1.
    pluginNameToWeightMap map[string]int
    // One slice per extension point, filled by updatePluginList.
    queueSortPlugins      []QueueSortPlugin
    preFilterPlugins      []PreFilterPlugin
    filterPlugins         []FilterPlugin
    postFilterPlugins     []PostFilterPlugin
    scorePlugins          []ScorePlugin
    reservePlugins        []ReservePlugin
    preBindPlugins        []PreBindPlugin
    bindPlugins           []BindPlugin
    postBindPlugins       []PostBindPlugin
    unreservePlugins      []UnreservePlugin
    permitPlugins         []PermitPlugin

2.1.2 集群数据获取

这部分字段用于获取集群数据,主要是为了实现 FrameworkHandle 接口;该接口向插件提供数据获取和集群操作的能力。

    // Cluster clients, informers and listers; presumably these back the
    // FrameworkHandle accessors handed to plugins — confirm against the methods.
    clientSet            clientset.Interface
    informerFactory      informers.SharedInformerFactory
    volumeBinder         *volumebinder.VolumeBinder
    snapshotSharedLister schedulerlisters.SharedLister

2.1.3 等待pod集合


    // waitingPods holds the pods currently parked at the Permit extension point.
    waitingPods *waitingPodsMap

2.1.4 插件工厂注册表


    // registry maps plugin names to the factories that construct them.
    registry Registry

2.2 插件工厂注册表

2.2.1 插件工厂函数


// PluginFactory constructs a Plugin from its raw (undecoded) configuration
// and the FrameworkHandle the plugin can use while it runs.
type PluginFactory = func(args *runtime.Unknown, handle FrameworkHandle) (Plugin, error)

2.2.2 插件工厂的实现


type Registry map[string]PluginFactory // Register adds a new plugin to the registry. If a plugin with the same name // exists, it returns an error. func (r Registry) Register(name string, factory PluginFactory) error { if _, ok := r[name]; ok { return fmt.Errorf("a plugin named %v already exists", name)
    r[name] = factory return nil } // Unregister removes an existing plugin from the registry. If no plugin with // the provided name exists, it returns an error. func (r Registry) Unregister(name string) error { if _, ok := r[name]; !ok { return fmt.Errorf("no plugin named %v exists", name)
    } delete(r, name) return nil } // Merge merges the provided registry to the current one. func (r Registry) Merge(in Registry) error { for name, factory := range in { if err := r.Register(name, factory); err != nil { return err
    } return nil }

2.3 插件注册实现


2.3.1 Plugins


type Plugins struct { // QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue. QueueSort *PluginSet // PreFilter is a list of plugins that should be invoked at "PreFilter" extension point of the scheduling framework. PreFilter *PluginSet // Filter is a list of plugins that should be invoked when filtering out nodes that cannot run the Pod. Filter *PluginSet // PostFilter is a list of plugins that are invoked after filtering out infeasible nodes. PostFilter *PluginSet // Score is a list of plugins that should be invoked when ranking nodes that have passed the filtering phase. Score *PluginSet // Reserve is a list of plugins invoked when reserving a node to run the pod. Reserve *PluginSet // Permit is a list of plugins that control binding of a Pod. These plugins can prevent or delay binding of a Pod. Permit *PluginSet // PreBind is a list of plugins that should be invoked before a pod is bound. PreBind *PluginSet // Bind is a list of plugins that should be invoked at "Bind" extension point of the scheduling framework. // The scheduler call these plugins in order. Scheduler skips the rest of these plugins as soon as one returns success. Bind *PluginSet // PostBind is a list of plugins that should be invoked after a pod is successfully bound. PostBind *PluginSet // Unreserve is a list of plugins invoked when a pod that was previously reserved is rejected in a later phase. Unreserve *PluginSet

2.3.2 插件集合映射

该方法用于建立插件类型(扩展点)与 framework 中保存该类型插件的切片之间的映射,比如 PreFilter 与其关联的 preFilterPlugins 切片:*config.PluginSet(该扩展点的插件配置)-> *[]PreFilterPlugin(指向切片的指针,后续通过反射向其中追加插件实例)。

func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint { return []extensionPoint{
        {plugins.PreFilter, &f.preFilterPlugins},
        {plugins.Filter, &f.filterPlugins},
        {plugins.Reserve, &f.reservePlugins},
        {plugins.PostFilter, &f.postFilterPlugins},
        {plugins.Score, &f.scorePlugins},
        {plugins.PreBind, &f.preBindPlugins},
        {plugins.Bind, &f.bindPlugins},
        {plugins.PostBind, &f.postBindPlugins},
        {plugins.Unreserve, &f.unreservePlugins},
        {plugins.Permit, &f.permitPlugins},
        {plugins.QueueSort, &f.queueSortPlugins},

2.3.3 扫描注册所有允许的插件


func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
    pgMap := make(map[string]config.Plugin) if plugins == nil { return pgMap
    } // 构建匿名函数,利用闭包来修改pgMap保存所有允许的插件集合 find := func(pgs *config.PluginSet) { if pgs == nil { return } for _, pg := range pgs.Enabled { // 遍历所有允许的插件集合 pgMap[pg.Name] = pg // 保存到map中 }
    } // 遍历上面的所有映射表 for _, e := range f.getExtensionPoints(plugins) {
    } return pgMap

2.3.4 插件工厂构造插件映射

这里会遍历插件工厂注册表,通过每个插件的 Factory 构建 Plugin 实例并保存到 pluginsMap 中,同时记录插件权重并检查 Score 总分是否可能溢出。

// NOTE(review): excerpt from the enclosing constructor; r, pg, pluginConfig,
// f and totalPriority (presumably an int64 accumulator) are declared there.
pluginsMap := make(map[string]Plugin)
for name, factory := range r {
	// pg is the enabled-plugin map built by pluginsNeeded: only plugins that
	// are actually enabled get constructed.
	cfg, ok := pg[name]
	if !ok {
		continue
	}

	p, err := factory(pluginConfig[name], f)
	if err != nil {
		return nil, fmt.Errorf("error initializing plugin %q: %v", name, err)
	}
	pluginsMap[name] = p

	// Record the plugin weight; an unset (zero) weight is treated as 1.
	weight := int(cfg.Weight)
	if weight == 0 {
		weight = 1
	}
	f.pluginNameToWeightMap[name] = weight
	// Checks totalPriority against MaxTotalScore to avoid overflow.
	if int64(weight)*MaxNodeScore > MaxTotalScore-totalPriority {
		return nil, fmt.Errorf("total score of Score plugins could overflow")
	}
	totalPriority += int64(weight) * MaxNodeScore
}

2.3.5 按类型注册插件


    // Fill each extension point's plugin slice from its PluginSet.
    for _, e := range f.getExtensionPoints(plugins) {
        if err := updatePluginList(e.slicePtr, e.plugins, pluginsMap); err != nil {
            return nil, err
        }
    }


func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, pluginsMap map[string]Plugin) error { if pluginSet == nil { return nil } // 首先通过Elem获取当前数组的类型 plugins := reflect.ValueOf(pluginList).Elem() // 通过数组类型来获取数组内部元素的类型 pluginType := plugins.Type().Elem()
    set := sets.NewString() for _, ep := range pluginSet.Enabled {
        pg, ok := pluginsMap[ep.Name] if !ok { return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
        } // 合法性检查:如果发现当前插件未实现当前接口,则报错 if !reflect.TypeOf(pg).Implements(pluginType) { return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name())
        } if set.Has(ep.Name) { return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())

        set.Insert(ep.Name) // 追加插件到slice中,并保存指针指向 newPlugins := reflect.Append(plugins, reflect.ValueOf(pg))
    } return nil }

2.4 CycleState


2.4.1 数据结构


type CycleState struct {
    mx      sync.RWMutex
    storage map[StateKey]StateData // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle. recordPluginMetrics bool } // StateData is a generic type for arbitrary data stored in CycleState. type StateData interface { // Clone is an interface to make a copy of StateData. For performance reasons, // clone should make shallow copies for members (e.g., slices or maps) that are not // impacted by PreFilter's optional AddPod/RemovePod methods. Clone() StateData

2.4.2 对外接口实现


func (c *CycleState) Read(key StateKey) (StateData, error) { if v, ok := c.storage[key]; ok { return v, nil } return nil, errors.New(NotFound)
} // Write stores the given "val" in CycleState with the given "key". // This function is not thread safe. In multi-threaded code, lock should be // acquired first. func (c *CycleState) Write(key StateKey, val StateData) {
    c.storage[key] = val
} // Delete deletes data with the given key from CycleState. // This function is not thread safe. In multi-threaded code, lock should be // acquired first. func (c *CycleState) Delete(key StateKey) { delete(c.storage, key)
} // Lock acquires CycleState lock. func (c *CycleState) Lock() {
} // Unlock releases CycleState lock. func (c *CycleState) Unlock() {
} // RLock acquires CycleState read lock. func (c *CycleState) RLock() {
} // RUnlock releases CycleState read lock. func (c *CycleState) RUnlock() {

2.5 waitingPodMap与waitingPod


2.5.1 数据结构


type waitingPodsMap struct {
    pods map[types.UID]WaitingPod
    mu   sync.RWMutex

waitingPod 则是某个具体 pod 的等待实例:其内部通过 pendingPlugins 为每个要求等待的插件保存一个超时 timer,对外通过 chan *Status 传递该 pod 的最终状态,并通过读写锁保护内部状态。

type waitingPod struct {
    pod            *v1.Pod
    pendingPlugins map[string]*time.Timer
    s chan *Status
    mu             sync.RWMutex

2.5.2 构建waitingPod与计时器

会根据每个插件要求的等待时间构建 N 个 timer,只要任一 timer 到期,就拒绝该 pod。

func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
    wp := &waitingPod{
        pod: pod,
        s: make(chan *Status),

    wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime)) // The time.AfterFunc calls wp.Reject which iterates through pendingPlugins map. Acquire the // lock here so that time.AfterFunc can only execute after newWaitingPod finishes. wp.mu.Lock() defer wp.mu.Unlock() // 根据插件的等待时间来构建timer,如果有任一timer到期,还未曾有任何plugin Allow则会进行Rejectj㐇 for k, v := range pluginsMaxWaitTime {
        plugin, waitTime := k, v
        wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
            msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
                waitTime, plugin)
    } return wp

2.5.3 停止定时器发送拒绝事件


func (w *waitingPod) Reject(msg string) bool {
    w.mu.RLock() defer w.mu.RUnlock() // 停止所有的timer for _, timer := range w.pendingPlugins {
    } // 通过管道发送拒绝事件 select { case w.s <- NewStatus(Unschedulable, msg): return true default: return false }

2.5.4 发送允许调度操作


func (w *waitingPod) Allow(pluginName string) bool {
    w.mu.Lock() defer w.mu.Unlock() if timer, exist := w.pendingPlugins[pluginName]; exist { // 停止当前plugin的定时器 timer.Stop() delete(w.pendingPlugins, pluginName)
    } // Only signal success status after all plugins have allowed if len(w.pendingPlugins) != 0 { return true } // 只有当所有的plugin都允许,才会发生成功允许事件 select { case w.s <- NewStatus(Success, ""): // 发送事件 return true default: return false }

2.5.5 Permit阶段Wait实现


func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
    startTime := time.Now() defer func() {
        metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
    pluginsWaitTime := make(map[string]time.Duration)
    statusCode := Success for _, pl := range f.permitPlugins {
        status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { if status.IsUnschedulable() {
                msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
                klog.V(4).Infof(msg) return NewStatus(status.Code(), msg)
            } if status.Code() == Wait { // Not allowed to be greater than maxTimeout. if timeout > maxTimeout {
                    timeout = maxTimeout
                } // 记录当前plugin的等待时间 pluginsWaitTime[pl.Name()] = timeout
                statusCode = Wait
            } else {
                msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
                klog.Error(msg) return NewStatus(Error, msg)
    } // We now wait for the minimum duration if at least one plugin asked to // wait (and no plugin rejected the pod) if statusCode == Wait {
        startTime := time.Now() // 根据插件等待时间构建waitingPod w := newWaitingPod(pod, pluginsWaitTime) // 加入到waitingPods中 f.waitingPods.add(w) // 移除 defer f.waitingPods.remove(pod.UID)
        klog.V(4).Infof("waiting for pod %q at permit", pod.Name) // 等待状态消息 s := <-w.s
        metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime)) if !s.IsSuccess() { if s.IsUnschedulable() {
                msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message())
                klog.V(4).Infof(msg) return NewStatus(s.Code(), msg)
            msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message())
            klog.Error(msg) return NewStatus(Error, msg)
    } return nil }

2.6 插件调用方法实现概览

2.6.1 RunPreFilterPlugins




func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) {
    startTime := time.Now() defer func() {
        metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
    }() for _, pl := range f.preFilterPlugins {
        status = f.runPreFilterPlugin(ctx, pl, state, pod) if !status.IsSuccess() { if status.IsUnschedulable() {
                msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
                klog.V(4).Infof(msg) return NewStatus(status.Code(), msg)
            msg := fmt.Sprintf("error while running %q prefilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
            klog.Error(msg) return NewStatus(Error, msg)
    } return nil }

2.6.2 RunFilterPlugins


unc (f *framework) RunFilterPlugins(
    ctx context.Context,
    state *CycleState,
    pod *v1.Pod,
    nodeInfo *schedulernodeinfo.NodeInfo,
) PluginToStatus { var firstFailedStatus *Status
    startTime := time.Now() defer func() {
        metrics.FrameworkExtensionPointDuration.WithLabelValues(filter, firstFailedStatus.Code().String()).Observe(metrics.SinceInSeconds(startTime))
    statuses := make(PluginToStatus) for _, pl := range f.filterPlugins {
        pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo) if len(statuses) == 0 {
            firstFailedStatus = pluginStatus
        } if !pluginStatus.IsSuccess() { if !pluginStatus.IsUnschedulable() { // Filter plugins are not supposed to return any status other than // Success or Unschedulable. firstFailedStatus = NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message())) return map[string]*Status{pl.Name(): firstFailedStatus}
            statuses[pl.Name()] = pluginStatus if !f.runAllFilters { // 不需要运行所有插件进行退出 return statuses
    } return statuses

今天就到这里吧。调度器的改动还是蛮大的,但可以预见的是,后续更多的调度插件可能都会集中到 framework 中。对 kubernetes scheduler 系列的学习也算告一段落了;作为一个 kubernetes 新手,学习起来还是有点费劲,还好调度器与其他模块的耦合相对较小。

