node_exporter 是 Prometheus 官方提供的一个用于采集计算机硬件信息的 Exporter。几乎所有的指标都来源于文件系统:/proc/sys。这么做的好处显而易见,指标采集的效率非常的高。

项目架构

node_exporter 的入口为 node_exporter.go 文件,同级目录下的 collector 目录包含了所有的采集指标。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# tree -L 2
├── collector
│   ├── arp_linux.go
│   ├── bcache_linux.go
│   ├── bonding_linux.go
│   ├── bonding_linux_test.go
│   ├── boot_time_bsd.go
...
├── node_exporter.go
...

node_exporter 也是按照 Exporter简单入门 中提到的三个步骤,进行编写。

  • 「1」创建采集项 Collector
  • 「2」注册采集项对象
  • 「3」绑定HttpHandler,并启动web服务

只不过为了管理众多的采集指标,提取出了一个注册组件。所有的采集指标,都通过 registerCollector 函数进行注册。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// arp_linux.go
type arpCollector struct {
    fs           procfs.FS
    deviceFilter deviceFilter
    entries      *prometheus.Desc
    logger       log.Logger
}

func init() {
    registerCollector("arp", defaultEnabled, NewARPCollector)
}

// NewARPCollector returns a new Collector exposing ARP stats.
func NewARPCollector(logger log.Logger) (Collector, error) {
    fs, err := procfs.NewFS(*procPath)
    if err != nil {
        return nil, fmt.Errorf("failed to open procfs: %w", err)
    }

    return &arpCollector{
        fs:           fs,
        deviceFilter: newDeviceFilter(*arpDeviceExclude, *arpDeviceInclude),
        entries: prometheus.NewDesc(
            prometheus.BuildFQName(namespace, "arp", "entries"),
            "ARP entries by device",
            []string{"device"}, nil,
        ),
        logger: logger,
    }, nil
}

所有的采集指标都在 collector 目录之下,依赖语言特性,利用在每个指标项对应的 xxx.go 文件中的 init() 函数,实现自动注册。

注册核心代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// collector.go
var (
    factories              = make(map[string]func(logger log.Logger) (Collector, error))
    initiatedCollectorsMtx = sync.Mutex{}
    initiatedCollectors    = make(map[string]Collector)
    collectorState         = make(map[string]*bool)
    forcedCollectors       = map[string]bool{} // collectors which have been explicitly enabled or disabled
)

func registerCollector(collector string, isDefaultEnabled bool, factory func(logger log.Logger) (Collector, error)) {
    var helpDefaultState string
    if isDefaultEnabled {
        helpDefaultState = "enabled"
    } else {
        helpDefaultState = "disabled"
    }

    flagName := fmt.Sprintf("collector.%s", collector)
    flagHelp := fmt.Sprintf("Enable the %s collector (default: %s).", collector, helpDefaultState)
    defaultValue := fmt.Sprintf("%v", isDefaultEnabled)

    flag := kingpin.Flag(flagName, flagHelp).Default(defaultValue).Action(collectorFlagAction(collector)).Bool()
    collectorState[collector] = flag

    factories[collector] = factory
}

registerCollector 函数,在接收到采集项注册调用后,会做下列事情:

  • 绑定采集项对应的终端启停命令,node_exporter help 信息中的 --[no-]collector.xxxx 便是通过这种形式来实现
  • 填充采集指标启停状态:collectorState[collector] = flag
  • 填充采集指标创建工厂方法:factories[collector] = factory

main 函数(node_exporter.go) 中,会进行 web 服务器 Handler 的创建

1
2
// node_exporter.go
http.Handle(*metricsPath, newHandler(!*disableExporterMetrics, *maxRequests, logger))

newHandler 函数会新建出一个 Http.ServeHTTP 用于响应 API 采集请求。在 newHandler 函数中,会新建一个采集对象 NodeCollector,实现 prometheus.Collector 的接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// collector.go
// NodeCollector implements the prometheus.Collector interface.
type NodeCollector struct {
    Collectors map[string]Collector
    logger     log.Logger
}

// NewNodeCollector creates a new NodeCollector.
func NewNodeCollector(logger log.Logger, filters ...string) (*NodeCollector, error) {
    f := make(map[string]bool)
    for _, filter := range filters {
        enabled, exist := collectorState[filter]
        if !exist {
            return nil, fmt.Errorf("missing collector: %s", filter)
        }
        if !*enabled {
            return nil, fmt.Errorf("disabled collector: %s", filter)
        }
        f[filter] = true
    }
    collectors := make(map[string]Collector)
    initiatedCollectorsMtx.Lock()
    defer initiatedCollectorsMtx.Unlock()
    for key, enabled := range collectorState {
        if !*enabled || (len(f) > 0 && !f[key]) {
            continue
        }
        if collector, ok := initiatedCollectors[key]; ok {
            collectors[key] = collector
        } else {
            collector, err := factories[key](log.With(logger, "collector", key))
            if err != nil {
                return nil, err
            }
            collectors[key] = collector
            initiatedCollectors[key] = collector
        }
    }
    return &NodeCollector{Collectors: collectors, logger: logger}, nil
}

// Describe implements the prometheus.Collector interface.
func (n NodeCollector) Describe(ch chan<- *prometheus.Desc) {
    ch <- scrapeDurationDesc
    ch <- scrapeSuccessDesc
}

// Collect implements the prometheus.Collector interface.
func (n NodeCollector) Collect(ch chan<- prometheus.Metric) {
    wg := sync.WaitGroup{}
    wg.Add(len(n.Collectors))
    for name, c := range n.Collectors {
        go func(name string, c Collector) {
            execute(name, c, ch, n.logger)    // 见下面内容
            wg.Done()
        }(name, c)
    }
    wg.Wait()
}

NewNodeCollector 函数中,会通过过滤条件,将符合要求的采集指标,通过工厂方法,创建出对应的 采集指标对象,添加到 NodeCollector.Collectors 字典中。

最终 API 采集的请求,会通过 NodeCollector.Collect 进行响应。并发遍历 NodeCollector.Collectors 字典中采集指标,调用采集指标的 Update 函数,进行指标采集。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// collector.go
func execute(name string, c Collector, ch chan<- prometheus.Metric, logger log.Logger) {
    begin := time.Now()
    err := c.Update(ch)
    duration := time.Since(begin)
    var success float64

    if err != nil {
        if IsNoDataError(err) {
            level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err)
        } else {
            level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)
        }
        success = 0
    } else {
        level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds())
        success = 1
    }
    ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name)
    ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name)
}

指标项实现

每个采集指标项,均在 collector 目录之下。需要实现 Update 接口。

1
2
3
4
5
6
// collector.go
// Collector is the interface a collector has to implement.
type Collector interface {
    // Get new metrics and expose them via prometheus registry.
    Update(ch chan<- prometheus.Metric) error
}

Update 接口实现时,通过传入的管道参数,将采集指标的内容输出出去。例如 arpCollector:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (c *arpCollector) Update(ch chan<- prometheus.Metric) error {
    var enumeratedEntry map[string]uint32

    if *arpNetlink {
        var err error

        enumeratedEntry, err = getTotalArpEntriesRTNL()
        if err != nil {
            return fmt.Errorf("could not get ARP entries: %w", err)
        }
    } else {
        entries, err := c.fs.GatherARPEntries()
        if err != nil {
            return fmt.Errorf("could not get ARP entries: %w", err)
        }

        enumeratedEntry = getTotalArpEntries(entries)
    }

    for device, entryCount := range enumeratedEntry {
        if c.deviceFilter.ignored(device) {
            continue
        }
        ch <- prometheus.MustNewConstMetric(
            c.entries, prometheus.GaugeValue, float64(entryCount), device)
    }

    return nil
}

procfs

procfs 是从 node_exporter 中抽离出来的模块,node_exporter 中的大部分场景,需要从 /proc/sys 目录中读取系统信息。通过 procfs 的封装,能够实现系统信息的对象话。在添加指标项时,直接操作的是对象,很大程度上减轻了 Exporter 层的压力。

procfs 使用上也很简单,初始化一个 procfs 或者 sysfs 对象,然后直接调用对应的结构体方法即可。如下面的 CPU 信息:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// cpu_linux.go
// NewCPUCollector returns a new Collector exposing kernel/system statistics.
func NewCPUCollector(logger log.Logger) (Collector, error) {
    fs, err := procfs.NewFS(*procPath)
    if err != nil {
        return nil, fmt.Errorf("failed to open procfs: %w", err)
    }

    sysfs, err := sysfs.NewFS(*sysPath)
    if err != nil {
        return nil, fmt.Errorf("failed to open sysfs: %w", err)
    }

    isolcpus, err := sysfs.IsolatedCPUs()
    if err != nil {
        if !os.IsNotExist(err) {
            return nil, fmt.Errorf("Unable to get isolated cpus: %w", err)
        }
        level.Debug(logger).Log("msg", "Could not open isolated file", "error", err)
    }
    ...
}

总结

  • node_exporter 是非常重要的计算机系统信息采集工具
  • node_exporter 框架本质上也是按照 “三板斧” 来
  • 通过 Collector 层的封装,将多个采集指标项合并在一个 Collector 中,实现一个 API 多个指标项的采集,是非常好的 Exporter 实践
  • node_exporter 大部分场景可以直接通过 procfs 模块的调用,来实现系统信息的采集。procfs 是非常nice的系统信息对象话模块,减少了解析系统信息的压力

参考